Move common storage, plot and statistic code to cephlib
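
Replace the wally-local copies of the storage, plotting and statistics helpers
with their cephlib equivalents:

* IStorable/Storable are imported from cephlib instead of wally.common_types
* texttable, calc_norm_stat_props/calc_histo_stat_props, PlotParams and
  provide_plot now come from cephlib
* wally/hlstorage.py, wally/html.py and the local interpolation code in
  data_selectors.py are removed; ResultStorage is provided by
  wally/result_storage.py and exposed on the run context as ctx.rstorage
* get_aggregated() now takes suite/job storage ids plus an explicit
  reliable-time range instead of the config objects

A minimal sketch of the new aggregation call (ctx is assumed to be a TestRun
built as in wally/main.py below; the loop mirrors console_report.py):

    from wally.data_selectors import get_aggregated
    from wally.suits.io.fio import FioTest

    for suite in ctx.rstorage.iter_suite(FioTest.name):
        for job in ctx.rstorage.iter_job(suite):
            bw_ts = get_aggregated(ctx.rstorage, suite.storage_id, job.storage_id,
                                   metric='bw', trange=job.reliable_info_range_s)
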
diff --git a/wally/ceph.py b/wally/ceph.py
index 3527db3..89324c5 100644
--- a/wally/ceph.py
+++ b/wally/ceph.py
@@ -144,7 +144,7 @@
return path
-class FillCephInfoStage(Stage):
+class CollectCephInfoStage(Stage):
config_block = 'ceph'
priority = StepOrder.UPDATE_NODES_INFO
diff --git a/wally/common_types.py b/wally/common_types.py
index 470a325..4aedfaa 100644
--- a/wally/common_types.py
+++ b/wally/common_types.py
@@ -1,47 +1,12 @@
-import abc
-from typing import Any, Union, List, Dict, NamedTuple
+from typing import Any, Dict, NamedTuple
+
+from cephlib.storage import IStorable
IP = str
IPAddr = NamedTuple("IPAddr", [("host", IP), ("port", int)])
-class IStorable(metaclass=abc.ABCMeta):
- """Interface for type, which can be stored"""
-
- @abc.abstractmethod
- def raw(self) -> Dict[str, Any]:
- pass
-
- @classmethod
- def fromraw(cls, data: Dict[str, Any]) -> 'IStorable':
- pass
-
-
-Basic = Union[int, str, bytes, bool, None]
-StorableType = Union[IStorable, Dict[str, Any], List[Any], int, str, bytes, bool, None]
-
-
-class Storable(IStorable):
- """Default implementation"""
-
- __ignore_fields__ = [] # type: List[str]
-
- def raw(self) -> Dict[str, Any]:
- return {name: val
- for name, val in self.__dict__.items()
- if not name.startswith("_") and name not in self.__ignore_fields__}
-
- @classmethod
- def fromraw(cls, data: Dict[str, Any]) -> 'IStorable':
- obj = cls.__new__(cls)
- if cls.__ignore_fields__:
- data = data.copy()
- data.update(dict.fromkeys(cls.__ignore_fields__))
- obj.__dict__.update(data)
- return obj
-
-
class ConnCreds(IStorable):
def __init__(self, host: str, user: str, passwd: str = None, port: str = '22',
key_file: str = None, key: bytes = None) -> None:
diff --git a/wally/config.py b/wally/config.py
index 95ae511..2ec7ba5 100644
--- a/wally/config.py
+++ b/wally/config.py
@@ -1,6 +1,6 @@
from typing import Any, Dict, Optional, Set
-from .common_types import IStorable
+from cephlib.storage import IStorable
ConfigBlock = Dict[str, Any]
diff --git a/wally/console_report.py b/wally/console_report.py
index ac54965..1528089 100644
--- a/wally/console_report.py
+++ b/wally/console_report.py
@@ -4,13 +4,13 @@
import numpy
from cephlib.common import float2str
+from cephlib import texttable
+from cephlib.statistic import calc_norm_stat_props, calc_histo_stat_props
-from . import texttable
-from .hlstorage import ResultStorage
+from .result_classes import IResultStorage
from .stage import Stage, StepOrder
from .test_run_class import TestRun
from .suits.io.fio import FioTest
-from .statistic import calc_norm_stat_props, calc_histo_stat_props
from .suits.io.fio_hist import get_lat_vals
from .data_selectors import get_aggregated
@@ -23,22 +23,23 @@
priority = StepOrder.REPORT
def run(self, ctx: TestRun) -> None:
- rstorage = ResultStorage(ctx.storage)
- for suite in rstorage.iter_suite(FioTest.name):
+ for suite in ctx.rstorage.iter_suite(FioTest.name):
table = texttable.Texttable(max_width=200)
- tbl = rstorage.get_txt_report(suite)
+ tbl = ctx.rstorage.get_txt_report(suite)
if tbl is None:
table.header(["Description", "IOPS ~ Dev", "BW, MiBps", 'Skew/Kurt', 'lat med, ms', 'lat 95, ms'])
table.set_cols_align(('l', 'r', 'r', 'r', 'r', 'r'))
- for job in sorted(rstorage.iter_job(suite), key=lambda job: job.params):
- bw_ts = get_aggregated(rstorage, suite, job, metric='bw')
+ for job in sorted(ctx.rstorage.iter_job(suite), key=lambda job: job.params):
+ bw_ts = get_aggregated(ctx.rstorage, suite.storage_id, job.storage_id, metric='bw',
+ trange=job.reliable_info_range_s)
props = calc_norm_stat_props(bw_ts)
avg_iops = props.average // job.params.params['bsize']
iops_dev = props.deviation // job.params.params['bsize']
- lat_ts = get_aggregated(rstorage, suite, job, metric='lat')
+ lat_ts = get_aggregated(ctx.rstorage, suite.storage_id, job.storage_id, metric='lat',
+ trange=job.reliable_info_range_s)
bins_edges = numpy.array(get_lat_vals(lat_ts.data.shape[1]), dtype='float32') / 1000 # convert us to ms
lat_props = calc_histo_stat_props(lat_ts, bins_edges)
table.add_row([job.params.summary,
@@ -48,5 +49,5 @@
float2str(lat_props.perc_50), float2str(lat_props.perc_95)])
tbl = table.draw()
- rstorage.put_txt_report(suite, tbl)
+ ctx.rstorage.put_txt_report(suite, tbl)
print(tbl)
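
The console report above now takes its table widget and the statistics helpers
from cephlib and reads everything through ctx.rstorage. A rough sketch of the
pattern, assuming bw_ts is an aggregated bandwidth TimeSeries from
get_aggregated(); the exact cell formatting of the real report is not visible
in this hunk, so the row below is illustrative:

    from cephlib import texttable
    from cephlib.common import float2str
    from cephlib.statistic import calc_norm_stat_props

    table = texttable.Texttable(max_width=200)
    table.header(["Description", "IOPS ~ Dev"])
    table.set_cols_align(('l', 'r'))

    props = calc_norm_stat_props(bw_ts)                 # average/deviation over the series
    iops = props.average // job.params.params['bsize']
    iops_dev = props.deviation // job.params.params['bsize']
    table.add_row([job.params.summary,
                   "{} ~ {}".format(float2str(iops), float2str(iops_dev))])
    print(table.draw())
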
diff --git a/wally/data_selectors.py b/wally/data_selectors.py
index a5ac400..3e6bc3e 100644
--- a/wally/data_selectors.py
+++ b/wally/data_selectors.py
@@ -1,20 +1,13 @@
-import ctypes
import logging
-import os.path
-from typing import Tuple, List, Iterable, Iterator, Optional, Union, Dict
-from fractions import Fraction
+from typing import Tuple, Iterator
import numpy
-from cephlib.numeric import auto_edges2
+from cephlib.numeric_types import DataSource, TimeSeries
+from cephlib.storage_selectors import c_interpolate_ts_on_seconds_border
-import wally
-from .hlstorage import ResultStorage
-from .node_interfaces import NodeInfo
-from .result_classes import DataSource, TimeSeries, SuiteConfig, JobConfig
-from .suits.io.fio import FioJobConfig
+from .result_classes import IResultStorage
from .suits.io.fio_hist import expected_lat_bins
-from .utils import unit_conversion_coef
logger = logging.getLogger("wally")
@@ -40,52 +33,25 @@
AGG_TAG = 'ALL'
-def find_nodes_by_roles(rstorage: ResultStorage, node_roles: Iterable[str]) -> List[NodeInfo]:
- nodes = rstorage.storage.load_list(NodeInfo, 'all_nodes') # type: List[NodeInfo]
- node_roles_s = set(node_roles)
- return [node for node in nodes if node.roles.intersection(node_roles_s)]
-
-
-def find_all_sensors(rstorage: ResultStorage,
- node_roles: Iterable[str],
- sensor: str,
- metric: str) -> Iterator[TimeSeries]:
- all_nodes_rr = "|".join(node.node_id for node in find_nodes_by_roles(rstorage, node_roles))
- all_nodes_rr = "(?P<node>{})".format(all_nodes_rr)
-
- for path, ds in rstorage.iter_sensors(all_nodes_rr, sensor=sensor, metric=metric):
- ts = rstorage.load_sensor(ds)
-
- # for sensors ts.times is array of pairs - collection_start_at, colelction_finished_at
- # to make this array consistent with times in load data second item if each pair is dropped
- ts.times = ts.times[::2]
- yield ts
-
-
-def find_all_series(rstorage: ResultStorage, suite: SuiteConfig, job: JobConfig, metric: str) -> Iterator[TimeSeries]:
+def find_all_series(rstorage: IResultStorage, suite_id: str, job_id: str, metric: str) -> Iterator[TimeSeries]:
"Iterated over selected metric for all nodes for given Suite/job"
- return rstorage.iter_ts(suite, job, metric=metric)
+ return (rstorage.get_ts(ds) for ds in rstorage.iter_ts(suite_id=suite_id, job_id=job_id, metric=metric))
-def get_aggregated(rstorage: ResultStorage, suite: SuiteConfig, job: FioJobConfig, metric: str) -> TimeSeries:
+def get_aggregated(rstorage: IResultStorage, suite_id: str, job_id: str, metric: str,
+ trange: Tuple[int, int]) -> TimeSeries:
"Sum selected metric for all nodes for given Suite/job"
- tss = list(find_all_series(rstorage, suite, job, metric))
+ tss = list(find_all_series(rstorage, suite_id, job_id, metric))
if len(tss) == 0:
- raise NameError("Can't found any TS for {},{},{}".format(suite, job, metric))
+        raise NameError("Can't find any TS for {},{},{}".format(suite_id, job_id, metric))
- ds = DataSource(suite_id=suite.storage_id,
- job_id=job.storage_id,
- node_id=AGG_TAG,
- sensor='fio',
- dev=AGG_TAG,
- metric=metric,
- tag='csv')
+ ds = DataSource(suite_id=suite_id, job_id=job_id, node_id=AGG_TAG, sensor='fio',
+ dev=AGG_TAG, metric=metric, tag='csv')
tss_inp = [c_interpolate_ts_on_seconds_border(ts, tp='fio', allow_broken_step=(metric == 'lat')) for ts in tss]
res = None
- trange = job.reliable_info_range_s
for ts in tss_inp:
if ts.time_units != 's':
@@ -121,10 +87,7 @@
assert res.shape == dt.shape, "res.shape(={}) != dt.shape(={})".format(res.shape, dt.shape)
res += dt
- agg_ts = TimeSeries(metric,
- raw=None,
- source=ds,
- data=res,
+ agg_ts = TimeSeries(res, source=ds,
times=tss_inp[0].times.copy(),
units=tss_inp[0].units,
histo_bins=tss_inp[0].histo_bins,
@@ -132,299 +95,3 @@
return agg_ts
-
-interpolated_cache = {}
-
-
-c_interp_func_agg = None
-c_interp_func_qd = None
-c_interp_func_fio = None
-
-
-def c_interpolate_ts_on_seconds_border(ts: TimeSeries, nc: bool = False, tp: str = 'agg',
- allow_broken_step: bool = False) -> TimeSeries:
- "Interpolate time series to values on seconds borders"
- key = (ts.source.tpl, tp)
- if not nc and key in interpolated_cache:
- return interpolated_cache[key].copy()
-
- if tp in ('qd', 'agg'):
- # both data and times must be 1d compact arrays
- assert len(ts.data.strides) == 1, "ts.data.strides must be 1D, not " + repr(ts.data.strides)
- assert ts.data.dtype.itemsize == ts.data.strides[0], "ts.data array must be compact"
-
- assert len(ts.times.strides) == 1, "ts.times.strides must be 1D, not " + repr(ts.times.strides)
- assert ts.times.dtype.itemsize == ts.times.strides[0], "ts.times array must be compact"
-
- assert len(ts.times) == len(ts.data), "len(times)={} != len(data)={} for {!s}"\
- .format(len(ts.times), len(ts.data), ts.source)
-
- rcoef = 1 / unit_conversion_coef(ts.time_units, 's') # type: Union[int, Fraction]
-
- if isinstance(rcoef, Fraction):
- assert rcoef.denominator == 1, "Incorrect conversion coef {!r}".format(rcoef)
- rcoef = rcoef.numerator
-
- assert rcoef >= 1 and isinstance(rcoef, int), "Incorrect conversion coef {!r}".format(rcoef)
- coef = int(rcoef) # make typechecker happy
-
- global c_interp_func_agg
- global c_interp_func_qd
- global c_interp_func_fio
-
- uint64_p = ctypes.POINTER(ctypes.c_uint64)
-
- if c_interp_func_agg is None:
- dirname = os.path.dirname(os.path.dirname(wally.__file__))
- path = os.path.join(dirname, 'clib', 'libwally.so')
- cdll = ctypes.CDLL(path)
-
- c_interp_func_agg = cdll.interpolate_ts_on_seconds_border
- c_interp_func_qd = cdll.interpolate_ts_on_seconds_border_qd
-
- for func in (c_interp_func_agg, c_interp_func_qd):
- func.argtypes = [
- ctypes.c_uint, # input_size
- ctypes.c_uint, # output_size
- uint64_p, # times
- uint64_p, # values
- ctypes.c_uint, # time_scale_coef
- uint64_p, # output
- ]
- func.restype = ctypes.c_uint # output array used size
-
- c_interp_func_fio = cdll.interpolate_ts_on_seconds_border_fio
- c_interp_func_fio.restype = ctypes.c_int
- c_interp_func_fio.argtypes = [
- ctypes.c_uint, # input_size
- ctypes.c_uint, # output_size
- uint64_p, # times
- ctypes.c_uint, # time_scale_coef
- uint64_p, # output indexes
- ctypes.c_uint64, # empty placeholder
- ctypes.c_bool # allow broken steps
- ]
-
- assert ts.data.dtype.name == 'uint64', "Data dtype for {}=={} != uint64".format(ts.source, ts.data.dtype.name)
- assert ts.times.dtype.name == 'uint64', "Time dtype for {}=={} != uint64".format(ts.source, ts.times.dtype.name)
-
- output_sz = int(ts.times[-1]) // coef - int(ts.times[0]) // coef + 2
- result = numpy.zeros(output_sz, dtype=ts.data.dtype.name)
-
- if tp in ('qd', 'agg'):
- assert not allow_broken_step, "Broken steps aren't supported for non-fio arrays"
- func = c_interp_func_qd if tp == 'qd' else c_interp_func_agg
- sz = func(ts.data.size,
- output_sz,
- ts.times.ctypes.data_as(uint64_p),
- ts.data.ctypes.data_as(uint64_p),
- coef,
- result.ctypes.data_as(uint64_p))
-
- result = result[:sz]
- output_sz = sz
-
- rtimes = int(ts.times[0] // coef) + numpy.arange(output_sz, dtype=ts.times.dtype)
- else:
- assert tp == 'fio'
- ridx = numpy.zeros(output_sz, dtype=ts.times.dtype)
- no_data = (output_sz + 1)
- sz_or_err = c_interp_func_fio(ts.times.size,
- output_sz,
- ts.times.ctypes.data_as(uint64_p),
- coef,
- ridx.ctypes.data_as(uint64_p),
- no_data,
- allow_broken_step)
- if sz_or_err <= 0:
- raise ValueError("Error in input array at index {}. {}".format(-sz_or_err, ts.source))
-
- rtimes = int(ts.times[0] // coef) + numpy.arange(sz_or_err, dtype=ts.times.dtype)
-
- empty = numpy.zeros(len(ts.histo_bins), dtype=ts.data.dtype) if ts.source.metric == 'lat' else 0
- res = []
- for idx in ridx[:sz_or_err]:
- if idx == no_data:
- res.append(empty)
- else:
- res.append(ts.data[idx])
- result = numpy.array(res, dtype=ts.data.dtype)
-
- res_ts = TimeSeries(ts.name, None, result,
- times=rtimes,
- units=ts.units,
- time_units='s',
- source=ts.source(),
- histo_bins=ts.histo_bins)
-
- if not nc:
- interpolated_cache[ts.source.tpl] = res_ts.copy()
-
- return res_ts
-
-
-def get_ts_for_time_range(ts: TimeSeries, time_range: Tuple[int, int]) -> TimeSeries:
- """Return sensor values for given node for given period. Return per second estimated values array
- Raise an error if required range is not full covered by data in storage"""
-
- assert ts.time_units == 's', "{} != s for {!s}".format(ts.time_units, ts.source)
- assert len(ts.times) == len(ts.data), "Time(={}) and data(={}) sizes doesn't equal for {!s}"\
- .format(len(ts.times), len(ts.data), ts.source)
-
- if time_range[0] < ts.times[0] or time_range[1] > ts.times[-1]:
- raise AssertionError(("Incorrect data for get_sensor - time_range={!r}, collected_at=[{}, ..., {}]," +
- "sensor = {}_{}.{}.{}").format(time_range, ts.times[0], ts.times[-1],
- ts.source.node_id, ts.source.sensor, ts.source.dev,
- ts.source.metric))
- idx1, idx2 = numpy.searchsorted(ts.times, time_range)
- return TimeSeries(ts.name, None,
- ts.data[idx1:idx2],
- times=ts.times[idx1:idx2],
- units=ts.units,
- time_units=ts.time_units,
- source=ts.source,
- histo_bins=ts.histo_bins)
-
-
-def make_2d_histo(tss: List[TimeSeries],
- outliers_range: Tuple[float, float] = (0.02, 0.98),
- bins_count: int = 20,
- log_bins: bool = False) -> TimeSeries:
-
- # validate input data
- for ts in tss:
- assert len(ts.times) == len(ts.data), "Time(={}) and data(={}) sizes doesn't equal for {!s}"\
- .format(len(ts.times), len(ts.data), ts.source)
- assert ts.time_units == 's', "All arrays should have the same data units"
- assert ts.units == tss[0].units, "All arrays should have the same data units"
- assert ts.data.shape == tss[0].data.shape, "All arrays should have the same data size"
- assert len(ts.data.shape) == 1, "All arrays should be 1d"
-
- whole_arr = numpy.concatenate([ts.data for ts in tss])
- whole_arr.shape = [len(tss), -1]
-
- if outliers_range is not None:
- max_vl, begin, end, min_vl = numpy.percentile(whole_arr,
- [0, outliers_range[0] * 100, outliers_range[1] * 100, 100])
- bins_edges = auto_edges2(begin, end, bins=bins_count, log_space=log_bins)
- fixed_bins_edges = bins_edges.copy()
- fixed_bins_edges[0] = begin
- fixed_bins_edges[-1] = end
- else:
- begin, end = numpy.percentile(whole_arr, [0, 100])
- bins_edges = auto_edges2(begin, end, bins=bins_count, log_space=log_bins)
- fixed_bins_edges = bins_edges
-
- res_data = numpy.concatenate(numpy.histogram(column, fixed_bins_edges) for column in whole_arr.T)
- res_data.shape = (len(tss), -1)
- res = TimeSeries(name=tss[0].name,
- raw=None,
- data=res_data,
- times=tss[0].times,
- units=tss[0].units,
- source=tss[0].source,
- time_units=tss[0].time_units,
- histo_bins=bins_edges)
- return res
-
-
-def aggregate_histograms(tss: List[TimeSeries],
- outliers_range: Tuple[float, float] = (0.02, 0.98),
- bins_count: int = 20,
- log_bins: bool = False) -> TimeSeries:
-
- # validate input data
- for ts in tss:
- assert len(ts.times) == len(ts.data), "Need to use stripped time"
- assert ts.time_units == 's', "All arrays should have the same data units"
- assert ts.units == tss[0].units, "All arrays should have the same data units"
- assert ts.data.shape == tss[0].data.shape, "All arrays should have the same data size"
- assert len(ts.data.shape) == 2, "All arrays should be 2d"
- assert ts.histo_bins is not None, "All arrays should be 2d"
-
- whole_arr = numpy.concatenate([ts.data for ts in tss])
- whole_arr.shape = [len(tss), -1]
-
- max_val = whole_arr.min()
- min_val = whole_arr.max()
-
- if outliers_range is not None:
- begin, end = numpy.percentile(whole_arr, [outliers_range[0] * 100, outliers_range[1] * 100])
- else:
- begin = min_val
- end = max_val
-
- bins_edges = auto_edges2(begin, end, bins=bins_count, log_space=log_bins)
-
- if outliers_range is not None:
- fixed_bins_edges = bins_edges.copy()
- fixed_bins_edges[0] = begin
- fixed_bins_edges[-1] = end
- else:
- fixed_bins_edges = bins_edges
-
- res_data = numpy.concatenate(numpy.histogram(column, fixed_bins_edges) for column in whole_arr.T)
- res_data.shape = (len(tss), -1)
- return TimeSeries(name=tss[0].name,
- raw=None,
- data=res_data,
- times=tss[0].times,
- units=tss[0].units,
- source=tss[0].source,
- time_units=tss[0].time_units,
- histo_bins=fixed_bins_edges)
-
-
-qd_metrics = {'io_queue'}
-summ_sensors_cache = {} # type: Dict[Tuple[Tuple[str, ...], str, str, Tuple[int, int], int], Optional[TimeSeries]]
-
-
-def summ_sensors(rstorage: ResultStorage,
- roles: List[str],
- sensor: str,
- metric: str,
- time_range: Tuple[int, int],
- nc: bool = False) -> Optional[TimeSeries]:
-
- key = (tuple(roles), sensor, metric, time_range, id(ResultStorage))
- if not nc and key in summ_sensors_cache:
- return summ_sensors_cache[key].copy()
-
- res = None # type: Optional[TimeSeries]
- for node in find_nodes_by_roles(rstorage, roles):
- for _, ds in rstorage.iter_sensors(node_id=node.node_id, sensor=sensor, metric=metric):
- data = rstorage.load_sensor(ds)
- data = c_interpolate_ts_on_seconds_border(data, 'qd' if metric in qd_metrics else 'agg')
- data = get_ts_for_time_range(data, time_range)
- if res is None:
- res = data
- res.data = res.data.copy()
- else:
- res.data += data.data
-
- if not nc:
- summ_sensors_cache[key] = res
- if len(summ_sensors_cache) > 1024:
- logger.warning("summ_sensors_cache cache too large %s > 1024", len(summ_sensors_cache))
-
- return res if res is None else res.copy()
-
-
-def find_sensors_to_2d(rstorage: ResultStorage,
- roles: List[str],
- sensor: str,
- devs: List[str],
- metric: str,
- time_range: Tuple[int, int]) -> numpy.ndarray:
-
- res = [] # type: List[TimeSeries]
- for node in find_nodes_by_roles(rstorage, roles):
- for dev in devs:
- for _, ds in rstorage.iter_sensors(node_id=node.node_id, sensor=sensor, dev=dev, metric=metric):
- data = rstorage.load_sensor(ds)
- data = c_interpolate_ts_on_seconds_border(data, 'qd' if metric in qd_metrics else 'agg')
- data = get_ts_for_time_range(data, time_range)
- res.append(data.data)
- res2d = numpy.concatenate(res)
- res2d.shape = ((len(res), -1))
- return res2d
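
After this change data_selectors.py only keeps the aggregation logic:
find_all_series() resolves DataSource handles from IResultStorage.iter_ts()
into TimeSeries via get_ts(), and the seconds-border interpolation is the
cephlib implementation. A rough outline of how the pieces fit together
(the shape/unit checks and the trange clipping done by the real
get_aggregated() are omitted):

    from cephlib.storage_selectors import c_interpolate_ts_on_seconds_border
    from wally.data_selectors import find_all_series

    tss = list(find_all_series(rstorage, suite_id, job_id, metric='bw'))
    tss_inp = [c_interpolate_ts_on_seconds_border(ts, tp='fio') for ts in tss]

    res = None
    for ts in tss_inp:
        res = ts.data.copy() if res is None else res + ts.data   # sum across nodes
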
diff --git a/wally/hlstorage.py b/wally/hlstorage.py
deleted file mode 100644
index 9f66030..0000000
--- a/wally/hlstorage.py
+++ /dev/null
@@ -1,387 +0,0 @@
-import os
-import pprint
-import logging
-from typing import cast, Iterator, Tuple, Type, Dict, Optional, List, Any
-
-import numpy
-
-from .suits.job import JobConfig
-from .result_classes import SuiteConfig, TimeSeries, DataSource, StatProps, IResultStorage, ArrayData
-from .storage import Storage
-from .utils import StopTestError
-from .suits.all_suits import all_suits
-
-
-logger = logging.getLogger('wally')
-
-
-class DB_re:
- node_id = r'\d+.\d+.\d+.\d+:\d+'
- job_id = r'[-a-zA-Z0-9_]+_\d+'
- suite_id = r'[a-z_]+_\d+'
- sensor = r'[-a-z_]+'
- dev = r'[-a-zA-Z0-9_]+'
- tag = r'[a-z_.]+'
- metric = r'[a-z_.]+'
-
-
-class DB_paths:
- suite_cfg_r = r'results/{suite_id}\.info\.yml'
-
- job_root = r'results/{suite_id}\.{job_id}/'
- job_cfg_r = job_root + r'info\.yml'
-
- # time series, data from load tool, sensor is a tool name
- ts_r = job_root + r'{node_id}\.{sensor}\.{metric}\.{tag}'
-
- # statistica data for ts
- stat_r = job_root + r'{node_id}\.{sensor}\.{metric}\.stat\.yaml'
-
- # sensor data
- sensor_data_r = r'sensors/{node_id}_{sensor}\.{dev}\.{metric}\.{tag}'
- sensor_time_r = r'sensors/{node_id}_collected_at\.csv'
-
- report_root = 'report/'
- plot_r = r'{suite_id}\.{job_id}/{node_id}\.{sensor}\.{dev}\.{metric}\.{tag}'
- txt_report = report_root + '{suite_id}_report.txt'
-
- job_extra = 'meta/{suite_id}.{job_id}/{tag}'
-
- job_cfg = job_cfg_r.replace("\\.", '.')
- suite_cfg = suite_cfg_r.replace("\\.", '.')
- ts = ts_r.replace("\\.", '.')
- stat = stat_r.replace("\\.", '.')
- sensor_data = sensor_data_r.replace("\\.", '.')
- sensor_time = sensor_time_r.replace("\\.", '.')
- plot = plot_r.replace("\\.", '.')
-
-
-DB_rr = {name: r"(?P<{}>{})".format(name, rr)
- for name, rr in DB_re.__dict__.items()
- if not name.startswith("__")}
-
-
-def fill_path(path: str, **params) -> str:
- for name, val in params.items():
- if val is not None:
- path = path.replace("{" + name + "}", val)
- return path
-
-
-class ResultStorage(IResultStorage):
- # TODO: check that all path components match required patterns
-
- ts_header_size = 64
- ts_header_format = "!IIIcc"
- ts_arr_tag = 'csv'
- csv_file_encoding = 'ascii'
-
- def __init__(self, storage: Storage) -> None:
- self.storage = storage
- self.cache = {} # type: Dict[str, Tuple[int, int, ArrayData]]
-
- def sync(self) -> None:
- self.storage.sync()
-
- # ----------------- SERIALIZATION / DESERIALIZATION -------------------------------------------------------------
- def read_headers(self, fd) -> Tuple[str, List[str], List[str], Optional[numpy.ndarray]]:
- header = fd.readline().decode(self.csv_file_encoding).rstrip().split(",")
- dtype, has_header2, header2_dtype, *ext_header = header
-
- if has_header2 == 'true':
- ln = fd.readline().decode(self.csv_file_encoding).strip()
- header2 = numpy.fromstring(ln, sep=',', dtype=header2_dtype)
- else:
- assert has_header2 == 'false', \
- "In file {} has_header2 is not true/false, but {!r}".format(fd.name, has_header2)
- header2 = None
- return dtype, ext_header, header, header2
-
- def load_array(self, path: str) -> ArrayData:
- """
- Load array from file, shoult not be called directly
- :param path: file path
- :return: ArrayData
- """
- with self.storage.get_fd(path, "rb") as fd:
- fd.seek(0, os.SEEK_SET)
-
- stats = os.fstat(fd.fileno())
- if path in self.cache:
- size, atime, arr_info = self.cache[path]
- if size == stats.st_size and atime == stats.st_atime_ns:
- return arr_info
-
- data_dtype, header, _, header2 = self.read_headers(fd)
- assert data_dtype == 'uint64', path
- dt = fd.read().decode(self.csv_file_encoding).strip()
-
- if len(dt) != 0:
- arr = numpy.fromstring(dt.replace("\n", ','), sep=',', dtype=data_dtype)
- lines = dt.count("\n") + 1
- assert len(set(ln.count(',') for ln in dt.split("\n"))) == 1, \
- "Data lines in {!r} have different element count".format(path)
- arr.shape = [lines] if lines == arr.size else [lines, -1]
- else:
- arr = None
-
- arr_data = ArrayData(header, header2, arr)
- self.cache[path] = (stats.st_size, stats.st_atime_ns, arr_data)
- return arr_data
-
- def put_array(self, path: str, data: numpy.array, header: List[str], header2: numpy.ndarray = None,
- append_on_exists: bool = False) -> None:
-
- header = [data.dtype.name] + \
- (['false', ''] if header2 is None else ['true', header2.dtype.name]) + \
- header
-
- exists = append_on_exists and path in self.storage
- vw = data.view().reshape((data.shape[0], 1)) if len(data.shape) == 1 else data
- mode = "cb" if not exists else "rb+"
-
- with self.storage.get_fd(path, mode) as fd:
- if exists:
- data_dtype, _, full_header, curr_header2 = self.read_headers(fd)
-
- assert data_dtype == data.dtype.name, \
- "Path {!r}. Passed data type ({!r}) and current data type ({!r}) doesn't match"\
- .format(path, data.dtype.name, data_dtype)
-
- assert header == full_header, \
- "Path {!r}. Passed header ({!r}) and current header ({!r}) doesn't match"\
- .format(path, header, full_header)
-
- assert header2 == curr_header2, \
- "Path {!r}. Passed header2 != current header2: {!r}\n{!r}"\
- .format(path, header2, curr_header2)
-
- fd.seek(0, os.SEEK_END)
- else:
- fd.write((",".join(header) + "\n").encode(self.csv_file_encoding))
- if header2 is not None:
- fd.write((",".join(map(str, header2)) + "\n").encode(self.csv_file_encoding))
-
- numpy.savetxt(fd, vw, delimiter=',', newline="\n", fmt="%lu")
-
- def load_ts(self, ds: DataSource, path: str) -> TimeSeries:
- """
- Load time series, generated by fio or other tool, should not be called directly,
- use iter_ts istead.
- :param ds: data source path
- :param path: path in data storage
- :return: TimeSeries
- """
- (units, time_units), header2, data = self.load_array(path)
- times = data[:,0].copy()
- ts_data = data[:,1:]
-
- if ts_data.shape[1] == 1:
- ts_data.shape = (ts_data.shape[0],)
-
- return TimeSeries("{}.{}".format(ds.dev, ds.sensor),
- raw=None,
- data=ts_data,
- times=times,
- source=ds,
- units=units,
- time_units=time_units,
- histo_bins=header2)
-
- def load_sensor_raw(self, ds: DataSource) -> bytes:
- path = DB_paths.sensor_data.format(**ds.__dict__)
- with self.storage.get_fd(path, "rb") as fd:
- return fd.read()
-
- def load_sensor(self, ds: DataSource) -> TimeSeries:
- # sensors has no shape
- path = DB_paths.sensor_time.format(**ds.__dict__)
- collect_header, must_be_none, collected_at = self.load_array(path)
-
- # cut 'collection end' time
- # .copy needed to really remove 'collection end' element to make c_interpolate_.. works correctly
- collected_at = collected_at[::2].copy()
-
- # there must be no histogram for collected_at
- assert must_be_none is None, "Extra header2 {!r} in collect_at file at {!r}".format(must_be_none, path)
- node, tp, units = collect_header
- assert node == ds.node_id and tp == 'collected_at' and units in ('ms', 'us'),\
- "Unexpected collect_at header {!r} at {!r}".format(collect_header, path)
- assert len(collected_at.shape) == 1, "Collected_at must be 1D at {!r}".format(path)
-
- data_path = DB_paths.sensor_data.format(**ds.__dict__)
- data_header, must_be_none, data = self.load_array(data_path)
-
- # there must be no histogram for any sensors
- assert must_be_none is None, "Extra header2 {!r} in sensor data file {!r}".format(must_be_none, data_path)
-
- data_units = data_header[2]
- assert data_header == [ds.node_id, ds.metric_fqdn, data_units], \
- "Unexpected data header {!r} at {!r}".format(data_header, data_path)
- assert len(data.shape) == 1, "Sensor data must be 1D at {!r}".format(data_path)
-
- return TimeSeries(ds.metric_fqdn,
- raw=None,
- data=data,
- times=collected_at,
- source=ds,
- units=data_units,
- time_units=units)
-
- # ------------- CHECK DATA IN STORAGE ----------------------------------------------------------------------------
-
- def check_plot_file(self, source: DataSource) -> Optional[str]:
- path = DB_paths.plot.format(**source.__dict__)
- fpath = self.storage.resolve_raw(DB_paths.report_root + path)
- return path if os.path.exists(fpath) else None
-
- # ------------- PUT DATA INTO STORAGE --------------------------------------------------------------------------
-
- def put_or_check_suite(self, suite: SuiteConfig) -> None:
- path = DB_paths.suite_cfg.format(suite_id=suite.storage_id)
- if path in self.storage:
- db_cfg = self.storage.load(SuiteConfig, path)
- if db_cfg != suite:
- logger.error("Current suite %s config is not equal to found in storage at %s", suite.test_type, path)
- logger.debug("Current: \n%s\nStorage:\n%s", pprint.pformat(db_cfg), pprint.pformat(suite))
- raise StopTestError()
- else:
- self.storage.put(suite, path)
-
- def put_job(self, suite: SuiteConfig, job: JobConfig) -> None:
- path = DB_paths.job_cfg.format(suite_id=suite.storage_id, job_id=job.storage_id)
- self.storage.put(job, path)
-
- def put_ts(self, ts: TimeSeries) -> None:
- assert ts.data.dtype == ts.times.dtype, "Data type {!r} != time type {!r}".format(ts.data.dtype, ts.times.dtype)
- assert ts.data.dtype.kind == 'u', "Only unsigned ints are accepted"
- assert ts.source.tag == self.ts_arr_tag, "Incorrect source tag == {!r}, must be {!r}".format(ts.source.tag,
- self.ts_arr_tag)
- csv_path = DB_paths.ts.format(**ts.source.__dict__)
- header = [ts.units, ts.time_units]
-
- tv = ts.times.view().reshape((-1, 1))
- if len(ts.data.shape) == 1:
- dv = ts.data.view().reshape((ts.times.shape[0], -1))
- else:
- dv = ts.data
-
- result = numpy.concatenate((tv, dv), axis=1)
- if ts.histo_bins is not None:
- self.put_array(csv_path, result, header, header2=ts.histo_bins)
- else:
- self.put_array(csv_path, result, header)
-
- if ts.raw:
- raw_path = DB_paths.ts.format(**ts.source(tag=ts.raw_tag).__dict__)
- self.storage.put_raw(ts.raw, raw_path)
-
- def put_extra(self, data: bytes, source: DataSource) -> None:
- self.storage.put_raw(data, DB_paths.ts.format(**source.__dict__))
-
- def put_stat(self, data: StatProps, source: DataSource) -> None:
- self.storage.put(data, DB_paths.stat.format(**source.__dict__))
-
- # return path to file to be inserted into report
- def put_plot_file(self, data: bytes, source: DataSource) -> str:
- path = DB_paths.plot.format(**source.__dict__)
- self.storage.put_raw(data, DB_paths.report_root + path)
- return path
-
- def put_report(self, report: str, name: str) -> str:
- return self.storage.put_raw(report.encode(self.csv_file_encoding), DB_paths.report_root + name)
-
- def put_sensor_raw(self, data: bytes, ds: DataSource) -> None:
- path = DB_paths.sensor_data.format(**ds.__dict__)
- with self.storage.get_fd(path, "cb") as fd:
- fd.write(data)
-
- def append_sensor(self, data: numpy.array, ds: DataSource, units: str, histo_bins: numpy.ndarray = None) -> None:
- if ds.metric == 'collected_at':
- path = DB_paths.sensor_time
- metrics_fqn = 'collected_at'
- else:
- path = DB_paths.sensor_data
- metrics_fqn = ds.metric_fqdn
-
- if ds.metric == 'lat':
- assert len(data.shape) == 2, "Latency should be histo array"
- assert histo_bins is not None, "Latency should have histo bins"
-
- path = path.format(**ds.__dict__)
- self.put_array(path, data, [ds.node_id, metrics_fqn, units], header2=histo_bins, append_on_exists=True)
-
- # ------------- GET DATA FROM STORAGE --------------------------------------------------------------------------
-
- def get_stat(self, stat_cls: Type[StatProps], source: DataSource) -> StatProps:
- return self.storage.load(stat_cls, DB_paths.stat.format(**source.__dict__))
-
- # ------------- ITER OVER STORAGE ------------------------------------------------------------------------------
-
- def iter_paths(self, path_glob) -> Iterator[Tuple[bool, str, Dict[str, str]]]:
- path = path_glob.format(**DB_rr).split("/")
- yield from self.storage._iter_paths("", path, {})
-
- def iter_suite(self, suite_type: str = None) -> Iterator[SuiteConfig]:
- for is_file, suite_info_path, groups in self.iter_paths(DB_paths.suite_cfg_r):
- assert is_file
- suite = self.storage.load(SuiteConfig, suite_info_path)
- # suite = cast(SuiteConfig, self.storage.load(SuiteConfig, suite_info_path))
- assert suite.storage_id == groups['suite_id']
- if not suite_type or suite.test_type == suite_type:
- yield suite
-
- def iter_job(self, suite: SuiteConfig) -> Iterator[JobConfig]:
- job_glob = fill_path(DB_paths.job_cfg_r, suite_id=suite.storage_id)
- job_config_cls = all_suits[suite.test_type].job_config_cls
- for is_file, path, groups in self.iter_paths(job_glob):
- assert is_file
- job = cast(JobConfig, self.storage.load(job_config_cls, path))
- assert job.storage_id == groups['job_id']
- yield job
-
- # iterate over test tool data
- def iter_ts(self, suite: SuiteConfig, job: JobConfig, **filters) -> Iterator[TimeSeries]:
- filters.update(suite_id=suite.storage_id, job_id=job.storage_id)
- ts_glob = fill_path(DB_paths.ts_r, **filters)
- for is_file, path, groups in self.iter_paths(ts_glob):
- tag = groups["tag"]
- if tag != 'csv':
- continue
- assert is_file
- groups = groups.copy()
- groups.update(filters)
- ds = DataSource(suite_id=suite.storage_id,
- job_id=job.storage_id,
- node_id=groups["node_id"],
- sensor=groups["sensor"],
- dev=None,
- metric=groups["metric"],
- tag=tag)
- yield self.load_ts(ds, path)
-
- def iter_sensors(self, node_id: str = None, sensor: str = None, dev: str = None, metric: str = None) -> \
- Iterator[Tuple[str, DataSource]]:
- vls = dict(node_id=node_id, sensor=sensor, dev=dev, metric=metric)
- path = fill_path(DB_paths.sensor_data_r, **vls)
- for is_file, path, groups in self.iter_paths(path):
- cvls = vls.copy()
- cvls.update(groups)
- yield path, DataSource(**cvls)
-
- def get_txt_report(self, suite: SuiteConfig) -> Optional[str]:
- path = DB_paths.txt_report.format(suite_id=suite.storage_id)
- if path in self.storage:
- return self.storage.get_raw(path).decode('utf8')
-
- def put_txt_report(self, suite: SuiteConfig, report: str) -> None:
- path = DB_paths.txt_report.format(suite_id=suite.storage_id)
- self.storage.put_raw(report.encode('utf8'), path)
-
- def put_job_info(self, suite: SuiteConfig, job: JobConfig, key: str, data: Any) -> None:
- path = DB_paths.job_extra.format(suite_id=suite.storage_id, job_id=job.storage_id, tag=key)
- self.storage.put(data, path)
-
- def get_job_info(self, suite: SuiteConfig, job: JobConfig, key: str) -> Any:
- path = DB_paths.job_extra.format(suite_id=suite.storage_id, job_id=job.storage_id, tag=key)
- return self.storage.get(path, None)
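
wally/hlstorage.py is gone; the ResultStorage implementation it contained now
lives in wally/result_storage.py on top of cephlib's storage layer (see the
import changes in wally/main.py below). A minimal construction sketch,
assuming an existing results directory (the path is illustrative):

    from cephlib.storage import make_storage
    from wally.result_storage import ResultStorage
    from wally.suits.io.fio import FioTest

    storage = make_storage('/path/to/test_run', existing=True)   # illustrative path
    rstorage = ResultStorage(storage=storage)
    suites = list(rstorage.iter_suite(FioTest.name))
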
diff --git a/wally/html.py b/wally/html.py
deleted file mode 100644
index a1db5b3..0000000
--- a/wally/html.py
+++ /dev/null
@@ -1,54 +0,0 @@
-from typing import Optional, List, Callable
-
-
-import xmlbuilder3
-
-
-eol = "<br>"
-
-
-def tag(name: str) -> Callable[[str], str]:
- def closure(data: str) -> str:
- return "<{}>{}</{}>".format(name, data, name)
- return closure
-
-
-H3 = tag("H3")
-H2 = tag("H2")
-center = tag("center")
-
-
-def img(link: str) -> str:
- return '<img src="{}">'.format(link)
-
-
-def table(caption: str, headers: Optional[List[str]], data: List[List[str]], align: List[str] = None) -> str:
- doc = xmlbuilder3.XMLBuilder("table",
- **{"class": "table table-bordered table-striped table-condensed table-hover",
- "style": "width: auto;"})
-
- doc.caption.H3.center(caption)
-
- if headers is not None:
- with doc.thead:
- with doc.tr:
- for header in headers:
- doc.th(header)
-
- max_cols = max(len(line) for line in data if not isinstance(line, str))
-
- with doc.tbody:
- for line in data:
- with doc.tr:
- if isinstance(line, str):
- with doc.td(colspan=str(max_cols)):
- doc.center.b(line)
- else:
- if align:
- for vl, col_align in zip(line, align):
- doc.td(vl, align=col_align)
- else:
- for vl in line:
- doc.td(vl)
-
- return xmlbuilder3.tostr(doc).split("\n", 1)[1]
\ No newline at end of file
diff --git a/wally/hw_info.py b/wally/hw_info.py
index 61938ab..bb857f2 100644
--- a/wally/hw_info.py
+++ b/wally/hw_info.py
@@ -1,11 +1,13 @@
import re
import logging
-from typing import Dict, Iterable
+from typing import Dict, Any
import xml.etree.ElementTree as ET
from typing import List, Tuple, cast, Optional
-from . import utils
-from .node_utils import get_os, OSRelease
+from cephlib.istorage import Storable
+from cephlib.common import b2ssize
+
+from .node_utils import get_os
from .node_interfaces import IRPCNode
@@ -17,7 +19,9 @@
return match_res.group(0)
-class HWInfo:
+class HWInfo(Storable):
+ __ignore_fields__ = ['raw_xml']
+
def __init__(self) -> None:
self.hostname = None # type: str
self.cores = [] # type: List[Tuple[str, int]]
@@ -34,14 +38,10 @@
self.ram_size = 0 # type: int
self.sys_name = None # type: str
self.mb = None # type: str
- self.raw = None # type: str
+ self.raw_xml = None # type: Optional[str]
self.storage_controllers = [] # type: List[str]
- def get_hdd_count(self) -> Iterable[int]:
- # SATA HDD COUNT, SAS 10k HDD COUNT, SAS SSD count, PCI-E SSD count
- return []
-
def get_summary(self) -> Dict[str, int]:
cores = sum(count for _, count in self.cores)
disks = sum(size for _, size in self.disks_info.values())
@@ -57,8 +57,8 @@
summ = self.get_summary()
summary = "Simmary: {cores} cores, {ram}B RAM, {disk}B storage"
res.append(summary.format(cores=summ['cores'],
- ram=utils.b2ssize(summ['ram']),
- disk=utils.b2ssize(summ['storage'])))
+ ram=b2ssize(summ['ram']),
+ disk=b2ssize(summ['storage'])))
res.append(str(self.sys_name))
if self.mb:
res.append("Motherboard: " + self.mb)
@@ -66,7 +66,7 @@
if not self.ram_size:
res.append("RAM: Failed to get RAM size")
else:
- res.append("RAM " + utils.b2ssize(self.ram_size) + "B")
+ res.append("RAM " + b2ssize(self.ram_size) + "B")
if not self.cores:
res.append("CPU cores: Failed to get CPU info")
@@ -86,7 +86,7 @@
if self.disks_info:
res.append("Storage devices:")
for dev, (model, size) in sorted(self.disks_info.items()):
- ssize = utils.b2ssize(size) + "B"
+ ssize = b2ssize(size) + "B"
res.append(" {0} {1} {2}".format(dev, ssize, model))
else:
res.append("Storage devices's: Failed to get info")
@@ -108,30 +108,19 @@
return str(self.hostname) + ":\n" + "\n".join(" " + i for i in res)
-class CephInfo:
- def __init__(self) -> None:
- pass
-
-
-class SWInfo:
+class SWInfo(Storable):
def __init__(self) -> None:
self.mtab = None # type: str
self.kernel_version = None # type: str
self.libvirt_version = None # type: Optional[str]
self.qemu_version = None # type: Optional[str]
- self.OS_version = None # type: OSRelease
- self.ceph_info = None # type: Optional[CephInfo]
-
-
-def get_ceph_services_info(node: IRPCNode) -> CephInfo:
- # TODO: use ceph-monitoring module
- return CephInfo()
+ self.os_version = None # type: Tuple[str, ...]
def get_sw_info(node: IRPCNode) -> SWInfo:
res = SWInfo()
- res.OS_version = get_os(node)
+ res.os_version = tuple(get_os(node))
res.kernel_version = node.get_file_content('/proc/version').decode('utf8').strip()
res.mtab = node.get_file_content('/etc/mtab').decode('utf8').strip()
@@ -147,16 +136,10 @@
except OSError:
res.qemu_version = None
- for role in ('ceph-osd', 'ceph-mon', 'ceph-mds'):
- if role in node.info.roles:
- res.ceph_info = get_ceph_services_info(node)
- break
-
return res
def get_hw_info(node: IRPCNode) -> Optional[HWInfo]:
-
try:
lshw_out = node.run('sudo lshw -xml 2>/dev/null')
except Exception as exc:
@@ -164,7 +147,7 @@
return None
res = HWInfo()
- res.raw = lshw_out
+ res.raw_xml = lshw_out
lshw_et = ET.fromstring(lshw_out)
try:
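
HWInfo and SWInfo now derive from cephlib's Storable so they can be saved into
the result storage; the bulky lshw output is kept on the renamed raw_xml
attribute and skipped on serialization via __ignore_fields__. A small
round-trip sketch, assuming cephlib's Storable keeps the same raw()/fromraw()
contract as the implementation removed from common_types.py above (field
values are made up):

    from wally.hw_info import HWInfo

    info = HWInfo()
    info.hostname = 'node-1'
    info.raw_xml = '<list>...</list>'        # large lshw dump, not persisted

    data = info.raw()                        # public fields only, raw_xml dropped
    assert 'raw_xml' not in data

    restored = HWInfo.fromraw(data)          # ignored fields come back as None
    assert restored.raw_xml is None
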
diff --git a/wally/logger.py b/wally/logger.py
index d885ea9..23b0f3b 100644
--- a/wally/logger.py
+++ b/wally/logger.py
@@ -1,7 +1,6 @@
-import yaml
import logging
import logging.config
-from typing import Callable, IO, Optional
+from typing import Callable
def color_me(color: int) -> Callable[[str], str]:
diff --git a/wally/main.py b/wally/main.py
index a20ade5..e43f1f9 100644
--- a/wally/main.py
+++ b/wally/main.py
@@ -21,8 +21,8 @@
except ImportError:
yaml_load = cast(YLoader, _yaml_load)
-
-import texttable
+from cephlib.texttable import Texttable
+from cephlib.istorage import IStorage
try:
import faulthandler
@@ -30,18 +30,18 @@
faulthandler = None
from cephlib.common import setup_logging
+from cephlib.storage import make_storage
-from . import utils, node
+from . import utils, node, report_profiles, report
from .node_utils import log_nodes_statistic
-from .storage import make_storage, Storage
from .config import Config
from .stage import Stage
from .test_run_class import TestRun
from .ssh import set_ssh_key_passwd
-
+from .result_storage import ResultStorage
# stages
-from .ceph import DiscoverCephStage, FillCephInfoStage
+from .ceph import DiscoverCephStage, CollectCephInfoStage
from .openstack import DiscoverOSStage
from .fuel import DiscoverFuelStage
from .run_test import (CollectInfoStage, ExplicitNodesStage, SaveNodesStage,
@@ -123,6 +123,8 @@
report_parser = subparsers.add_parser('report', help=report_help)
report_parser.add_argument('-R', '--reporters', help="Comma-separated list of reportes - html,txt",
default='html,txt')
+    report_parser.add_argument('-f', '--format', help="Image format, default is " + report_profiles.default_format,
+ choices=('svg', 'png'), default=report_profiles.default_format)
report_parser.add_argument("data_dir", help="folder with rest results")
# ---------------------------------------------------------------------
@@ -210,7 +212,7 @@
def get_run_stages() -> List[Stage]:
return [DiscoverCephStage(),
- FillCephInfoStage(),
+ CollectCephInfoStage(),
DiscoverOSStage(),
DiscoverFuelStage(),
ExplicitNodesStage(),
@@ -231,7 +233,7 @@
# stop mypy from telling that config & storage might be undeclared
config = None # type: Config
- storage = None # type: Storage
+ storage = None # type: IStorage
if opts.subparser_name == 'test':
config = load_config(opts.config_file)
@@ -277,7 +279,7 @@
opts.subparser_name = 'resume'
elif opts.subparser_name == 'ls':
- tab = texttable.Texttable(max_width=200)
+ tab = Texttable(max_width=200)
tab.set_deco(tab.HEADER | tab.VLINES | tab.BORDER)
tab.set_cols_align(["l", "l", "l", "l"])
tab.header(["Name", "Tests", "Run at", "Comment"])
@@ -291,9 +293,10 @@
return 1
storage = make_storage(opts.data_dir, existing=True)
config = storage.load(Config, 'config')
+ report_profiles.default_format = opts.format
+ report.default_format = opts.format
stages.append(LoadStoredNodesStage())
stages.append(SaveNodesStage())
-
elif opts.subparser_name == 'compare':
# x = run_test.load_data_from_path(opts.data_path1)
# y = run_test.load_data_from_path(opts.data_path2)
@@ -314,7 +317,6 @@
return 0
elif opts.subparser_name == 'ipython':
storage = make_storage(opts.storage_dir, existing=True)
- from .hlstorage import ResultStorage
rstorage = ResultStorage(storage=storage)
import IPython
@@ -341,7 +343,7 @@
logger.info("All info would be stored into %r", config.storage_url)
- ctx = TestRun(config, storage)
+ ctx = TestRun(config, storage, ResultStorage(storage))
ctx.rpc_code, ctx.default_rpc_plugins = node.get_rpc_server_code()
if opts.ssh_key_passwd is not None:
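
TestRun is now constructed with the result storage next to the config and the
raw storage, which is what lets the report and console stages use ctx.rstorage
directly instead of wrapping ctx.storage themselves. A minimal sketch of the
wiring done above (argument parsing and stage setup elided):

    from cephlib.storage import make_storage
    from wally.config import Config
    from wally.result_storage import ResultStorage
    from wally.test_run_class import TestRun

    storage = make_storage(opts.data_dir, existing=True)
    config = storage.load(Config, 'config')
    ctx = TestRun(config, storage, ResultStorage(storage))
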
diff --git a/wally/node.py b/wally/node.py
index 32ec58a..a662f76 100644
--- a/wally/node.py
+++ b/wally/node.py
@@ -8,11 +8,9 @@
import subprocess
from typing import Union, cast, Optional, Tuple, Dict
-
from agent import agent
import paramiko
-
from .node_interfaces import IRPCNode, NodeInfo, ISSHHost
from .ssh import connect as ssh_connect
diff --git a/wally/node_interfaces.py b/wally/node_interfaces.py
index b10f83d..703ef71 100644
--- a/wally/node_interfaces.py
+++ b/wally/node_interfaces.py
@@ -2,8 +2,10 @@
import logging
from typing import Any, Set, Dict, Optional, NamedTuple
+from cephlib.istorage import IStorable
+
from .ssh_utils import ConnCreds
-from .common_types import IPAddr, IStorable
+from .common_types import IPAddr
RPCCreds = NamedTuple("RPCCreds", [("addr", IPAddr), ("key_file", str), ("cert_file", str)])
diff --git a/wally/openstack.py b/wally/openstack.py
index 88189f5..4048207 100644
--- a/wally/openstack.py
+++ b/wally/openstack.py
@@ -1,7 +1,9 @@
import os.path
import socket
import logging
-from typing import Dict, Any, List, Tuple, cast, Optional
+from typing import Dict, Any, List, Tuple, cast
+
+from cephlib.common import to_ip
from .node_interfaces import NodeInfo
from .config import ConfigBlock, Config
@@ -10,7 +12,7 @@
OSCreds, get_openstack_credentials, prepare_os, launch_vms, clear_nodes)
from .test_run_class import TestRun
from .stage import Stage, StepOrder
-from .utils import LogError, StopTestError, get_creds_openrc, to_ip
+from .utils import LogError, StopTestError, get_creds_openrc
logger = logging.getLogger("wally")
@@ -136,7 +138,10 @@
logger.debug("Found %s openstack service nodes" % len(host_services_mapping))
for host, services in host_services_mapping.items():
- creds = ConnCreds(host=to_ip(host), user=user, passwd=password, key_file=key_file)
+ host_ip = to_ip(host)
+ if host != host_ip:
+ logger.info("Will use ip_addr %r instead of hostname %r", host_ip, host)
+ creds = ConnCreds(host=host_ip, user=user, passwd=password, key_file=key_file)
ctx.merge_node(creds, set(services))
# TODO: log OS nodes discovery results
else:
@@ -221,50 +226,3 @@
ctx.storage.rm('spawned_os_nodes')
logger.info("OS spawned nodes has been successfully removed")
-
-
-
-# @contextlib.contextmanager
-# def suspend_vm_nodes_ctx(ctx: TestRun, unused_nodes: List[IRPCNode]) -> Iterator[List[int]]:
-#
-# pausable_nodes_ids = [cast(int, node.info.os_vm_id)
-# for node in unused_nodes
-# if node.info.os_vm_id is not None]
-#
-# non_pausable = len(unused_nodes) - len(pausable_nodes_ids)
-#
-# if non_pausable:
-# logger.warning("Can't pause {} nodes".format(non_pausable))
-#
-# if pausable_nodes_ids:
-# logger.debug("Try to pause {} unused nodes".format(len(pausable_nodes_ids)))
-# with ctx.get_pool() as pool:
-# openstack_api.pause(ctx.os_connection, pausable_nodes_ids, pool)
-#
-# try:
-# yield pausable_nodes_ids
-# finally:
-# if pausable_nodes_ids:
-# logger.debug("Unpausing {} nodes".format(len(pausable_nodes_ids)))
-# with ctx.get_pool() as pool:
-# openstack_api.unpause(ctx.os_connection, pausable_nodes_ids, pool)
-# def clouds_connect_stage(ctx: TestRun) -> None:
- # TODO(koder): need to use this to connect to openstack in upper code
- # conn = ctx.config['clouds/openstack']
- # user, passwd, tenant = parse_creds(conn['creds'])
- # auth_data = dict(auth_url=conn['auth_url'],
- # username=user,
- # api_key=passwd,
- # project_id=tenant) # type: Dict[str, str]
- # logger.debug("Discovering openstack nodes with connection details: %r", conn)
- # connect to openstack, fuel
-
- # # parse FUEL REST credentials
- # username, tenant_name, password = parse_creds(fuel_data['creds'])
- # creds = {"username": username,
- # "tenant_name": tenant_name,
- # "password": password}
- #
- # # connect to FUEL
- # conn = fuel_rest_api.KeystoneAuth(fuel_data['url'], creds, headers=None)
- # pass
\ No newline at end of file
diff --git a/wally/openstack_api.py b/wally/openstack_api.py
index 0f6c6fc..302d898 100644
--- a/wally/openstack_api.py
+++ b/wally/openstack_api.py
@@ -7,7 +7,7 @@
import tempfile
import subprocess
import urllib.request
-from typing import Dict, Any, Iterable, Iterator, NamedTuple, Optional, List, Tuple, Set
+from typing import Dict, Any, Iterable, Iterator, NamedTuple, Optional, List, Tuple
from concurrent.futures import ThreadPoolExecutor
from keystoneauth1 import loading, session
@@ -16,7 +16,8 @@
from cinderclient.client import Client as CinderClient
from glanceclient import Client as GlanceClient
-from .utils import Timeout, to_ip
+from cephlib.common import Timeout, to_ip
+
from .node_interfaces import NodeInfo
from .ssh_utils import ConnCreds
@@ -453,7 +454,10 @@
user = params['image']['user']
for ip, os_node in create_vms_mt(conn, count, executor, **vm_params):
- info = NodeInfo(ConnCreds(to_ip(ip), user, key_file=private_key_path), set())
+ node_ip = to_ip(ip)
+ if ip != node_ip:
+ logger.info("Will use ip_addr %r instead of hostname %r", node_ip, ip)
+ info = NodeInfo(ConnCreds(node_ip, user, key_file=private_key_path), set())
info.os_vm_id = os_node.id
yield info
diff --git a/wally/plot.py b/wally/plot.py
index 6729584..857a594 100644
--- a/wally/plot.py
+++ b/wally/plot.py
@@ -1,15 +1,7 @@
import logging
-from io import BytesIO
-from functools import wraps
-from typing import Tuple, cast, List, Callable, Optional, Any
+from typing import List
import numpy
-import scipy.stats
-import matplotlib.axis
-import matplotlib.style
-from matplotlib.ticker import FuncFormatter
-from matplotlib.figure import Figure
-import matplotlib.pyplot as plt
# to make seaborn styles available
import warnings
@@ -17,387 +9,16 @@
warnings.simplefilter("ignore")
import seaborn
-from cephlib.plot import process_heatmap_data, hmap_from_2d, do_plot_hmap_with_histo
+from cephlib.units import unit_conversion_coef_f
+from cephlib.plot import PlotParams, provide_plot
-from .hlstorage import ResultStorage
-from .utils import unit_conversion_coef
-from .statistic import moving_average, moving_dev, hist_outliers_perc, find_ouliers_ts, approximate_curve
-from .result_classes import StatProps, DataSource, TimeSeries, NormStatProps
-from .report_profiles import StyleProfile, ColorProfile
+from .report_profiles import StyleProfile
from .resources import IOSummary
logger = logging.getLogger("wally")
-# -------------- PLOT HELPERS FUNCTIONS ------------------------------------------------------------------------------
-
-def get_emb_image(fig: Figure, file_format: str, **opts) -> bytes:
- bio = BytesIO()
- if file_format == 'svg':
- fig.savefig(bio, format='svg', **opts)
- img_start = "<!-- Created with matplotlib (http://matplotlib.org/) -->"
- return bio.getvalue().decode("utf8").split(img_start, 1)[1].encode("utf8")
- else:
- fig.savefig(bio, format=file_format, **opts)
- return bio.getvalue()
-
-
-class PlotParams:
- def __init__(self, fig: Figure, ax: Any, title: str,
- style: StyleProfile, colors: ColorProfile) -> None:
- self.fig = fig
- self.ax = ax
- self.style = style
- self.colors = colors
- self.title = title
-
-
-def provide_plot(noaxis: bool = False,
- eng: bool = False,
- no_legend: bool = False,
- long_plot: bool = True,
- grid: Any = None,
- style_name: str = 'default',
- noadjust: bool = False) -> Callable[..., Callable[..., str]]:
- def closure1(func: Callable[..., None]) -> Callable[..., str]:
- @wraps(func)
- def closure2(storage: ResultStorage,
- style: StyleProfile,
- colors: ColorProfile,
- path: DataSource,
- title: Optional[str],
- *args, **kwargs) -> str:
- fpath = storage.check_plot_file(path)
- if not fpath:
-
- assert style_name in ('default', 'ioqd')
- mlstyle = style.default_style if style_name == 'default' else style.io_chart_style
- with matplotlib.style.context(mlstyle):
- file_format = path.tag.split(".")[-1]
- fig = plt.figure(figsize=style.figsize_long if long_plot else style.figsize)
-
- if not noaxis:
- xlabel = kwargs.pop('xlabel', None)
- ylabel = kwargs.pop('ylabel', None)
- ax = fig.add_subplot(111)
-
- if xlabel is not None:
- ax.set_xlabel(xlabel)
-
- if ylabel is not None:
- ax.set_ylabel(ylabel)
-
- if grid:
- ax.grid(axis=grid)
- else:
- ax = None
-
- if title:
- fig.suptitle(title, fontsize=style.title_font_size)
-
- pp = PlotParams(fig, ax, title, style, colors)
- func(pp, *args, **kwargs)
- apply_style(pp, eng=eng, no_legend=no_legend, noadjust=noadjust)
-
- fpath = storage.put_plot_file(get_emb_image(fig, file_format=file_format, dpi=style.dpi), path)
- logger.debug("Plot %s saved to %r", path, fpath)
- plt.close(fig)
- return fpath
- return closure2
- return closure1
-
-
-def apply_style(pp: PlotParams, eng: bool = True, no_legend: bool = False, noadjust: bool = False) -> None:
-
- if (pp.style.legend_for_eng or not eng) and not no_legend:
- if not noadjust:
- pp.fig.subplots_adjust(right=StyleProfile.subplot_adjust_r)
- legend_location = "center left"
- legend_bbox_to_anchor = (1.03, 0.81)
-
- for ax in pp.fig.axes:
- ax.legend(loc=legend_location, bbox_to_anchor=legend_bbox_to_anchor)
- elif not noadjust:
- pp.fig.subplots_adjust(right=StyleProfile.subplot_adjust_r_no_legend)
-
- if pp.style.tide_layout:
- pp.fig.set_tight_layout(True)
-
-
-# -------------- PLOT FUNCTIONS --------------------------------------------------------------------------------------
-
-
-@provide_plot(eng=True)
-def plot_hist(pp: PlotParams, units: str, prop: StatProps) -> None:
-
- normed_bins = prop.bins_populations / prop.bins_populations.sum()
- bar_width = prop.bins_edges[1] - prop.bins_edges[0]
- pp.ax.bar(prop.bins_edges, normed_bins, color=pp.colors.box_color, width=bar_width, label="Real data")
-
- pp.ax.set(xlabel=units, ylabel="Value probability")
-
- if isinstance(prop, NormStatProps):
- nprop = cast(NormStatProps, prop)
- stats = scipy.stats.norm(nprop.average, nprop.deviation)
-
- new_edges, step = numpy.linspace(prop.bins_edges[0], prop.bins_edges[-1],
- len(prop.bins_edges) * 10, retstep=True)
-
- ypoints = stats.cdf(new_edges) * 11
- ypoints = [nextpt - prevpt for (nextpt, prevpt) in zip(ypoints[1:], ypoints[:-1])]
- xpoints = (new_edges[1:] + new_edges[:-1]) / 2
-
- pp.ax.plot(xpoints, ypoints, color=pp.colors.primary_color, label="Expected from\nnormal\ndistribution")
-
- pp.ax.set_xlim(left=prop.bins_edges[0])
- if prop.log_bins:
- pp.ax.set_xscale('log')
-
-
-@provide_plot(grid='y')
-def plot_simple_over_time(pp: PlotParams, tss: List[Tuple[str, numpy.ndarray]], average: bool = False) -> None:
- max_len = 0
- for name, arr in tss:
- if average:
- avg_vals = moving_average(arr, pp.style.avg_range)
- if pp.style.approx_average_no_points:
- time_points = numpy.arange(len(avg_vals))
- avg_vals = approximate_curve(cast(List[int], time_points),
- avg_vals,
- cast(List[int], time_points),
- pp.style.curve_approx_level)
- arr = avg_vals
- pp.ax.plot(arr, label=name)
- max_len = max(max_len, len(arr))
- pp.ax.set_xlim(-5, max_len + 5)
-
-
-@provide_plot(no_legend=True, grid='x', noadjust=True)
-def plot_simple_bars(pp: PlotParams,
- names: List[str],
- values: List[float],
- errs: List[float] = None,
- x_formatter: Callable[[float, float], str] = None,
- one_point_zero_line: bool = True) -> None:
-
- ind = numpy.arange(len(names))
- width = 0.35
- pp.ax.barh(ind, values, width, xerr=errs)
-
- pp.ax.set_yticks(ind)
- pp.ax.set_yticklabels(names)
- pp.ax.set_xlim(0, max(val + err for val, err in zip(values, errs)) * 1.1)
-
- if one_point_zero_line:
- pp.ax.axvline(x=1.0, color='r', linestyle='--', linewidth=1, alpha=0.5)
-
- if x_formatter:
- pp.ax.xaxis.set_major_formatter(FuncFormatter(x_formatter))
-
- pp.fig.subplots_adjust(left=0.2)
-
-
-@provide_plot(no_legend=True, long_plot=True, noaxis=True)
-def plot_hmap_from_2d(pp: PlotParams, data2d: numpy.ndarray, xlabel: str, ylabel: str,
- bins: numpy.ndarray = None) -> None:
- ioq1d, ranges = hmap_from_2d(data2d)
- heatmap, bins = process_heatmap_data(ioq1d, bin_ranges=ranges, bins=bins)
- bins_populations, _ = numpy.histogram(ioq1d, bins)
-
- ax, _ = do_plot_hmap_with_histo(pp.fig,
- heatmap,
- bins_populations,
- bins,
- cmap=pp.colors.hmap_cmap,
- cbar=pp.style.heatmap_colorbar,
- histo_grid=pp.style.histo_grid)
- ax.set(ylabel=ylabel, xlabel=xlabel)
-
-
-@provide_plot(eng=True, grid='y')
-def plot_v_over_time(pp: PlotParams, units: str, ts: TimeSeries,
- plot_avg_dev: bool = True, plot_points: bool = True) -> None:
-
- min_time = min(ts.times)
-
- # convert time to ms
- coef = float(unit_conversion_coef(ts.time_units, 's'))
- time_points = numpy.array([(val_time - min_time) * coef for val_time in ts.times])
-
- outliers_idxs = find_ouliers_ts(ts.data, cut_range=pp.style.outliers_q_nd)
- outliers_4q_idxs = find_ouliers_ts(ts.data, cut_range=pp.style.outliers_hide_q_nd)
- normal_idxs = numpy.logical_not(outliers_idxs)
- outliers_idxs = outliers_idxs & numpy.logical_not(outliers_4q_idxs)
- # hidden_outliers_count = numpy.count_nonzero(outliers_4q_idxs)
-
- data = ts.data[normal_idxs]
- data_times = time_points[normal_idxs]
- outliers = ts.data[outliers_idxs]
- outliers_times = time_points[outliers_idxs]
-
- if plot_points:
- alpha = pp.colors.noise_alpha if plot_avg_dev else 1.0
- pp.ax.plot(data_times, data, pp.style.point_shape, color=pp.colors.primary_color, alpha=alpha, label="Data")
- pp.ax.plot(outliers_times, outliers, pp.style.err_point_shape, color=pp.colors.err_color, label="Outliers")
-
- has_negative_dev = False
- plus_minus = "\xb1"
-
- if plot_avg_dev and len(data) < pp.style.avg_range * 2:
- logger.warning("Array %r to small to plot average over %s points", pp.title, pp.style.avg_range)
- elif plot_avg_dev:
- avg_vals = moving_average(data, pp.style.avg_range)
- dev_vals = moving_dev(data, pp.style.avg_range)
- avg_times = moving_average(data_times, pp.style.avg_range)
-
- if (plot_points and pp.style.approx_average) or (not plot_points and pp.style.approx_average_no_points):
- avg_vals = approximate_curve(avg_times, avg_vals, avg_times, pp.style.curve_approx_level)
- dev_vals = approximate_curve(avg_times, dev_vals, avg_times, pp.style.curve_approx_level)
-
- pp.ax.plot(avg_times, avg_vals, c=pp.colors.suppl_color1, label="Average")
-
- low_vals_dev = avg_vals - dev_vals * pp.style.dev_range_x
- hight_vals_dev = avg_vals + dev_vals * pp.style.dev_range_x
- if (pp.style.dev_range_x - int(pp.style.dev_range_x)) < 0.01:
- pp.ax.plot(avg_times, low_vals_dev, c=pp.colors.suppl_color2,
- label="{}{}*stdev".format(plus_minus, int(pp.style.dev_range_x)))
- else:
- pp.ax.plot(avg_times, low_vals_dev, c=pp.colors.suppl_color2,
- label="{}{}*stdev".format(plus_minus, pp.style.dev_range_x))
- pp.ax.plot(avg_times, hight_vals_dev, c=pp.colors.suppl_color2)
- has_negative_dev = low_vals_dev.min() < 0
-
- pp.ax.set_xlim(-5, max(time_points) + 5)
- pp.ax.set_xlabel("Time, seconds from test begin")
-
- if plot_avg_dev:
- pp.ax.set_ylabel("{}. Average and {}stddev over {} points".format(units, plus_minus, pp.style.avg_range))
- else:
- pp.ax.set_ylabel(units)
-
- if has_negative_dev:
- pp.ax.set_ylim(bottom=0)
-
-
-@provide_plot(eng=True, no_legend=True, grid='y', noadjust=True)
-def plot_lat_over_time(pp: PlotParams, ts: TimeSeries) -> None:
- times = ts.times - min(ts.times)
- step = len(times) / pp.style.lat_samples
- points = [times[int(i * step + 0.5)] for i in range(pp.style.lat_samples)]
- points.append(times[-1])
- bounds = list(zip(points[:-1], points[1:]))
- agg_data = []
- positions = []
- labels = []
-
- for begin, end in bounds:
- agg_hist = ts.data[begin:end].sum(axis=0)
-
- if pp.style.violin_instead_of_box:
- # cut outliers
- idx1, idx2 = hist_outliers_perc(agg_hist, pp.style.outliers_lat)
- agg_hist = agg_hist[idx1:idx2]
- curr_bins_vals = ts.histo_bins[idx1:idx2]
-
- correct_coef = pp.style.violin_point_count / sum(agg_hist)
- if correct_coef > 1:
- correct_coef = 1
- else:
- curr_bins_vals = ts.histo_bins
- correct_coef = 1
-
- vals = numpy.empty(shape=[numpy.sum(agg_hist)], dtype='float32')
- cidx = 0
-
- non_zero, = agg_hist.nonzero()
- for pos in non_zero:
- count = int(agg_hist[pos] * correct_coef + 0.5)
-
- if count != 0:
- vals[cidx: cidx + count] = curr_bins_vals[pos]
- cidx += count
-
- agg_data.append(vals[:cidx])
- positions.append((end + begin) / 2)
- labels.append(str((end + begin) // 2))
-
- if pp.style.violin_instead_of_box:
- patches = pp.ax.violinplot(agg_data, positions=positions, showmeans=True, showmedians=True, widths=step / 2)
- patches['cmeans'].set_color("blue")
- patches['cmedians'].set_color("green")
- if pp.style.legend_for_eng:
- legend_location = "center left"
- legend_bbox_to_anchor = (1.03, 0.81)
- pp.ax.legend([patches['cmeans'], patches['cmedians']], ["mean", "median"],
- loc=legend_location, bbox_to_anchor=legend_bbox_to_anchor)
- else:
- pp.ax.boxplot(agg_data, 0, '', positions=positions, labels=labels, widths=step / 4)
-
- pp.ax.set_xlim(min(times), max(times))
- pp.ax.set_xlabel("Time, seconds from test begin, sampled for ~{} seconds".format(int(step)))
- pp.fig.subplots_adjust(right=pp.style.subplot_adjust_r)
-
-
-@provide_plot(eng=True, no_legend=True, noaxis=True, long_plot=True)
-def plot_histo_heatmap(pp: PlotParams, ts: TimeSeries, ylabel: str, xlabel: str = "time, s") -> None:
-
- # only histogram-based ts can be plotted
- assert len(ts.data.shape) == 2
-
- # Find global outliers. As load is expected to be stable during one job
- # outliers range can be detected globally
- total_hist = ts.data.sum(axis=0)
- idx1, idx2 = hist_outliers_perc(total_hist,
- bounds_perc=pp.style.outliers_lat,
- min_bins_left=pp.style.hm_hist_bins_count)
-
- # merge outliers with most close non-outliers cell
- orig_data = ts.data[:, idx1:idx2].copy()
- if idx1 > 0:
- orig_data[:, 0] += ts.data[:, :idx1].sum(axis=1)
-
- if idx2 < ts.data.shape[1]:
- orig_data[:, -1] += ts.data[:, idx2:].sum(axis=1)
-
- bins_vals = ts.histo_bins[idx1:idx2]
-
- # rebin over X axis
- # aggregate some lines in ts.data to plot ~style.hm_x_slots x bins
- agg_idx = float(len(orig_data)) / pp.style.hm_x_slots
- if agg_idx >= 2:
- idxs = list(map(int, numpy.round(numpy.arange(0, len(orig_data) + 1, agg_idx))))
- assert len(idxs) > 1
- data = numpy.empty([len(idxs) - 1, orig_data.shape[1]], dtype=numpy.float32) # type: List[numpy.ndarray]
- for idx, (sidx, eidx) in enumerate(zip(idxs[:-1], idxs[1:])):
- data[idx] = orig_data[sidx:eidx,:].sum(axis=0) / (eidx - sidx)
- else:
- data = orig_data
-
- # rebin over Y axis
- # =================
-
- # don't using rebin_histogram here, as we need apply same bins for many arrays
- step = (bins_vals[-1] - bins_vals[0]) / pp.style.hm_hist_bins_count
- new_bins_edges = numpy.arange(pp.style.hm_hist_bins_count) * step + bins_vals[0]
- bin_mapping = numpy.clip(numpy.searchsorted(new_bins_edges, bins_vals) - 1, 0, len(new_bins_edges) - 1)
-
- # map origin bins ranges to heatmap bins, iterate over rows
- cmap = []
- for line in data:
- curr_bins = [0] * pp.style.hm_hist_bins_count
- for idx, count in zip(bin_mapping, line):
- curr_bins[idx] += count
- cmap.append(curr_bins)
- ncmap = numpy.array(cmap)
-
- histo = ncmap.sum(axis=0).reshape((-1,))
- ax, _ = do_plot_hmap_with_histo(pp.fig, ncmap, histo, new_bins_edges,
- cmap=pp.colors.hmap_cmap,
- cbar=pp.style.heatmap_colorbar, avg_labels=True)
- ax.set(ylabel=ylabel, xlabel=xlabel)
-
-
@provide_plot(eng=False, no_legend=True, grid='y', style_name='ioqd', noadjust=True)
def io_chart(pp: PlotParams,
legend: str,
@@ -430,8 +51,8 @@
block_size = iosums[0].block_size
xpos = numpy.arange(1, len(iosums) + 1, dtype='uint')
- coef_mb = float(unit_conversion_coef(iosums[0].bw.units, "MiBps"))
- coef_iops = float(unit_conversion_coef(iosums[0].bw.units, "KiBps")) / block_size
+ coef_mb = unit_conversion_coef_f(iosums[0].bw.units, "MiBps")
+ coef_iops = unit_conversion_coef_f(iosums[0].bw.units, "KiBps") / block_size
iops_primary = block_size < pp.style.large_blocks
@@ -476,7 +97,7 @@
ax2 = pp.ax.twinx()
# plot median and 95 perc latency
- lat_coef_ms = float(unit_conversion_coef(iosums[0].lat.units, "ms"))
+ lat_coef_ms = unit_conversion_coef_f(iosums[0].lat.units, "ms")
ax2.plot(xpos, [iosum.lat.perc_50 * lat_coef_ms for iosum in iosums], label="lat med")
ax2.plot(xpos, [iosum.lat.perc_95 * lat_coef_ms for iosum in iosums], label="lat 95%")
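
The hunks above replace float(unit_conversion_coef(...)) with unit_conversion_coef_f(...). A minimal sketch of the assumed relationship between the two cephlib.units helpers, with a toy unit table; the *_sketch names below are illustrative and are not cephlib's real code:

    from fractions import Fraction

    # toy subset of a unit table: coefficient of each unit relative to a base unit
    _COEFS = {"Bps": 1, "KiBps": 1024, "MiBps": 1024 ** 2, "s": 1, "ms": Fraction(1, 1000)}

    def unit_conversion_coef_sketch(from_unit: str, to_unit: str) -> Fraction:
        """Exact coefficient such that value_in_from * coef == value_in_to."""
        return Fraction(_COEFS[from_unit]) / Fraction(_COEFS[to_unit])

    def unit_conversion_coef_f_sketch(from_unit: str, to_unit: str) -> float:
        """Float-returning wrapper, so callers can drop the explicit float(...) cast."""
        return float(unit_conversion_coef_sketch(from_unit, to_unit))

    # e.g. fio bandwidth reported in KiBps converted to MiBps:
    assert unit_conversion_coef_f_sketch("KiBps", "MiBps") == 1 / 1024
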
diff --git a/wally/process_results.py b/wally/process_results.py
deleted file mode 100644
index b2ed783..0000000
--- a/wally/process_results.py
+++ /dev/null
@@ -1,52 +0,0 @@
-# put all result preprocessing here
-# # selection, aggregation
-#
-# from io import BytesIO
-# import logging
-# from typing import Any
-#
-# from .stage import Stage, StepOrder
-# from .test_run_class import TestRun
-# from .statistic import calc_norm_stat_props, calc_histo_stat_props
-# from .result_classes import StatProps, DataSource, TimeSeries
-# from .hlstorage import ResultStorage
-# from .suits.io.fio_hist import get_lat_vals, expected_lat_bins
-# from .suits.io.fio import FioTest
-# from .utils import StopTestError
-#
-# import matplotlib
-# matplotlib.use('svg')
-# import matplotlib.pyplot as plt
-#
-#
-# logger = logging.getLogger("wally")
-#
-#
-# class CalcStatisticStage(Stage):
-# priority = StepOrder.TEST + 1
-#
-# def run(self, ctx: TestRun) -> None:
-# rstorage = ResultStorage(ctx.storage)
-#
-# for suite in rstorage.iter_suite(FioTest.name):
-# for job in rstorage.iter_job(suite):
-# for ts in rstorage.iter_ts(suite, job):
-# if ts.source.sensor == 'lat':
-# if ts.data.shape[1] != expected_lat_bins:
-# logger.error("Sensor %s.%s on node %s has" +
-# "shape=%s. Can only process sensors with shape=[X,%s].",
-# ts.source.dev, ts.source.sensor, ts.source.node_id,
-# ts.data.shape, expected_lat_bins)
-# continue
-#
-# ts.bins_edges = get_lat_vals(ts.data.shape[1])
-# stat_prop = calc_histo_stat_props(ts) # type: StatProps
-#
-# elif len(ts.data.shape) != 1:
-# logger.warning("Sensor %s.%s on node %s provide 2+D data. Can't process it.",
-# ts.source.dev, ts.source.sensor, ts.source.node_id)
-# continue
-# else:
-# stat_prop = calc_norm_stat_props(ts)
-#
-# raise StopTestError()
diff --git a/wally/report.py b/wally/report.py
index 3ac1292..d37ac60 100644
--- a/wally/report.py
+++ b/wally/report.py
@@ -11,17 +11,21 @@
import wally
-from . import html
+from cephlib import html
+from cephlib.units import b2ssize, b2ssize_10, unit_conversion_coef, unit_conversion_coef_f
+from cephlib.statistic import calc_norm_stat_props
+from cephlib.storage_selectors import summ_sensors, find_sensors_to_2d
+from cephlib.wally_storage import find_nodes_by_roles
+
+from .utils import STORAGE_ROLES
from .stage import Stage, StepOrder
from .test_run_class import TestRun
-from .hlstorage import ResultStorage
-from .utils import b2ssize, b2ssize_10, STORAGE_ROLES, unit_conversion_coef
-from .statistic import calc_norm_stat_props
+from .result_classes import IResultStorage
from .result_classes import DataSource, TimeSeries, SuiteConfig
from .suits.io.fio import FioTest, FioJobConfig
from .suits.io.fio_job import FioJobParams
from .suits.job import JobConfig
-from .data_selectors import get_aggregated, AGG_TAG, summ_sensors, find_sensors_to_2d, find_nodes_by_roles
+from .data_selectors import get_aggregated, AGG_TAG
from .report_profiles import (DefStyleProfile, DefColorProfile, StyleProfile, ColorProfile,
default_format, io_chart_format)
from .plot import (io_chart, plot_simple_bars, plot_hmap_from_2d, plot_lat_over_time, plot_simple_over_time,
@@ -139,7 +143,7 @@
# -------------------- REPORTS --------------------------------------------------------------------------------------
class ReporterBase:
- def __init__(self, rstorage: ResultStorage, style: StyleProfile, colors: ColorProfile) -> None:
+ def __init__(self, rstorage: IResultStorage, style: StyleProfile, colors: ColorProfile) -> None:
self.style = style
self.colors = colors
self.rstorage = rstorage
@@ -223,7 +227,7 @@
def get_divs(self, suite: SuiteConfig) -> Iterator[Tuple[str, str, HTMLBlock]]:
qd_grouped_jobs = {} # type: Dict[FioJobParams, List[FioJobConfig]]
- test_nc = len(list(find_nodes_by_roles(self.rstorage, ['testnode'])))
+ test_nc = len(list(find_nodes_by_roles(self.rstorage.storage, ['testnode'])))
for job in self.rstorage.iter_job(suite):
fjob = cast(FioJobConfig, job)
if fjob.bsize != 4:
@@ -260,7 +264,7 @@
tag=io_chart_format)
fpath = self.plt(plot_simple_bars, ds, jc_no_qd.long_summary, labels, vals, errs,
- xlabel="CPU time per IOP", ylabel="QD * Test nodes" if test_nc != 1 else "QD",
+ xlabel="CPU core time per IOP", ylabel="QD * Test nodes" if test_nc != 1 else "QD",
x_formatter=(lambda x, pos: b2ssize_10(x) + 's'),
one_point_zero_line=False)
@@ -277,7 +281,7 @@
io_sum = make_iosum(self.rstorage, suite, fjob, self.style.hist_boxes)
caption = "Test summary - " + job.params.long_summary
- test_nc = len(list(find_nodes_by_roles(self.rstorage, ['testnode'])))
+ test_nc = len(list(find_nodes_by_roles(self.rstorage.storage, ['testnode'])))
if test_nc > 1:
caption += " * {} nodes".format(test_nc)
@@ -297,7 +301,7 @@
bw_units = "B"
bw_target_units = bw_units + 'ps'
- bw_coef = float(unit_conversion_coef(io_sum.bw.units, bw_target_units))
+ bw_coef = unit_conversion_coef_f(io_sum.bw.units, bw_target_units)
adf_v, *_1, stats, _2 = adfuller(io_sum.bw.data)
@@ -323,7 +327,7 @@
stat_data = [bw_data]
if fjob.bsize < StyleProfile.large_blocks:
- iops_coef = float(unit_conversion_coef(io_sum.bw.units, 'KiBps')) / fjob.bsize
+ iops_coef = unit_conversion_coef_f(io_sum.bw.units, 'KiBps') / fjob.bsize
iops_data = ["IOPS",
b2ssize_10(io_sum.bw.data.sum() * iops_coef),
"{}IOPS ~ {}IOPS".format(b2ssize_10(io_sum.bw.average * iops_coef),
@@ -337,7 +341,7 @@
ad_test]
lat_target_unit = 's'
- lat_coef = unit_conversion_coef(io_sum.lat.units, lat_target_unit)
+ lat_coef = unit_conversion_coef_f(io_sum.lat.units, lat_target_unit)
# latency
lat_data = ["Latency",
"-",
@@ -416,8 +420,8 @@
for name in records.keys()
}
- short_name[ResourceNames.storage_cpu_s] = "CPU (s/IOP)"
- short_name[ResourceNames.storage_cpu_s_b] = "CPU (s/B)"
+ short_name[ResourceNames.storage_cpu_s] = "CPU core (s/IOP)"
+ short_name[ResourceNames.storage_cpu_s_b] = "CPU core (s/B)"
with doc.tbody:
with doc.tr:
@@ -531,24 +535,21 @@
def get_divs(self, suite: SuiteConfig, job: JobConfig) -> Iterator[Tuple[str, str, HTMLBlock]]:
- nodes = list(find_nodes_by_roles(self.rstorage, STORAGE_ROLES))
+ nodes = list(find_nodes_by_roles(self.rstorage.storage, STORAGE_ROLES))
sensor = 'block-io'
metric = 'io_queue'
bn_val = 16
- for node in nodes:
+ for node_id in nodes:
bn = 0
tot = 0
- for _, ds in self.rstorage.iter_sensors(node_id=node.node_id, sensor=sensor, metric=metric):
+ for _, ds in self.rstorage.iter_sensors(node_id=node_id, sensor=sensor, metric=metric):
if ds.dev in ('sdb', 'sdc', 'sdd', 'sde'):
- data = self.rstorage.load_sensor(ds)
- p1 = job.reliable_info_range_s[0] * unit_conversion_coef('s', data.time_units)
- p2 = job.reliable_info_range_s[1] * unit_conversion_coef('s', data.time_units)
- idx1, idx2 = numpy.searchsorted(data.times, (p1, p2))
- bn += (data.data[idx1: idx2] > bn_val).sum()
- tot += idx2 - idx1
- print(node, bn, tot)
+ ts = self.rstorage.get_sensor(ds, job.reliable_info_range_s)
+ bn += (ts.data > bn_val).sum()
+ tot += len(ts.data)
+ print(node_id, bn, tot)
yield Menu1st.per_job, job.summary, HTMLBlock("")
@@ -586,7 +587,7 @@
storage_devs = None
test_nodes_devs = ['rbd0']
- for node in find_nodes_by_roles(self.rstorage, STORAGE_ROLES):
+ for node in self.rstorage.find_nodes(STORAGE_ROLES):
cjd = set(node.params['ceph_journal_devs'])
if journal_devs is None:
journal_devs = cjd
@@ -609,8 +610,9 @@
HTMLBlock(html.H2(html.center("{} IO heatmaps".format(name.capitalize()))))
# QD heatmap
- ioq2d = find_sensors_to_2d(self.rstorage, roles, sensor='block-io', devs=devs,
- metric='io_queue', time_range=trange)
+ nodes = find_nodes_by_roles(self.rstorage.storage, roles)
+ ioq2d = find_sensors_to_2d(self.rstorage, trange, sensor='block-io', devs=devs,
+ node_id=nodes, metric='io_queue')
ds = DataSource(suite.storage_id, job.storage_id, AGG_TAG, 'block-io', name, tag="hmap." + default_format)
@@ -619,11 +621,11 @@
yield Menu1st.per_job, job.summary, HTMLBlock(html.img(fname))
# Block size heatmap
- wc2d = find_sensors_to_2d(self.rstorage, roles, sensor='block-io', devs=devs,
- metric='writes_completed', time_range=trange)
+ wc2d = find_sensors_to_2d(self.rstorage, trange, node_id=nodes, sensor='block-io', devs=devs,
+ metric='writes_completed')
wc2d[wc2d < 1E-3] = 1
- sw2d = find_sensors_to_2d(self.rstorage, roles, sensor='block-io', devs=devs,
- metric='sectors_written', time_range=trange)
+ sw2d = find_sensors_to_2d(self.rstorage, trange, node_id=nodes, sensor='block-io', devs=devs,
+ metric='sectors_written')
data2d = sw2d / wc2d / 1024
fname = self.plt(plot_hmap_from_2d, ds(metric='wr_block_size'),
data2d=data2d, title=name.capitalize() + " write block size",
@@ -631,8 +633,8 @@
yield Menu1st.per_job, job.summary, HTMLBlock(html.img(fname))
# iotime heatmap
- wtime2d = find_sensors_to_2d(self.rstorage, roles, sensor='block-io', devs=devs,
- metric='io_time', time_range=trange)
+ wtime2d = find_sensors_to_2d(self.rstorage, trange, node_id=nodes, sensor='block-io', devs=devs,
+ metric='io_time')
fname = self.plt(plot_hmap_from_2d, ds(metric='io_time'), data2d=wtime2d,
xlabel='Time', ylabel="IO time (ms) per second",
title=name.capitalize() + " iotime", bins=StyleProfile.iotime_bins)
@@ -650,24 +652,25 @@
yield Menu1st.per_job, job.summary, HTMLBlock(html.H2(html.center("Load tool results")))
- agg_io = get_aggregated(self.rstorage, suite, fjob, "bw")
+ agg_io = get_aggregated(self.rstorage, suite.storage_id, fjob.storage_id, "bw", job.reliable_info_range_s)
if fjob.bsize >= DefStyleProfile.large_blocks:
title = "Fio measured Bandwidth over time"
units = "MiBps"
- agg_io.data //= int(unit_conversion_coef(units, agg_io.units))
+ agg_io.data //= int(unit_conversion_coef_f(units, agg_io.units))
else:
title = "Fio measured IOPS over time"
- agg_io.data //= (int(unit_conversion_coef("KiBps", agg_io.units)) * fjob.bsize)
+ agg_io.data //= (int(unit_conversion_coef_f("KiBps", agg_io.units)) * fjob.bsize)
units = "IOPS"
fpath = self.plt(plot_v_over_time, agg_io.source(tag='ts.' + default_format), title, units, agg_io)
yield Menu1st.per_job, fjob.summary, HTMLBlock(html.img(fpath))
if fjob.bsize < DefStyleProfile.large_blocks:
- agg_lat = get_aggregated(self.rstorage, suite, fjob, "lat").copy()
+ agg_lat = get_aggregated(self.rstorage, suite.storage_id, fjob.storage_id, "lat",
+ job.reliable_info_range_s)
TARGET_UNITS = 'ms'
- coef = unit_conversion_coef(agg_lat.units, TARGET_UNITS)
- agg_lat.histo_bins = agg_lat.histo_bins.copy() * float(coef)
+ coef = unit_conversion_coef_f(agg_lat.units, TARGET_UNITS)
+ agg_lat.histo_bins = agg_lat.histo_bins.copy() * coef
agg_lat.units = TARGET_UNITS
fpath = self.plt(plot_lat_over_time, agg_lat.source(tag='ts.' + default_format), "Latency", agg_lat,
@@ -681,15 +684,15 @@
fjob = cast(FioJobConfig, job)
- agg_io = get_aggregated(self.rstorage, suite, fjob, "bw")
+ agg_io = get_aggregated(self.rstorage, suite.storage_id, fjob.storage_id, "bw", job.reliable_info_range_s)
if fjob.bsize >= DefStyleProfile.large_blocks:
title = "BW distribution"
units = "MiBps"
- agg_io.data //= int(unit_conversion_coef(units, agg_io.units))
+ agg_io.data //= int(unit_conversion_coef_f(units, agg_io.units))
else:
title = "IOPS distribution"
- agg_io.data //= (int(unit_conversion_coef("KiBps", agg_io.units)) * fjob.bsize)
+ agg_io.data //= (int(unit_conversion_coef_f("KiBps", agg_io.units)) * fjob.bsize)
units = "IOPS"
io_stat_prop = calc_norm_stat_props(agg_io, bins_count=StyleProfile.hist_boxes)
@@ -716,34 +719,34 @@
sensors = []
max_iop = 0
max_bytes = 0
-
+ stor_nodes = find_nodes_by_roles(self.rstorage.storage, STORAGE_ROLES)
for sensor, metric, op, units in self.storage_sensors:
- ts = summ_sensors(self.rstorage, STORAGE_ROLES, sensor, metric, job.reliable_info_range_s)
- ds = DataSource(suite_id=suite.storage_id,
- job_id=job.storage_id,
- node_id="storage",
- sensor=sensor,
- dev=AGG_TAG,
- metric=metric,
- tag="ts." + default_format)
+ ts = summ_sensors(self.rstorage, job.reliable_info_range_s, node_id=stor_nodes, sensor=sensor,
+ metric=metric)
+ if ts is not None:
+ ds = DataSource(suite_id=suite.storage_id,
+ job_id=job.storage_id,
+ node_id="storage",
+ sensor=sensor,
+ dev=AGG_TAG,
+ metric=metric,
+ tag="ts." + default_format)
- data = ts.data if units != 'MiB' else ts.data * float(unit_conversion_coef(ts.units, 'MiB'))
- ts = TimeSeries(name="",
- times=numpy.arange(*job.reliable_info_range_s),
- data=data,
- raw=None,
- units=units if ts.units is None else ts.units,
- time_units=ts.time_units,
- source=ds,
- histo_bins=ts.histo_bins)
+ data = ts.data if units != 'MiB' else ts.data * unit_conversion_coef_f(ts.units, 'MiB')
+ ts = TimeSeries(times=numpy.arange(*job.reliable_info_range_s),
+ data=data,
+ units=units if ts.units is None else ts.units,
+ time_units=ts.time_units,
+ source=ds,
+ histo_bins=ts.histo_bins)
- sensors.append(("{} {}".format(op, units), ds, ts, units))
+ sensors.append(("{} {}".format(op, units), ds, ts, units))
- if units == 'iop':
- max_iop = max(max_iop, data.sum())
- else:
- assert units == 'MiB'
- max_bytes = max(max_bytes, data.sum())
+ if units == 'iop':
+ max_iop = max(max_iop, data.sum())
+ else:
+ assert units == 'MiB'
+ max_bytes = max(max_bytes, data.sum())
for title, ds, ts, units in sensors:
if ts.data.sum() >= (max_iop if units == 'iop' else max_bytes) * DefStyleProfile.min_load_diff:
@@ -761,13 +764,11 @@
priority = StepOrder.REPORT
def run(self, ctx: TestRun) -> None:
- rstorage = ResultStorage(ctx.storage)
-
job_reporters_cls = [StatInfo, Resources, LoadToolResults, ClusterLoad, CPULoadPlot, QDIOTimeHeatmap]
- job_reporters = [rcls(rstorage, DefStyleProfile, DefColorProfile) for rcls in job_reporters_cls]
+ job_reporters = [rcls(ctx.rstorage, DefStyleProfile, DefColorProfile) for rcls in job_reporters_cls]
suite_reporters_cls = [IOQD, ResourceQD]
- suite_reporters = [rcls(rstorage, DefStyleProfile, DefColorProfile) for rcls in suite_reporters_cls]
+ suite_reporters = [rcls(ctx.rstorage, DefStyleProfile, DefColorProfile) for rcls in suite_reporters_cls]
root_dir = os.path.dirname(os.path.dirname(wally.__file__))
doc_templ_path = os.path.join(root_dir, "report_templates/index.html")
@@ -788,8 +789,8 @@
job_summ_sort_order = []
# TODO: filter reporters
- for suite in rstorage.iter_suite(FioTest.name):
- all_jobs = list(rstorage.iter_job(suite))
+ for suite in ctx.rstorage.iter_suite(FioTest.name):
+ all_jobs = list(ctx.rstorage.iter_job(suite))
all_jobs.sort(key=lambda job: job.params)
new_jobs_in_order = [job.summary for job in all_jobs]
@@ -797,7 +798,7 @@
assert not same, "Job with same names in different suits found: " + ",".join(same)
job_summ_sort_order.extend(new_jobs_in_order)
- for job in all_jobs[:1]:
+ for job in all_jobs:
try:
for reporter in job_reporters:
logger.debug("Start reporter %s on job %s suite %s",
@@ -845,6 +846,6 @@
report = report_template.replace("{{{menu}}}", ("\n" + " " * 16).join(menu_block))
report = report.replace("{{{content}}}", ("\n" + " " * 16).join(content_block))
- report_path = rstorage.put_report(report, "index.html")
- rstorage.put_report(css_file, "main.css")
+ report_path = ctx.rstorage.put_report(report, "index.html")
+ ctx.rstorage.put_report(css_file, "main.css")
logger.info("Report is stored into %r", report_path)
diff --git a/wally/resources.py b/wally/resources.py
index f063eb5..d867a1a 100644
--- a/wally/resources.py
+++ b/wally/resources.py
@@ -1,15 +1,24 @@
+import logging
from typing import Tuple, Dict, cast, List
import numpy
-from .hlstorage import ResultStorage
-from .utils import b2ssize_10, b2ssize, unit_conversion_coef, STORAGE_ROLES
-from .result_classes import SuiteConfig
+
+from cephlib.units import b2ssize_10, b2ssize, unit_conversion_coef_f
+from cephlib.statistic import NormStatProps, HistoStatProps, calc_norm_stat_props, calc_histo_stat_props
+from cephlib.numeric_types import TimeSeries
+from cephlib.wally_storage import find_nodes_by_roles
+from cephlib.storage_selectors import summ_sensors
+
+from .result_classes import IResultStorage, SuiteConfig
+from .utils import STORAGE_ROLES
from .suits.io.fio import FioJobConfig
from .suits.job import JobConfig
-from .result_classes import NormStatProps, HistoStatProps, TimeSeries
-from .statistic import calc_norm_stat_props, calc_histo_stat_props
-from .data_selectors import get_aggregated, AGG_TAG, summ_sensors, find_sensors_to_2d, find_nodes_by_roles
+from .data_selectors import get_aggregated
+from .hw_info import HWInfo
+
+
+logger = logging.getLogger('wally')
class IOSummary:
@@ -76,15 +85,15 @@
iosum_cache = {} # type: Dict[Tuple[str, str]]
-def make_iosum(rstorage: ResultStorage, suite: SuiteConfig, job: FioJobConfig, hist_boxes: int,
+def make_iosum(rstorage: IResultStorage, suite: SuiteConfig, job: FioJobConfig, hist_boxes: int,
nc: bool = False) -> IOSummary:
key = (suite.storage_id, job.storage_id)
if not nc and key in iosum_cache:
return iosum_cache[key]
- lat = get_aggregated(rstorage, suite, job, "lat")
- io = get_aggregated(rstorage, suite, job, "bw")
+ lat = get_aggregated(rstorage, suite.storage_id, job.storage_id, "lat", job.reliable_info_range_s)
+ io = get_aggregated(rstorage, suite.storage_id, job.storage_id, "bw", job.reliable_info_range_s)
res = IOSummary(job.qd,
nodes_count=len(suite.nodes_ids),
@@ -101,7 +110,7 @@
cpu_load_cache = {} # type: Dict[Tuple[int, Tuple[str, ...], Tuple[int, int]], Dict[str, TimeSeries]]
-def get_cluster_cpu_load(rstorage: ResultStorage, roles: List[str],
+def get_cluster_cpu_load(rstorage: IResultStorage, roles: List[str],
time_range: Tuple[int, int], nc: bool = False) -> Dict[str, TimeSeries]:
key = (id(rstorage), tuple(roles), time_range)
@@ -110,8 +119,9 @@
cpu_ts = {}
cpu_metrics = "idle guest iowait sirq nice irq steal sys user".split()
+ nodes = find_nodes_by_roles(rstorage.storage, roles)
for name in cpu_metrics:
- cpu_ts[name] = summ_sensors(rstorage, roles, sensor='system-cpu', metric=name, time_range=time_range)
+ cpu_ts[name] = summ_sensors(rstorage, time_range, nodes=nodes, sensor='system-cpu', metric=name)
it = iter(cpu_ts.values())
total_over_time = next(it).data.copy() # type: numpy.ndarray
@@ -131,7 +141,7 @@
def get_resources_usage(suite: SuiteConfig,
job: JobConfig,
- rstorage: ResultStorage,
+ rstorage: IResultStorage,
large_block: int = 256,
hist_boxes: int = 10,
nc: bool = False) -> Tuple[Dict[str, Tuple[str, float, float]], bool]:
@@ -148,7 +158,7 @@
io_sum = make_iosum(rstorage, suite, fjob, hist_boxes)
- tot_io_coef = float(unit_conversion_coef(io_sum.bw.units, "Bps"))
+ tot_io_coef = unit_conversion_coef_f(io_sum.bw.units, "Bps")
io_transfered = io_sum.bw.data * tot_io_coef
records = {
@@ -156,7 +166,7 @@
} # type: Dict[str, Tuple[str, float, float]]
if iops_ok:
- ops_done = io_transfered / (fjob.bsize * float(unit_conversion_coef("KiBps", "Bps")))
+ ops_done = io_transfered / (fjob.bsize * unit_conversion_coef_f("KiBps", "Bps"))
records[ResourceNames.io_made] = (b2ssize_10(ops_done.sum()) + "OP", None, None)
else:
ops_done = None
@@ -189,13 +199,14 @@
if service_provided_count is None:
continue
- res_ts = summ_sensors(rstorage, roles, sensor=sensor, metric=metric, time_range=job.reliable_info_range_s)
+ nodes = find_nodes_by_roles(rstorage.storage, roles)
+ res_ts = summ_sensors(rstorage, job.reliable_info_range_s, nodes=nodes, sensor=sensor, metric=metric)
if res_ts is None:
continue
data = res_ts.data
if units == "B":
- data = data * float(unit_conversion_coef(res_ts.units, "B"))
+ data = data * unit_conversion_coef_f(res_ts.units, "B")
avg, dev = avg_dev_div(data, service_provided_count)
if avg < 0.1:
@@ -204,10 +215,20 @@
all_agg[vname] = data
# cpu usage
- nodes_count = len(list(find_nodes_by_roles(rstorage, STORAGE_ROLES)))
- cpu_ts = get_cluster_cpu_load(rstorage, STORAGE_ROLES, job.reliable_info_range_s)
+ stor_cores_count = 0
+ all_stor_nodes = list(find_nodes_by_roles(rstorage.storage, STORAGE_ROLES))
+ for node in all_stor_nodes:
+ try:
+ node_hw_info = rstorage.storage.load(HWInfo, 'hw_info', node.node_id)
+ except KeyError:
+ logger.warning("No hw_info available for node %s. Using 'NODE time' instead of " +
+ "CPU core time for CPU consumption metrics")
+ stor_cores_count = len(all_stor_nodes)
+ break
+ stor_cores_count += sum(cores for _, cores in node_hw_info.cores)
- cpus_used_sec = (1.0 - cpu_ts['idle'].data / cpu_ts['total'].data) * nodes_count
+ cpu_ts = get_cluster_cpu_load(rstorage, STORAGE_ROLES, job.reliable_info_range_s)
+ cpus_used_sec = (1.0 - (cpu_ts['idle'].data + cpu_ts['iowait'].data) / cpu_ts['total'].data) * stor_cores_count
used_s = b2ssize_10(cpus_used_sec.sum()) + 's'
all_agg[ResourceNames.storage_cpu] = cpus_used_sec
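
get_resources_usage now scales CPU usage by the summed core count of all storage nodes (read from the stored hw_info) instead of the bare node count, and subtracts iowait as well as idle time. A toy-number sketch of that arithmetic; the values are made up, the real cpu_ts arrays come from get_cluster_cpu_load:

    import numpy

    stor_cores_count = 2 * 16                       # e.g. 2 storage nodes * 16 cores from hw_info
    idle   = numpy.array([60.0, 55.0, 50.0])        # per-interval jiffy sums over all storage nodes
    iowait = numpy.array([ 5.0,  5.0, 10.0])
    total  = numpy.array([100.0, 100.0, 100.0])

    # fraction of time the cores did real work (idle and iowait excluded),
    # scaled to core-seconds per measurement interval
    cpus_used_sec = (1.0 - (idle + iowait) / total) * stor_cores_count
    assert numpy.allclose(cpus_used_sec, [11.2, 12.8, 12.8])
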
diff --git a/wally/result_classes.py b/wally/result_classes.py
index 60a0a55..207ba71 100644
--- a/wally/result_classes.py
+++ b/wally/result_classes.py
@@ -1,15 +1,12 @@
import abc
-import copy
-from typing import Dict, List, Any, Optional, Tuple, cast, Type, Iterator, NamedTuple
+from typing import Dict, List, Any, Tuple, cast, Type, Iterator, Union
-
-import numpy
-from scipy.stats.mstats_basic import NormaltestResult
+from cephlib.numeric_types import TimeSeries, DataSource
+from cephlib.statistic import StatProps
+from cephlib.istorage import IImagesStorage, Storable, ISensorStorage
from .suits.job import JobConfig
-from .node_interfaces import IRPCNode
-from .common_types import Storable
-from .utils import round_digits, Number
+from .node_interfaces import IRPCNode, NodeInfo
class SuiteConfig(Storable):
@@ -56,206 +53,8 @@
set(self.nodes_ids) == set(other.nodes_ids))
-class DataSource:
- def __init__(self,
- suite_id: str = None,
- job_id: str = None,
- node_id: str = None,
- sensor: str = None,
- dev: str = None,
- metric: str = None,
- tag: str = None) -> None:
- self.suite_id = suite_id
- self.job_id = job_id
- self.node_id = node_id
- self.sensor = sensor
- self.dev = dev
- self.metric = metric
- self.tag = tag
-
- @property
- def metric_fqdn(self) -> str:
- return "{0.sensor}.{0.dev}.{0.metric}".format(self)
-
- def __call__(self, **kwargs) -> 'DataSource':
- dct = self.__dict__.copy()
- dct.update(kwargs)
- return self.__class__(**dct)
-
- def __str__(self) -> str:
- return ("suite={0.suite_id},job={0.job_id},node={0.node_id}," +
- "path={0.sensor}.{0.dev}.{0.metric},tag={0.tag}").format(self)
-
- def __repr__(self) -> str:
- return str(self)
-
- @property
- def tpl(self) -> Tuple[Optional[str], Optional[str], Optional[str], Optional[str],
- Optional[str], Optional[str], Optional[str]]:
- return self.suite_id, self.job_id, self.node_id, self.sensor, self.dev, self.metric, self.tag
-
- def __eq__(self, o: object) -> bool:
- return self.tpl == cast(DataSource, o).tpl
-
- def __hash__(self) -> int:
- return hash(self.tpl)
-
-
-class TimeSeries:
- """Data series from sensor - either system sensor or from load generator tool (e.g. fio)"""
-
- def __init__(self,
- name: str,
- raw: Optional[bytes],
- data: numpy.ndarray,
- times: numpy.ndarray,
- units: str,
- source: DataSource,
- time_units: str = 'us',
- raw_tag: str = 'txt',
- histo_bins: numpy.ndarray = None) -> None:
-
- # Sensor name. Typically DEV_NAME.METRIC
- self.name = name
-
- # units for data
- self.units = units
-
- # units for time
- self.time_units = time_units
-
- # Time series times and values. Time in ms from Unix epoch.
- self.times = times
- self.data = data
-
- # Raw sensor data (is provided). Like log file for fio iops/bw/lat.
- self.raw = raw
- self.raw_tag = raw_tag
- self.source = source
- self.histo_bins = histo_bins
-
- def __str__(self) -> str:
- res = "TS({}):\n".format(self.name)
- res += " source={}\n".format(self.source)
- res += " times_size={}\n".format(len(self.times))
- res += " data_shape={}\n".format(*self.data.shape)
- return res
-
- def __repr__(self) -> str:
- return str(self)
-
- def copy(self, no_data: bool = False) -> 'TimeSeries':
- cp = copy.copy(self)
-
- if not no_data:
- cp.times = self.times.copy()
- cp.data = self.data.copy()
-
- cp.source = self.source()
- return cp
-
-
# (node_name, source_dev, metric_name) => metric_results
JobMetrics = Dict[Tuple[str, str, str], TimeSeries]
-
-
-class StatProps(Storable):
- "Statistic properties for timeseries with unknown data distribution"
-
- __ignore_fields__ = ['data']
-
- def __init__(self, data: numpy.array, units: str) -> None:
- self.perc_99 = None # type: float
- self.perc_95 = None # type: float
- self.perc_90 = None # type: float
- self.perc_50 = None # type: float
- self.perc_10 = None # type: float
- self.perc_5 = None # type: float
- self.perc_1 = None # type: float
-
- self.min = None # type: Number
- self.max = None # type: Number
-
- # bin_center: bin_count
- self.log_bins = False
- self.bins_populations = None # type: numpy.array
-
- # bin edges, one more element that in bins_populations
- self.bins_edges = None # type: numpy.array
-
- self.data = data
- self.units = units
-
- def __str__(self) -> str:
- res = ["{}(size = {}):".format(self.__class__.__name__, len(self.data))]
- for name in ["perc_1", "perc_5", "perc_10", "perc_50", "perc_90", "perc_95", "perc_99"]:
- res.append(" {} = {}".format(name, round_digits(getattr(self, name))))
- res.append(" range {} {}".format(round_digits(self.min), round_digits(self.max)))
- return "\n".join(res)
-
- def __repr__(self) -> str:
- return str(self)
-
- def raw(self) -> Dict[str, Any]:
- data = super().raw()
- data['bins_mids'] = list(data['bins_mids'])
- data['bins_populations'] = list(data['bins_populations'])
- return data
-
- @classmethod
- def fromraw(cls, data: Dict[str, Any]) -> 'StatProps':
- data['bins_mids'] = numpy.array(data['bins_mids'])
- data['bins_populations'] = numpy.array(data['bins_populations'])
- return cast(StatProps, super().fromraw(data))
-
-
-class HistoStatProps(StatProps):
- """Statistic properties for 2D timeseries with unknown data distribution and histogram as input value.
- Used for latency"""
- def __init__(self, data: numpy.array, units: str) -> None:
- StatProps.__init__(self, data, units)
-
-
-class NormStatProps(StatProps):
- "Statistic properties for timeseries with normal data distribution. Used for iops/bw"
- def __init__(self, data: numpy.array, units: str) -> None:
- StatProps.__init__(self, data, units)
-
- self.average = None # type: float
- self.deviation = None # type: float
- self.confidence = None # type: float
- self.confidence_level = None # type: float
- self.normtest = None # type: NormaltestResult
- self.skew = None # type: float
- self.kurt = None # type: float
-
- def __str__(self) -> str:
- res = ["NormStatProps(size = {}):".format(len(self.data)),
- " distr = {} ~ {}".format(round_digits(self.average), round_digits(self.deviation)),
- " confidence({0.confidence_level}) = {1}".format(self, round_digits(self.confidence)),
- " perc_1 = {}".format(round_digits(self.perc_1)),
- " perc_5 = {}".format(round_digits(self.perc_5)),
- " perc_10 = {}".format(round_digits(self.perc_10)),
- " perc_50 = {}".format(round_digits(self.perc_50)),
- " perc_90 = {}".format(round_digits(self.perc_90)),
- " perc_95 = {}".format(round_digits(self.perc_95)),
- " perc_99 = {}".format(round_digits(self.perc_99)),
- " range {} {}".format(round_digits(self.min), round_digits(self.max)),
- " normtest = {0.normtest}".format(self),
- " skew ~ kurt = {0.skew} ~ {0.kurt}".format(self)]
- return "\n".join(res)
-
- def raw(self) -> Dict[str, Any]:
- data = super().raw()
- data['normtest'] = (data['nortest'].statistic, data['nortest'].pvalue)
- return data
-
- @classmethod
- def fromraw(cls, data: Dict[str, Any]) -> 'NormStatProps':
- data['normtest'] = NormaltestResult(*data['normtest'])
- return cast(NormStatProps, super().fromraw(data))
-
-
JobStatMetrics = Dict[Tuple[str, str, str], StatProps]
@@ -273,29 +72,7 @@
self.processed = None # type: JobStatMetrics
-ArrayData = NamedTuple("ArrayData",
- [('header', List[str]),
- ('histo_bins', Optional[numpy.ndarray]),
- ('data', Optional[numpy.ndarray])])
-
-
-class IResultStorage(metaclass=abc.ABCMeta):
-
- @abc.abstractmethod
- def sync(self) -> None:
- pass
-
- @abc.abstractmethod
- def append_sensor(self, data: numpy.array, ds: DataSource, units: str, histo_bins: numpy.ndarray = None) -> None:
- pass
-
- @abc.abstractmethod
- def load_sensor(self, ds: DataSource) -> TimeSeries:
- pass
-
- @abc.abstractmethod
- def iter_sensors(self, ds: DataSource) -> Iterator[TimeSeries]:
- pass
+class IResultStorage(ISensorStorage, IImagesStorage, metaclass=abc.ABCMeta):
@abc.abstractmethod
def put_or_check_suite(self, suite: SuiteConfig) -> None:
@@ -306,10 +83,6 @@
pass
@abc.abstractmethod
- def put_ts(self, ts: TimeSeries) -> None:
- pass
-
- @abc.abstractmethod
def put_extra(self, data: bytes, source: DataSource) -> None:
pass
@@ -329,11 +102,31 @@
def iter_job(self, suite: SuiteConfig) -> Iterator[JobConfig]:
pass
- @abc.abstractmethod
- def iter_ts(self, suite: SuiteConfig, job: JobConfig) -> Iterator[TimeSeries]:
- pass
-
# return path to file to be inserted into report
@abc.abstractmethod
def put_plot_file(self, data: bytes, source: DataSource) -> str:
pass
+
+ @abc.abstractmethod
+ def get_job_info(self, suite: SuiteConfig, job: JobConfig, key: str) -> Any:
+ pass
+
+ @abc.abstractmethod
+ def get_ts(self, ds: DataSource) -> TimeSeries:
+ pass
+
+ @abc.abstractmethod
+ def put_ts(self, ts: TimeSeries) -> None:
+ pass
+
+ @abc.abstractmethod
+ def iter_ts(self, **ds_parts) -> Iterator[DataSource]:
+ pass
+
+ @abc.abstractmethod
+ def put_job_info(self, suite: SuiteConfig, job: JobConfig, key: str, data: Any) -> None:
+ pass
+
+ @abc.abstractmethod
+ def find_nodes(self, roles: Union[str, List[str]]) -> List[NodeInfo]:
+ pass
\ No newline at end of file
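
IResultStorage is now composed from cephlib's ISensorStorage and IImagesStorage plus the wally-specific methods above. A usage sketch of how report code is expected to walk such a storage; 'rstorage' is assumed to be an already opened ResultStorage (e.g. ctx.rstorage) and 'suite_type' a test type name such as FioTest.name:

    def dump_suite_jobs(rstorage, suite_type: str) -> None:
        for suite in rstorage.iter_suite(suite_type):
            for job in sorted(rstorage.iter_job(suite), key=lambda j: j.params):
                # iter_ts now yields DataSource descriptors; get_ts loads the series
                for ds in rstorage.iter_ts(suite_id=suite.storage_id, job_id=job.storage_id):
                    ts = rstorage.get_ts(ds)
                    print(ds.metric_fqdn, ts.data.shape, ts.units)
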
diff --git a/wally/result_storage.py b/wally/result_storage.py
new file mode 100644
index 0000000..282bdb5
--- /dev/null
+++ b/wally/result_storage.py
@@ -0,0 +1,150 @@
+import os
+import pprint
+import logging
+from typing import cast, Iterator, Tuple, Type, Optional, Any
+
+import numpy
+
+from cephlib.wally_storage import WallyDB
+from cephlib.hlstorage import SensorStorageBase
+from cephlib.statistic import StatProps
+from cephlib.numeric_types import DataSource, TimeSeries
+
+from .suits.job import JobConfig
+from .result_classes import SuiteConfig, IResultStorage
+from .utils import StopTestError
+from .suits.all_suits import all_suits
+
+from cephlib.storage import Storage
+
+logger = logging.getLogger('wally')
+
+
+def fill_path(path: str, **params) -> str:
+ for name, val in params.items():
+ if val is not None:
+ path = path.replace("{" + name + "}", val)
+ return path
+
+
+class ResultStorage(IResultStorage, SensorStorageBase):
+ def __init__(self, storage: Storage) -> None:
+ SensorStorageBase.__init__(self, storage, WallyDB)
+
+ # ------------- CHECK DATA IN STORAGE ----------------------------------------------------------------------------
+ def check_plot_file(self, source: DataSource) -> Optional[str]:
+ path = self.db_paths.plot.format(**source.__dict__)
+ fpath = self.storage.get_fname(self.db_paths.report_root + path)
+ return path if os.path.exists(fpath) else None
+
+ # ------------- PUT DATA INTO STORAGE --------------------------------------------------------------------------
+ def put_or_check_suite(self, suite: SuiteConfig) -> None:
+ path = self.db_paths.suite_cfg.format(suite_id=suite.storage_id)
+ if path in self.storage:
+ db_cfg = self.storage.load(SuiteConfig, path)
+ if db_cfg != suite:
+ logger.error("Current suite %s config is not equal to found in storage at %s", suite.test_type, path)
+ logger.debug("Current: \n%s\nStorage:\n%s", pprint.pformat(db_cfg), pprint.pformat(suite))
+ raise StopTestError()
+ else:
+ self.storage.put(suite, path)
+
+ def put_job(self, suite: SuiteConfig, job: JobConfig) -> None:
+ path = self.db_paths.job_cfg.format(suite_id=suite.storage_id, job_id=job.storage_id)
+ self.storage.put(job, path)
+
+ def put_extra(self, data: bytes, source: DataSource) -> None:
+ self.storage.put_raw(data, self.db_paths.ts.format(**source.__dict__))
+
+ def put_stat(self, data: StatProps, source: DataSource) -> None:
+ self.storage.put(data, self.db_paths.stat.format(**source.__dict__))
+
+ # return path to file to be inserted into report
+ def put_plot_file(self, data: bytes, source: DataSource) -> str:
+ path = self.db_paths.plot.format(**source.__dict__)
+ self.storage.put_raw(data, self.db_paths.report_root + path)
+ return path
+
+ def put_report(self, report: str, name: str) -> str:
+ return self.storage.put_raw(report.encode(self.csv_file_encoding), self.db_paths.report_root + name)
+
+ def put_txt_report(self, suite: SuiteConfig, report: str) -> None:
+ path = self.db_paths.txt_report.format(suite_id=suite.storage_id)
+ self.storage.put_raw(report.encode('utf8'), path)
+
+ def put_job_info(self, suite: SuiteConfig, job: JobConfig, key: str, data: Any) -> None:
+ path = self.db_paths.job_extra.format(suite_id=suite.storage_id, job_id=job.storage_id, tag=key)
+ self.storage.put(data, path)
+
+ # ------------- GET DATA FROM STORAGE --------------------------------------------------------------------------
+
+ def get_stat(self, stat_cls: Type[StatProps], source: DataSource) -> StatProps:
+ return self.storage.load(stat_cls, self.db_paths.stat.format(**source.__dict__))
+
+
+ def get_txt_report(self, suite: SuiteConfig) -> Optional[str]:
+ path = self.db_paths.txt_report.format(suite_id=suite.storage_id)
+ if path in self.storage:
+ return self.storage.get_raw(path).decode('utf8')
+
+ def get_job_info(self, suite: SuiteConfig, job: JobConfig, key: str) -> Any:
+ path = self.db_paths.job_extra.format(suite_id=suite.storage_id, job_id=job.storage_id, tag=key)
+ return self.storage.get(path, None)
+ # ------------- ITER OVER STORAGE ------------------------------------------------------------------------------
+
+ def iter_suite(self, suite_type: str = None) -> Iterator[SuiteConfig]:
+ for is_file, suite_info_path, groups in self.iter_paths(self.db_paths.suite_cfg_r):
+ assert is_file
+ suite = self.storage.load(SuiteConfig, suite_info_path)
+ # suite = cast(SuiteConfig, self.storage.load(SuiteConfig, suite_info_path))
+ assert suite.storage_id == groups['suite_id']
+ if not suite_type or suite.test_type == suite_type:
+ yield suite
+
+ def iter_job(self, suite: SuiteConfig) -> Iterator[JobConfig]:
+ job_glob = fill_path(self.db_paths.job_cfg_r, suite_id=suite.storage_id)
+ job_config_cls = all_suits[suite.test_type].job_config_cls
+ for is_file, path, groups in self.iter_paths(job_glob):
+ assert is_file
+ job = cast(JobConfig, self.storage.load(job_config_cls, path))
+ assert job.storage_id == groups['job_id']
+ yield job
+
+ # ----------------- TS ------------------------------------------------------------------------------------------
+ def get_ts(self, ds: DataSource) -> TimeSeries:
+ path = self.db_paths.ts.format_map(ds.__dict__)
+ (units, time_units), header2, content = self.storage.get_array(path)
+
+ times = content[:,0].copy()
+ data = content[:,1:]
+
+ if data.shape[1] == 1:
+ data.shape = (data.shape[0],)
+
+ return TimeSeries(data=data, times=times, source=ds, units=units, time_units=time_units, histo_bins=header2)
+
+ def put_ts(self, ts: TimeSeries) -> None:
+ assert ts.data.dtype == ts.times.dtype, "Data type {!r} != time type {!r}".format(ts.data.dtype, ts.times.dtype)
+ assert ts.data.dtype.kind == 'u', "Only unsigned ints are accepted"
+ assert ts.source.tag == self.ts_arr_tag, \
+ "Incorrect source tag == {!r}, must be {!r}".format(ts.source.tag, self.ts_arr_tag)
+
+ if ts.source.metric == 'lat':
+ assert len(ts.data.shape) == 2, "Latency should be 2d array"
+ assert ts.histo_bins is not None, "Latency should have histo_bins field not empty"
+
+ csv_path = self.db_paths.ts.format_map(ts.source.__dict__)
+ header = [ts.units, ts.time_units]
+
+ tv = ts.times.view().reshape((-1, 1))
+
+ if len(ts.data.shape) == 1:
+ dv = ts.data.view().reshape((ts.times.shape[0], -1))
+ else:
+ dv = ts.data
+
+ result = numpy.concatenate((tv, dv), axis=1)
+ self.storage.put_array(csv_path, result, header, header2=ts.histo_bins, append_on_exists=False)
+
+ def iter_ts(self, **ds_parts) -> Iterator[DataSource]:
+ return self.iter_objs(self.db_paths.ts_r, **ds_parts)
\ No newline at end of file
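
put_ts above stores every TimeSeries as one 2D array: timestamps in column 0, data columns after it; get_ts splits it back and flattens a single data column. A pure-numpy sketch of that round trip, with no storage involved:

    import numpy

    times = numpy.arange(5, dtype=numpy.uint64)            # same (unsigned) dtype as data
    data = numpy.arange(5, dtype=numpy.uint64) * 100       # 1D series, e.g. bandwidth

    # pack (put_ts): times become column 0, data columns follow
    packed = numpy.concatenate((times.reshape(-1, 1), data.reshape(len(times), -1)), axis=1)

    # unpack (get_ts): split back, flattening a single data column
    times2 = packed[:, 0].copy()
    data2 = packed[:, 1:]
    if data2.shape[1] == 1:
        data2.shape = (data2.shape[0],)

    assert (times2 == times).all() and (data2 == data).all()
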
diff --git a/wally/run_test.py b/wally/run_test.py
index 555cfa1..4e02a75 100755
--- a/wally/run_test.py
+++ b/wally/run_test.py
@@ -13,9 +13,7 @@
from .sensors import collect_sensors_data
from .suits.all_suits import all_suits
from .test_run_class import TestRun
-from .utils import StopTestError
from .result_classes import SuiteConfig
-from .hlstorage import ResultStorage
logger = logging.getLogger("wally")
@@ -76,17 +74,22 @@
t_end = time.time()
for node, val in zip(ctx.nodes, tms):
- max_delta = int(max(t_start - val, val - t_end) * 1000)
- if max_delta > ctx.config.max_time_diff_ms:
+ delta = 0
+ if val > t_end:
+ delta = val - t_end
+ elif t_start > val:
+ delta = t_start - val
+
+ # max_time_diff_ms and the log messages below are in milliseconds
+ delta = int(delta * 1000)
+
+ if delta > ctx.config.max_time_diff_ms:
msg = ("Too large time shift {}ms on node {}. Stopping test." +
" Fix time on cluster nodes and restart test, or change " +
- "max_time_diff_ms(={}ms) setting in config").format(max_delta,
+ "max_time_diff_ms(={}ms) setting in config").format(delta,
str(node),
ctx.config.max_time_diff_ms)
logger.error(msg)
- raise StopTestError(msg)
- if max_delta > 0:
- logger.warning("Node %s has time shift at least %s ms", node, max_delta)
+ raise utils.StopTestError(msg)
+ if delta > 0:
+ logger.warning("Node %s has time shift at least %s ms", node, delta)
def cleanup(self, ctx: TestRun) -> None:
@@ -246,11 +249,11 @@
if not test_nodes:
logger.error("No test nodes found")
- raise StopTestError()
+ raise utils.StopTestError()
if len(test_suite) != 1:
logger.error("Test suite %s contain more than one test. Put each test in separated group", suite_idx)
- raise StopTestError()
+ raise utils.StopTestError()
name, params = list(test_suite.items())[0]
vm_count = params.get('node_limit', None) # type: Optional[int]
@@ -267,7 +270,7 @@
if name not in all_suits:
logger.error("Test suite %r not found. Only suits [%s] available", name, ", ".join(all_suits))
- raise StopTestError()
+ raise utils.StopTestError()
test_cls = all_suits[name]
remote_dir = ctx.config.default_test_local_folder.format(name=name, uuid=ctx.config.run_uuid)
@@ -279,7 +282,7 @@
idx=suite_idx,
keep_raw_files=ctx.config.keep_raw_files)
- test_cls(storage=ResultStorage(ctx.storage),
+ test_cls(storage=ctx.rstorage,
suite=suite,
on_idle=lambda: collect_sensors_data(ctx, False)).run()
@@ -312,7 +315,7 @@
if ctx.nodes_info:
logger.error("Internal error: Some nodes already stored in " +
"nodes_info before LoadStoredNodesStage stage")
- raise StopTestError()
+ raise utils.StopTestError()
nodes = {node.node_id: node for node in ctx.storage.load_list(NodeInfo, SaveNodesStage.nodes_path)}
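
The reworked time-sync check above treats a node clock as in sync when it falls inside the [t_start, t_end] request window, and otherwise takes the distance to the nearest window edge; max_time_diff_ms and the log messages are in milliseconds, hence the conversion. A small arithmetic sketch, with an illustrative function name and made-up numbers:

    def clock_shift_ms(node_time: float, t_start: float, t_end: float) -> int:
        """Distance in ms from node_time to the [t_start, t_end] window; 0 if inside."""
        if node_time > t_end:
            delta = node_time - t_end
        elif t_start > node_time:
            delta = t_start - node_time
        else:
            delta = 0.0
        return int(delta * 1000)

    assert clock_shift_ms(101.5, 100.0, 101.0) == 500   # node clock 500ms ahead of the window
    assert clock_shift_ms(100.5, 100.0, 101.0) == 0     # inside the window - considered in sync
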
diff --git a/wally/sensors.py b/wally/sensors.py
index 89bc224..9fb2177 100644
--- a/wally/sensors.py
+++ b/wally/sensors.py
@@ -6,12 +6,12 @@
import numpy
from cephlib import sensors_rpc_plugin
+from cephlib.units import b2ssize
from . import utils
from .test_run_class import TestRun
from .result_classes import DataSource
from .stage import Stage, StepOrder
-from .hlstorage import ResultStorage
plugin_fname = sensors_rpc_plugin.__file__.rsplit(".", 1)[0] + ".py"
@@ -86,7 +86,6 @@
def collect_sensors_data(ctx: TestRun, stop: bool = False):
- rstorage = ResultStorage(ctx.storage)
total_sz = 0
logger.info("Start loading sensors")
@@ -105,19 +104,19 @@
for path, value, is_array, units in sensors_rpc_plugin.unpack_rpc_updates(data_tpl):
if path == 'collected_at':
ds = DataSource(node_id=node_id, metric='collected_at', tag='csv')
- rstorage.append_sensor(numpy.array(value), ds, units)
+ ctx.rstorage.append_sensor(numpy.array(value), ds, units)
else:
sensor, dev, metric = path.split(".")
ds = DataSource(node_id=node_id, metric=metric, dev=dev, sensor=sensor, tag='csv')
if is_array:
- rstorage.append_sensor(numpy.array(value), ds, units)
+ ctx.rstorage.append_sensor(numpy.array(value), ds, units)
else:
if metric == 'historic':
- rstorage.put_sensor_raw(bz2.compress(value), ds(tag='bin'))
+ ctx.rstorage.put_sensor_raw(bz2.compress(value), ds(tag='bin'))
else:
assert metric in ('perf_dump', 'historic_js')
- rstorage.put_sensor_raw(value, ds(tag='js'))
- logger.info("Download %sB of sensors data", utils.b2ssize(total_sz))
+ ctx.rstorage.put_sensor_raw(value, ds(tag='js'))
+ logger.info("Download %sB of sensors data", b2ssize(total_sz))
diff --git a/wally/ssh.py b/wally/ssh.py
index fcb7fb3..ddf3e58 100644
--- a/wally/ssh.py
+++ b/wally/ssh.py
@@ -9,7 +9,8 @@
import paramiko
-from . import utils
+from cephlib.common import Timeout
+
from .common_types import ConnCreds, IPAddr
logger = logging.getLogger("wally")
@@ -108,7 +109,7 @@
addrs_set = set(addrs) # type: Set[IPAddr]
- for _ in utils.Timeout(timeout):
+ for _ in Timeout(timeout):
selector = selectors.DefaultSelector() # type: selectors.BaseSelector
with selector:
for addr in addrs_set:
diff --git a/wally/ssh_utils.py b/wally/ssh_utils.py
index ed857b6..56d317f 100644
--- a/wally/ssh_utils.py
+++ b/wally/ssh_utils.py
@@ -4,7 +4,7 @@
from typing import List, Dict
-from . import utils
+from cephlib.common import to_ip
from .common_types import ConnCreds
@@ -60,7 +60,7 @@
if rrm is not None:
params = {"user": getpass.getuser()} # type: Dict[str, str]
params.update(rrm.groupdict())
- params['host'] = utils.to_ip(params['host'])
+ params['host'] = to_ip(params['host'])
return ConnCreds(**params) # type: ignore
raise ValueError("Can't parse {0!r} as ssh uri value".format(uri))
diff --git a/wally/statistic.py b/wally/statistic.py
deleted file mode 100644
index 2b1e83a..0000000
--- a/wally/statistic.py
+++ /dev/null
@@ -1,350 +0,0 @@
-import math
-import logging
-import itertools
-from typing import List, Callable, Iterable, cast, Tuple
-
-import numpy
-from scipy import stats, optimize
-from numpy import linalg
-from numpy.polynomial.chebyshev import chebfit, chebval
-
-
-from .result_classes import NormStatProps, HistoStatProps, TimeSeries
-from .utils import Number
-
-
-logger = logging.getLogger("wally")
-DOUBLE_DELTA = 1e-8
-MIN_VALUES_FOR_CONFIDENCE = 7
-
-
-average = numpy.mean
-dev = lambda x: math.sqrt(numpy.var(x, ddof=1))
-
-
-def calc_norm_stat_props(ts: TimeSeries, bins_count: int = None, confidence: float = 0.95) -> NormStatProps:
- "Calculate statistical properties of array of numbers"
-
- res = NormStatProps(ts.data, ts.units) # type: ignore
-
- if len(ts.data) == 0:
- raise ValueError("Input array is empty")
-
- res.average = average(ts.data)
- res.deviation = dev(ts.data)
-
- data = sorted(ts.data)
- res.max = data[-1]
- res.min = data[0]
- pcs = numpy.percentile(data, q=[1.0, 5.0, 10., 50., 90., 95., 99.])
- res.perc_1, res.perc_5, res.perc_10, res.perc_50, res.perc_90, res.perc_95, res.perc_99 = pcs
-
- if len(data) >= MIN_VALUES_FOR_CONFIDENCE:
- res.confidence = stats.sem(ts.data) * \
- stats.t.ppf((1 + confidence) / 2, len(ts.data) - 1)
- res.confidence_level = confidence
- else:
- res.confidence = None
- res.confidence_level = None
-
- if bins_count is not None:
- res.bins_populations, res.bins_edges = numpy.histogram(ts.data, bins=bins_count)
- res.bins_edges = res.bins_edges[:-1]
-
- try:
- res.normtest = stats.mstats.normaltest(ts.data)
- except Exception as exc:
- logger.warning("stats.mstats.normaltest failed with error: %s", exc)
-
- res.skew = stats.skew(ts.data)
- res.kurt = stats.kurtosis(ts.data)
-
- return res
-
-
-# update this code
-def rebin_histogram(bins_populations: numpy.array,
- bins_edges: numpy.array,
- new_bins_count: int,
- left_tail_idx: int = None,
- right_tail_idx: int = None,
- log_bins: bool = False) -> Tuple[numpy.array, numpy.array]:
- # rebin large histogram into smaller with new_bins bins, linearly distributes across
- # left_tail_idx:right_tail_idx range
-
- assert len(bins_populations.shape) == 1
- assert len(bins_edges.shape) == 1
- assert bins_edges.shape[0] == bins_populations.shape[0]
-
- if left_tail_idx is None:
- min_val = bins_edges[0]
- else:
- min_val = bins_edges[left_tail_idx]
-
- if right_tail_idx is None:
- max_val = bins_edges[-1]
- else:
- max_val = bins_edges[right_tail_idx]
-
- if log_bins:
- assert min_val > 1E-3
- step = (max_val / min_val) ** (1 / new_bins_count)
- new_bins_edges = min_val * (step ** numpy.arange(new_bins_count)) # type: numpy.array
- else:
- new_bins_edges = numpy.linspace(min_val, max_val, new_bins_count + 1, dtype='float')[:-1] # type: numpy.array
-
- old_bins_pos = numpy.searchsorted(new_bins_edges, bins_edges, side='right')
- new_bins = numpy.zeros(new_bins_count, dtype=int) # type: numpy.array
-
- # last source bin can't be split
- # TODO: need to add assert for this
- new_bins[-1] += bins_populations[-1]
- bin_sizes = bins_edges[1:] - bins_edges[:-1]
-
- # correct position to get bin idx from edge idx
- old_bins_pos -= 1
- old_bins_pos[old_bins_pos < 0] = 0
- new_bins_sizes = new_bins_edges[1:] - new_bins_edges[:-1]
-
- for population, begin, end, bsize in zip(bins_populations[:-1], old_bins_pos[:-1], old_bins_pos[1:], bin_sizes):
- if begin == end:
- new_bins[begin] += population
- else:
- density = population / bsize
- for curr_box in range(begin, end):
- cnt = min(int(new_bins_sizes[begin] * density + 0.5), population)
- new_bins[begin] += cnt
- population -= cnt
-
- return new_bins, new_bins_edges
-
-
-def calc_histo_stat_props(ts: TimeSeries,
- bins_edges: numpy.array = None,
- rebins_count: int = None,
- tail: float = 0.005) -> HistoStatProps:
- if bins_edges is None:
- bins_edges = ts.histo_bins
-
- res = HistoStatProps(ts.data, ts.units)
-
- # summ across all series
- aggregated = ts.data.sum(axis=0, dtype='int')
- total = aggregated.sum()
-
- # percentiles levels
- expected = list(numpy.array([0.01, 0.05, 0.1, 0.5, 0.9, 0.95, 0.99]) * total)
- cumsum = numpy.cumsum(aggregated)
-
- percentiles_bins = numpy.searchsorted(cumsum, expected)
- percentiles = bins_edges[percentiles_bins]
- res.perc_1, res.perc_5, res.perc_10, res.perc_50, res.perc_90, res.perc_95, res.perc_99 = percentiles
-
- # don't show tail ranges on histogram
- left_tail_idx, right_tail_idx = numpy.searchsorted(cumsum, [tail * total, (1 - tail) * total])
-
- # minimax and maximal non-zero elements
- non_zero = numpy.nonzero(aggregated)[0]
- res.min = bins_edges[aggregated[non_zero[0]]]
- res.max = bins_edges[non_zero[-1] + (1 if non_zero[-1] != len(bins_edges) else 0)]
-
- res.log_bins = False
- if rebins_count is not None:
- res.bins_populations, res.bins_edges = rebin_histogram(aggregated, bins_edges, rebins_count,
- left_tail_idx, right_tail_idx)
- else:
- res.bins_populations = aggregated
- res.bins_edges = bins_edges.copy()
-
- return res
-
-
-def groupby_globally(data: Iterable, key_func: Callable):
- grouped = {} # type: ignore
- grouped_iter = itertools.groupby(data, key_func)
-
- for (bs, cache_tp, act, conc), curr_data_it in grouped_iter:
- key = (bs, cache_tp, act, conc)
- grouped.setdefault(key, []).extend(curr_data_it)
-
- return grouped
-
-
-def approximate_curve(x: List[Number], y: List[float], xnew: List[Number], curved_coef: int) -> List[float]:
- """returns ynew - y values of some curve approximation"""
- return cast(List[float], chebval(xnew, chebfit(x, y, curved_coef)))
-
-
-def approximate_line(x: List[Number], y: List[float], xnew: List[Number], relative_dist: bool = False) -> List[float]:
- """
- x, y - test data, xnew - dots, where we want find approximation
- if not relative_dist distance = y - newy
- returns ynew - y values of linear approximation
- """
- ox = numpy.array(x)
- oy = numpy.array(y)
-
- # set approximation function
- def func_line(tpl, x):
- return tpl[0] * x + tpl[1]
-
- def error_func_rel(tpl, x, y):
- return 1.0 - y / func_line(tpl, x)
-
- def error_func_abs(tpl, x, y):
- return y - func_line(tpl, x)
-
- # choose distance mode
- error_func = error_func_rel if relative_dist else error_func_abs
-
- tpl_initial = tuple(linalg.solve([[ox[0], 1.0], [ox[1], 1.0]],
- oy[:2]))
-
- # find line
- tpl_final, success = optimize.leastsq(error_func, tpl_initial[:], args=(ox, oy))
-
- # if error
- if success not in range(1, 5):
- raise ValueError("No line for this dots")
-
- # return new dots
- return func_line(tpl_final, numpy.array(xnew))
-
-
-def moving_average(data: numpy.array, window: int) -> numpy.array:
- cumsum = numpy.cumsum(data)
- cumsum[window:] = cumsum[window:] - cumsum[:-window]
- return cumsum[window - 1:] / window
-
-
-def moving_dev(data: numpy.array, window: int) -> numpy.array:
- cumsum = numpy.cumsum(data)
- cumsum2 = numpy.cumsum(data ** 2)
- cumsum[window:] = cumsum[window:] - cumsum[:-window]
- cumsum2[window:] = cumsum2[window:] - cumsum2[:-window]
- return ((cumsum2[window - 1:] - cumsum[window - 1:] ** 2 / window) / (window - 1)) ** 0.5
-
-
-def find_ouliers(data: numpy.array,
- center_range: Tuple[int, int] = (25, 75),
- cut_range: float = 3.0) -> numpy.array:
- v1, v2 = numpy.percentile(data, center_range)
- return numpy.abs(data - (v1 + v2) / 2) > ((v2 - v1) / 2 * cut_range)
-
-
-def find_ouliers_ts(data: numpy.array,
- windows_size: int = 30,
- center_range: Tuple[int, int] = (25, 75),
- cut_range: float = 3.0) -> numpy.array:
- outliers = numpy.empty(data.shape, dtype=bool)
-
- if len(data) < windows_size:
- outliers[:] = False
- return outliers
-
- begin_idx = 0
- if len(data) < windows_size * 2:
- end_idx = (len(data) % windows_size) // 2 + windows_size
- else:
- end_idx = len(data)
-
- while True:
- cdata = data[begin_idx: end_idx]
- outliers[begin_idx: end_idx] = find_ouliers(cdata, center_range, cut_range)
- begin_idx = end_idx
-
- if end_idx == len(data):
- break
-
- end_idx += windows_size
- if len(data) - end_idx < windows_size:
- end_idx = len(data)
-
- return outliers
-
-
-def hist_outliers_nd(bin_populations: numpy.array,
- bin_centers: numpy.array,
- center_range: Tuple[int, int] = (25, 75),
- cut_range: float = 3.0) -> Tuple[int, int]:
- assert len(bin_populations) == len(bin_centers)
- total_count = bin_populations.sum()
-
- perc25 = total_count / 100.0 * center_range[0]
- perc75 = total_count / 100.0 * center_range[1]
-
- perc25_idx, perc75_idx = numpy.searchsorted(numpy.cumsum(bin_populations), [perc25, perc75])
- middle = (bin_centers[perc75_idx] + bin_centers[perc25_idx]) / 2
- r = (bin_centers[perc75_idx] - bin_centers[perc25_idx]) / 2
-
- lower_bound = middle - r * cut_range
- upper_bound = middle + r * cut_range
-
- lower_cut_idx, upper_cut_idx = numpy.searchsorted(bin_centers, [lower_bound, upper_bound])
- return lower_cut_idx, upper_cut_idx
-
-
-def hist_outliers_perc(bin_populations: numpy.array,
- bounds_perc: Tuple[float, float] = (0.01, 0.99),
- min_bins_left: int = None) -> Tuple[int, int]:
- assert len(bin_populations.shape) == 1
- total_count = bin_populations.sum()
- lower_perc = total_count * bounds_perc[0]
- upper_perc = total_count * bounds_perc[1]
- idx1, idx2 = numpy.searchsorted(numpy.cumsum(bin_populations), [lower_perc, upper_perc])
-
- # don't cut too many bins. At least min_bins_left must left
- if min_bins_left is not None and idx2 - idx1 < min_bins_left:
- missed = min_bins_left - (idx2 - idx1) // 2
- idx2 = min(len(bin_populations), idx2 + missed)
- idx1 = max(0, idx1 - missed)
-
- return idx1, idx2
-
-
-def ts_hist_outliers_perc(bin_populations: numpy.array,
- window_size: int = 10,
- bounds_perc: Tuple[float, float] = (0.01, 0.99),
- min_bins_left: int = None) -> Tuple[int, int]:
- assert len(bin_populations.shape) == 2
-
- points = list(range(0, len(bin_populations), window_size))
- if len(bin_populations) % window_size != 0:
- points.append(points[-1] + window_size)
-
- ranges = [] # type: List[List[int]]
- for begin, end in zip(points[:-1], points[1:]):
- window_hist = bin_populations[begin:end].sum(axis=0)
- ranges.append(hist_outliers_perc(window_hist, bounds_perc=bounds_perc, min_bins_left=min_bins_left))
-
- return min(i[0] for i in ranges), max(i[1] for i in ranges)
-
-
-# TODO: revise next
-# def difference(y, ynew):
-# """returns average and maximum relative and
-# absolute differences between y and ynew
-# result may contain None values for y = 0
-# return value - tuple:
-# [(abs dif, rel dif) * len(y)],
-# (abs average, abs max),
-# (rel average, rel max)"""
-#
-# abs_dlist = []
-# rel_dlist = []
-#
-# for y1, y2 in zip(y, ynew):
-# # absolute
-# abs_dlist.append(y1 - y2)
-#
-# if y1 > 1E-6:
-# rel_dlist.append(abs(abs_dlist[-1] / y1))
-# else:
-# raise ZeroDivisionError("{0!r} is too small".format(y1))
-#
-# da_avg = sum(abs_dlist) / len(abs_dlist)
-# dr_avg = sum(rel_dlist) / len(rel_dlist)
-#
-# return (zip(abs_dlist, rel_dlist),
-# (da_avg, max(abs_dlist)), (dr_avg, max(rel_dlist))
-# )
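
For reference, the percentile-band rule implemented by the removed find_ouliers()/find_ouliers_ts() helpers (presumably now provided by cephlib.statistic, which is not verified here) reduces to the standalone sketch below; the corrected spelling find_outliers and the sample data are illustrative only:

    import numpy

    def find_outliers(data: numpy.ndarray,
                      center_range=(25, 75),
                      cut_range: float = 3.0) -> numpy.ndarray:
        # flag points whose distance from the middle of the 25..75 percentile
        # band exceeds cut_range times half the band width
        v1, v2 = numpy.percentile(data, center_range)
        return numpy.abs(data - (v1 + v2) / 2) > (v2 - v1) / 2 * cut_range

    samples = numpy.concatenate([numpy.random.uniform(0.0, 1.0, 1000), [50.0]])
    print(samples[find_outliers(samples)])    # only the injected 50.0 is flagged

The time-series variant above applies the same test window by window and merges the resulting boolean masks.
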
diff --git a/wally/storage.py b/wally/storage.py
deleted file mode 100644
index e07d9b3..0000000
--- a/wally/storage.py
+++ /dev/null
@@ -1,349 +0,0 @@
-"""
-This module contains interfaces for storage classes
-"""
-
-import os
-import re
-import abc
-import shutil
-import logging
-from typing import Any, TypeVar, Type, IO, Tuple, cast, List, Dict, Iterable, Iterator
-
-import yaml
-try:
- from yaml import CLoader as Loader, CDumper as Dumper # type: ignore
-except ImportError:
- from yaml import Loader, Dumper # type: ignore
-
-from .common_types import IStorable
-
-
-logger = logging.getLogger("wally")
-
-
-class ISimpleStorage(metaclass=abc.ABCMeta):
- """interface for low-level storage, which doesn't support serialization
- and can operate only on bytes"""
-
- @abc.abstractmethod
- def put(self, value: bytes, path: str) -> None:
- pass
-
- @abc.abstractmethod
- def get(self, path: str) -> bytes:
- pass
-
- @abc.abstractmethod
- def rm(self, path: str) -> None:
- pass
-
- @abc.abstractmethod
- def sync(self) -> None:
- pass
-
- @abc.abstractmethod
- def __contains__(self, path: str) -> bool:
- pass
-
- @abc.abstractmethod
- def get_fd(self, path: str, mode: str = "rb+") -> IO:
- pass
-
- @abc.abstractmethod
- def get_fname(self, path: str) -> str:
- pass
-
- @abc.abstractmethod
- def sub_storage(self, path: str) -> 'ISimpleStorage':
- pass
-
- @abc.abstractmethod
- def list(self, path: str) -> Iterator[Tuple[bool, str]]:
- pass
-
-
-class ITSStorage(metaclass=abc.ABCMeta):
-    """interface for low-level time-series storage, which doesn't support
-    serialization and can operate only on bytes"""
-
- @abc.abstractmethod
- def put(self, value: bytes, path: str) -> None:
- pass
-
- @abc.abstractmethod
- def get(self, path: str) -> bytes:
- pass
-
- @abc.abstractmethod
- def rm(self, path: str) -> None:
- pass
-
- @abc.abstractmethod
- def sync(self) -> None:
- pass
-
- @abc.abstractmethod
- def __contains__(self, path: str) -> bool:
- pass
-
- @abc.abstractmethod
- def get_fd(self, path: str, mode: str = "rb+") -> IO:
- pass
-
- @abc.abstractmethod
- def sub_storage(self, path: str) -> 'ISimpleStorage':
- pass
-
- @abc.abstractmethod
- def list(self, path: str) -> Iterator[Tuple[bool, str]]:
- pass
-
-
-class ISerializer(metaclass=abc.ABCMeta):
- """Interface for serialization class"""
- @abc.abstractmethod
- def pack(self, value: IStorable) -> bytes:
- pass
-
- @abc.abstractmethod
- def unpack(self, data: bytes) -> Any:
- pass
-
-
-class FSStorage(ISimpleStorage):
- """Store all data in files on FS"""
-
- def __init__(self, root_path: str, existing: bool) -> None:
- self.root_path = root_path
- self.existing = existing
- self.ignored = {'.', '..'}
-
- def j(self, path: str) -> str:
- return os.path.join(self.root_path, path)
-
- def put(self, value: bytes, path: str) -> None:
- jpath = self.j(path)
- os.makedirs(os.path.dirname(jpath), exist_ok=True)
- with open(jpath, "wb") as fd:
- fd.write(value)
-
- def get(self, path: str) -> bytes:
- try:
- with open(self.j(path), "rb") as fd:
- return fd.read()
- except FileNotFoundError as exc:
- raise KeyError(path) from exc
-
- def rm(self, path: str) -> None:
-        jpath = self.j(path)
-        if os.path.isdir(jpath):
-            shutil.rmtree(jpath, ignore_errors=True)
-        elif os.path.exists(jpath):
-            os.unlink(jpath)
-
- def __contains__(self, path: str) -> bool:
- return os.path.exists(self.j(path))
-
- def get_fname(self, path: str) -> str:
- return self.j(path)
-
- def get_fd(self, path: str, mode: str = "rb+") -> IO[bytes]:
- jpath = self.j(path)
-
- if "cb" == mode:
- create_on_fail = True
- mode = "rb+"
- os.makedirs(os.path.dirname(jpath), exist_ok=True)
- elif "ct" == mode:
- create_on_fail = True
- mode = "rt+"
- os.makedirs(os.path.dirname(jpath), exist_ok=True)
- else:
- create_on_fail = False
-
- try:
- fd = open(jpath, mode)
- except IOError:
- if not create_on_fail:
- raise
-
- if 't' in mode:
- fd = open(jpath, "wt")
- else:
- fd = open(jpath, "wb")
-
- return cast(IO[bytes], fd)
-
- def sub_storage(self, path: str) -> 'FSStorage':
- return self.__class__(self.j(path), self.existing)
-
- def sync(self):
- pass
-
- def list(self, path: str) -> Iterator[Tuple[bool, str]]:
- path = self.j(path)
-
- if not os.path.exists(path):
- return
-
- if not os.path.isdir(path):
- raise OSError("{!r} is not a directory".format(path))
-
- for fobj in os.scandir(path):
- if fobj.path not in self.ignored:
- if fobj.is_dir():
- yield False, fobj.name
- else:
- yield True, fobj.name
-
-
-class YAMLSerializer(ISerializer):
- """Serialize data to yaml"""
- def pack(self, value: IStorable) -> bytes:
- try:
- return yaml.dump(value, Dumper=Dumper, encoding="utf8")
- except Exception as exc:
-            raise ValueError("Can't serialize object of type {!r} to yaml".format(type(value))) from exc
-
- def unpack(self, data: bytes) -> Any:
- return yaml.load(data, Loader=Loader)
-
-
-class SAFEYAMLSerializer(ISerializer):
-    """Serialize data to yaml using the safe dumper/loader"""
- def pack(self, value: IStorable) -> bytes:
- try:
- return yaml.safe_dump(value, encoding="utf8")
- except Exception as exc:
-            raise ValueError("Can't serialize object of type {!r} to yaml".format(type(value))) from exc
-
- def unpack(self, data: bytes) -> Any:
- return yaml.safe_load(data)
-
-
-ObjClass = TypeVar('ObjClass', bound=IStorable)
-
-
-class _Raise:
- pass
-
-
-class Storage:
- """interface for storage"""
-
- def __init__(self, sstorage: ISimpleStorage, serializer: ISerializer) -> None:
- self.sstorage = sstorage
- self.serializer = serializer
- self.cache = {}
-
- def sub_storage(self, *path: str) -> 'Storage':
- fpath = "/".join(path)
- return self.__class__(self.sstorage.sub_storage(fpath), self.serializer)
-
- def put(self, value: Any, *path: str) -> None:
- dct_value = cast(IStorable, value).raw() if isinstance(value, IStorable) else value
- serialized = self.serializer.pack(dct_value) # type: ignore
- fpath = "/".join(path)
- self.sstorage.put(serialized, fpath)
-
- def put_list(self, value: Iterable[IStorable], *path: str) -> None:
- serialized = self.serializer.pack([obj.raw() for obj in value]) # type: ignore
- fpath = "/".join(path)
- self.sstorage.put(serialized, fpath)
-
- def get(self, path: str, default: Any = _Raise) -> Any:
- try:
- vl = self.sstorage.get(path)
- except:
- if default is _Raise:
- raise
- return default
-
- return self.serializer.unpack(vl)
-
- def rm(self, *path: str) -> None:
- fpath = "/".join(path)
- self.sstorage.rm(fpath)
-
- def __contains__(self, path: str) -> bool:
- return path in self.sstorage
-
- def put_raw(self, val: bytes, *path: str) -> str:
- fpath = "/".join(path)
- self.sstorage.put(val, fpath)
- # TODO: dirty hack
- return self.resolve_raw(fpath)
-
- def resolve_raw(self, fpath) -> str:
- return cast(FSStorage, self.sstorage).j(fpath)
-
- def get_raw(self, *path: str) -> bytes:
- return self.sstorage.get("/".join(path))
-
- def append_raw(self, value: bytes, *path: str) -> None:
- with self.sstorage.get_fd("/".join(path), "rb+") as fd:
- fd.seek(0, os.SEEK_END)
- fd.write(value)
-
- def get_fd(self, path: str, mode: str = "r") -> IO:
- return self.sstorage.get_fd(path, mode)
-
- def get_fname(self, path: str) -> str:
- return self.sstorage.get_fname(path)
-
- def load_list(self, obj_class: Type[ObjClass], *path: str) -> List[ObjClass]:
- path_s = "/".join(path)
- if path_s not in self.cache:
- raw_val = cast(List[Dict[str, Any]], self.get(path_s))
- assert isinstance(raw_val, list)
- self.cache[path_s] = [cast(ObjClass, obj_class.fromraw(val)) for val in raw_val]
- return self.cache[path_s]
-
- def load(self, obj_class: Type[ObjClass], *path: str) -> ObjClass:
- path_s = "/".join(path)
- if path_s not in self.cache:
- self.cache[path_s] = cast(ObjClass, obj_class.fromraw(self.get(path_s)))
- return self.cache[path_s]
-
- def sync(self) -> None:
- self.sstorage.sync()
-
- def __enter__(self) -> 'Storage':
- return self
-
- def __exit__(self, x: Any, y: Any, z: Any) -> None:
- self.sync()
-
- def list(self, *path: str) -> Iterator[Tuple[bool, str]]:
- return self.sstorage.list("/".join(path))
-
- def _iter_paths(self,
- root: str,
- path_parts: List[str],
- groups: Dict[str, str]) -> Iterator[Tuple[bool, str, Dict[str, str]]]:
-
- curr = path_parts[0]
- rest = path_parts[1:]
-
- for is_file, name in self.list(root):
- if rest and is_file:
- continue
-
- rr = re.match(pattern=curr + "$", string=name)
- if rr:
- if root:
- path = root + "/" + name
- else:
- path = name
-
- new_groups = rr.groupdict().copy()
- new_groups.update(groups)
-
- if rest:
- yield from self._iter_paths(path, rest, new_groups)
- else:
- yield is_file, path, new_groups
-
-
-def make_storage(url: str, existing: bool = False) -> Storage:
- return Storage(FSStorage(url, existing), SAFEYAMLSerializer())
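
Before it was dropped, this facade was used roughly as follows; the path and payload are illustrative, and it is only assumed (not verified) that the cephlib.storage replacement keeps a compatible make_storage()/Storage interface:

    storage = make_storage("/tmp/wally_demo", existing=False)
    storage.put({"suite": "fio", "nodes": 3}, "meta", "run_info")   # YAML file <root>/meta/run_info
    print(storage.get("meta/run_info"))                             # {'nodes': 3, 'suite': 'fio'}
    print("meta/run_info" in storage)                               # True
    storage.sync()
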
-
diff --git a/wally/suits/io/verify.cfg b/wally/suits/io/verify.cfg
index 033d771..9433361 100644
--- a/wally/suits/io/verify.cfg
+++ b/wally/suits/io/verify.cfg
@@ -6,7 +6,7 @@
LQDW={% 1, 4, 16, 64 %}
LQDR={% 1, 4, 16, 64 %}
-runtime=300
+runtime=600
direct=1
ramp_time=30
diff --git a/wally/suits/job.py b/wally/suits/job.py
index 5cb27b5..5f3c764 100644
--- a/wally/suits/job.py
+++ b/wally/suits/job.py
@@ -2,7 +2,7 @@
from typing import Dict, Any, Tuple, cast, Union
from collections import OrderedDict
-from ..common_types import Storable
+from cephlib.storage import Storable
class JobParams(metaclass=abc.ABCMeta):
diff --git a/wally/test_run_class.py b/wally/test_run_class.py
index c4b5bc5..b67035e 100644
--- a/wally/test_run_class.py
+++ b/wally/test_run_class.py
@@ -1,19 +1,19 @@
from typing import List, Callable, Any, Dict, Optional, Set
from concurrent.futures import ThreadPoolExecutor
+from cephlib.istorage import IStorage
-from .timeseries import SensorDatastore
from .node_interfaces import NodeInfo, IRPCNode
from .openstack_api import OSCreds, OSConnection
-from .storage import Storage
from .config import Config
from .fuel_rest_api import Connection
from .ssh_utils import ConnCreds
+from .result_classes import IResultStorage
class TestRun:
"""Test run information"""
- def __init__(self, config: Config, storage: Storage) -> None:
+ def __init__(self, config: Config, storage: IStorage, rstorage: IResultStorage) -> None:
# NodesInfo list
self.nodes_info = {} # type: Dict[str, NodeInfo]
@@ -33,8 +33,8 @@
self.default_rpc_plugins = None # type: Dict[str, bytes]
self.storage = storage
+ self.rstorage = rstorage
self.config = config
- self.sensors_data = SensorDatastore()
self.sensors_run_on = set() # type: Set[str]
self.os_spawned_nodes_ids = None # type: List[int]
diff --git a/wally/texttable.py b/wally/texttable.py
deleted file mode 100644
index 504b04d..0000000
--- a/wally/texttable.py
+++ /dev/null
@@ -1,590 +0,0 @@
-# texttable - module for creating simple ASCII tables
-# Copyright (C) 2003-2015 Gerome Fournier <jef(at)foutaise.org>
-#
-# This library is free software; you can redistribute it and/or
-# modify it under the terms of the GNU Lesser General Public
-# License as published by the Free Software Foundation; either
-# version 2.1 of the License, or (at your option) any later version.
-#
-# This library is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
-# Lesser General Public License for more details.
-#
-# You should have received a copy of the GNU Lesser General Public
-# License along with this library; if not, write to the Free Software
-# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA
-
-"""module for creating simple ASCII tables
-
-
-Example:
-
- table = Texttable()
- table.set_cols_align(["l", "r", "c"])
- table.set_cols_valign(["t", "m", "b"])
- table.add_rows([["Name", "Age", "Nickname"],
- ["Mr\\nXavier\\nHuon", 32, "Xav'"],
- ["Mr\\nBaptiste\\nClement", 1, "Baby"],
- ["Mme\\nLouise\\nBourgeau", 28, "Lou\\n\\nLoue"]])
- print table.draw() + "\\n"
-
- table = Texttable()
- table.set_deco(Texttable.HEADER)
- table.set_cols_dtype(['t', # text
- 'f', # float (decimal)
- 'e', # float (exponent)
- 'i', # integer
- 'a']) # automatic
- table.set_cols_align(["l", "r", "r", "r", "l"])
- table.add_rows([["text", "float", "exp", "int", "auto"],
- ["abcd", "67", 654, 89, 128.001],
- ["efghijk", 67.5434, .654, 89.6, 12800000000000000000000.00023],
- ["lmn", 5e-78, 5e-78, 89.4, .000000000000128],
- ["opqrstu", .023, 5e+78, 92., 12800000000000000000000]])
- print table.draw()
-
-Result:
-
- +----------+-----+----------+
- | Name | Age | Nickname |
- +==========+=====+==========+
- | Mr | | |
- | Xavier | 32 | |
- | Huon | | Xav' |
- +----------+-----+----------+
- | Mr | | |
- | Baptiste | 1 | |
- | Clement | | Baby |
- +----------+-----+----------+
- | Mme | | Lou |
- | Louise | 28 | |
- | Bourgeau | | Loue |
- +----------+-----+----------+
-
- text float exp int auto
- ===========================================
- abcd 67.000 6.540e+02 89 128.001
- efgh 67.543 6.540e-01 90 1.280e+22
- ijkl 0.000 5.000e-78 89 0.000
- mnop 0.023 5.000e+78 92 1.280e+22
-"""
-
-from __future__ import division
-
-__all__ = ["Texttable", "ArraySizeError"]
-
-__author__ = 'Gerome Fournier <jef(at)foutaise.org>'
-__license__ = 'LGPL'
-__version__ = '0.8.8'
-__credits__ = """\
-Jeff Kowalczyk:
- - textwrap improved import
- - comment concerning header output
-
-Anonymous:
- - add_rows method, for adding rows in one go
-
-Sergey Simonenko:
- - redefined len() function to deal with non-ASCII characters
-
-Roger Lew:
- - columns datatype specifications
-
-Brian Peterson:
- - better handling of unicode errors
-
-Frank Sachsenheim:
- - add Python 2/3-compatibility
-
-Maximilian Hils:
- - fix minor bug for Python 3 compatibility
-
-frinkelpi:
- - preserve empty lines
-"""
-
-import sys
-import string
-import unicodedata
-
-try:
- if sys.version >= '2.3':
- import textwrap
- elif sys.version >= '2.2':
- from optparse import textwrap
- else:
- from optik import textwrap
-except ImportError:
- sys.stderr.write("Can't import textwrap module!\n")
- raise
-
-if sys.version >= '2.7':
- from functools import reduce
-
-if sys.version >= '3.0':
- unicode_type = str
- bytes_type = bytes
-else:
- unicode_type = unicode
- bytes_type = str
-
-
-def obj2unicode(obj):
- """Return a unicode representation of a python object
- """
- if isinstance(obj, unicode_type):
- return obj
- elif isinstance(obj, bytes_type):
- try:
- return unicode_type(obj, 'utf-8')
- except UnicodeDecodeError as strerror:
- sys.stderr.write("UnicodeDecodeError exception for string '%s': %s\n" % (obj, strerror))
- return unicode_type(obj, 'utf-8', 'replace')
- else:
- return unicode_type(obj)
-
-
-def len(iterable):
- """Redefining len here so it will be able to work with non-ASCII characters
- """
- if isinstance(iterable, bytes_type) or isinstance(iterable, unicode_type):
- unicode_data = obj2unicode(iterable)
- if hasattr(unicodedata, 'east_asian_width'):
- w = unicodedata.east_asian_width
- return sum([w(c) in 'WF' and 2 or 1 for c in unicode_data])
- else:
- return unicode_data.__len__()
- else:
- return iterable.__len__()
-
-
-class ArraySizeError(Exception):
- """Exception raised when specified rows don't fit the required size
- """
-
- def __init__(self, msg):
- self.msg = msg
- Exception.__init__(self, msg, '')
-
- def __str__(self):
- return self.msg
-
-
-class Texttable:
-
- BORDER = 1
- HEADER = 1 << 1
- HLINES = 1 << 2
- VLINES = 1 << 3
-
- def __init__(self, max_width=80):
- """Constructor
-
- - max_width is an integer, specifying the maximum width of the table
- - if set to 0, size is unlimited, therefore cells won't be wrapped
- """
-
- if max_width <= 0:
- max_width = False
- self._max_width = max_width
- self._precision = 3
-
- self._deco = Texttable.VLINES | Texttable.HLINES | Texttable.BORDER | Texttable.HEADER
- self._reset()
- ## left, horiz, cross, right
- self._chars_top = (chr(0x250c), chr(0x2500), chr(0x252c), chr(0x2510))
- # self.chars_header = (chr(0x255e), chr(0x2550), chr(0x256a), chr(0x2561))
- self._chars_header = (chr(0x251d), chr(0x2501), chr(0x253f), chr(0x2525))
- self._chars_middle = (chr(0x251c), chr(0x2500), chr(0x253c), chr(0x2524))
- self._chars_bottom = (chr(0x2514), chr(0x2500), chr(0x2534), chr(0x2518))
- self._char_vert = chr(0x2502)
- self._align = None
-
- def _reset(self):
- """Reset the instance
-
- - reset rows and header
- """
-
- self._row_size = None
- self._header = []
- self._rows = []
-
- def set_cols_align(self, array):
- """Set the desired columns alignment
-
- - the elements of the array should be either "l", "c" or "r":
-
- * "l": column flushed left
- * "c": column centered
- * "r": column flushed right
- """
-
- self._check_row_size(array)
- self._align = array
-
- def set_cols_valign(self, array):
- """Set the desired columns vertical alignment
-
- - the elements of the array should be either "t", "m" or "b":
-
- * "t": column aligned on the top of the cell
- * "m": column aligned on the middle of the cell
- * "b": column aligned on the bottom of the cell
- """
-
- self._check_row_size(array)
- self._valign = array
-
- def set_cols_dtype(self, array):
- """Set the desired columns datatype for the cols.
-
- - the elements of the array should be either "a", "t", "f", "e" or "i":
-
- * "a": automatic (try to use the most appropriate datatype)
- * "t": treat as text
- * "f": treat as float in decimal format
- * "e": treat as float in exponential format
- * "i": treat as int
-
- - by default, automatic datatyping is used for each column
- """
-
- self._check_row_size(array)
- self._dtype = array
-
- def set_cols_width(self, array):
- """Set the desired columns width
-
- - the elements of the array should be integers, specifying the
- width of each column. For example:
-
- [10, 20, 5]
- """
-
- self._check_row_size(array)
- try:
- array = list(map(int, array))
- if reduce(min, array) <= 0:
- raise ValueError
- except ValueError:
- sys.stderr.write("Wrong argument in column width specification\n")
- raise
- self._width = array
-
- def set_precision(self, width):
- """Set the desired precision for float/exponential formats
-
- - width must be an integer >= 0
-
- - default value is set to 3
- """
-
- if not type(width) is int or width < 0:
-            raise ValueError('width must be an integer greater than or equal to 0')
- self._precision = width
-
- def header(self, array):
- """Specify the header of the table
- """
-
- self._check_row_size(array)
- self._header = list(map(obj2unicode, array))
-
- def add_row(self, array):
- """Add a row in the rows stack
-
- - cells can contain newlines and tabs
- """
-
- self._check_row_size(array)
-
- if not hasattr(self, "_dtype"):
- self._dtype = ["a"] * self._row_size
-
- cells = []
- for i, x in enumerate(array):
- cells.append(self._str(i, x))
-
- self._rows.append(cells)
-
- def add_rows(self, rows, header=True):
- """Add several rows in the rows stack
-
- - The 'rows' argument can be either an iterator returning arrays,
-          or a two-dimensional array
- - 'header' specifies if the first row should be used as the header
- of the table
- """
-
-        # nb: don't use 'iter' on two-dimensional arrays, so the code
-        # stays usable on python 2.1
- if header:
- if hasattr(rows, '__iter__') and hasattr(rows, 'next'):
- self.header(rows.next())
- else:
- self.header(rows[0])
- rows = rows[1:]
- for row in rows:
- self.add_row(row)
-
- def draw(self):
- """Draw the table
-
- - the table is returned as a whole string
- """
-
- if not self._header and not self._rows:
- return
- self._compute_cols_width()
- self._check_align()
- out = ""
-
- if self._has_border():
- out += self._hline(*self._chars_top)
-
- if self._header:
- out += self._draw_line(self._header, isheader=True)
- if self._has_header():
- out += self._hline(*self._chars_header)
-
- length = 0
- for row in self._rows:
- length += 1
- out += self._draw_line(row)
- if self._has_hlines() and length < len(self._rows):
- out += self._hline(*self._chars_middle)
-
- if self._has_border():
- out += self._hline(*self._chars_bottom)
-
- return out[:-1]
-
- def _str(self, i, x):
- """Handles string formatting of cell data
-
- i - index of the cell datatype in self._dtype
- x - cell data to format
- """
- if isinstance(x, str):
- return x
-
- try:
- f = float(x)
- except:
- return obj2unicode(x)
-
- n = self._precision
- dtype = self._dtype[i]
-
- if dtype == 'i':
- return str(int(round(f)))
- elif dtype == 'f':
- return '%.*f' % (n, f)
- elif dtype == 'e':
- return '%.*e' % (n, f)
- elif dtype == 't':
- return obj2unicode(x)
- else:
- if f - round(f) == 0:
- if abs(f) > 1e8:
- return '%.*e' % (n, f)
- else:
- return str(int(round(f)))
- else:
- if abs(f) > 1e8:
- return '%.*e' % (n, f)
- else:
- return '%.*f' % (n, f)
-
- def _check_row_size(self, array):
- """Check that the specified array fits the previous rows size
- """
-
- if not self._row_size:
- self._row_size = len(array)
- elif self._row_size != len(array):
- raise ArraySizeError("array should contain %d elements" \
- % self._row_size)
-
- def _has_vlines(self):
- """Return a boolean, if vlines are required or not
- """
-
- return self._deco & Texttable.VLINES > 0
-
- def _has_hlines(self):
- """Return a boolean, if hlines are required or not
- """
-
- return self._deco & Texttable.HLINES > 0
-
- def _has_border(self):
- """Return a boolean, if border is required or not
- """
-
- return self._deco & Texttable.BORDER > 0
-
- def _has_header(self):
- """Return a boolean, if header line is required or not
- """
-
- return self._deco & Texttable.HEADER > 0
-
- def _hline(self, left, horiz, cross, right):
-        """Return a string used to separate rows or to separate the header from the rows"""
-
- # compute cell separator
- sep = horiz + (cross if self._has_vlines() else horiz) + horiz
-
- # build the line
- line = sep.join([horiz * n for n in self._width])
-
- # add border if needed
- if self._has_border():
- line = left + horiz + line + horiz + right
-
- return line + "\n"
-
- def _len_cell(self, cell):
- """Return the width of the cell
-
- Special characters are taken into account to return the width of the
- cell, such like newlines and tabs
- """
-
- cell_lines = cell.split('\n')
- maxi = 0
- for line in cell_lines:
- length = 0
- parts = line.split('\t')
- for part, i in zip(parts, list(range(1, len(parts) + 1))):
- length = length + len(part)
- if i < len(parts):
- length = (length//8 + 1) * 8
- maxi = max(maxi, length)
- return maxi
-
- def _compute_cols_width(self):
- """Return an array with the width of each column
-
- If a specific width has been specified, exit. If the total of the
- columns width exceed the table desired width, another width will be
- computed to fit, and cells will be wrapped.
- """
-
- if hasattr(self, "_width"):
- return
- maxi = []
- if self._header:
- maxi = [ self._len_cell(x) for x in self._header ]
- for row in self._rows:
- for cell,i in zip(row, list(range(len(row)))):
- try:
- maxi[i] = max(maxi[i], self._len_cell(cell))
- except (TypeError, IndexError):
- maxi.append(self._len_cell(cell))
- items = len(maxi)
- length = sum(maxi)
- if self._max_width and length + items * 3 + 1 > self._max_width:
- maxi = [
- int(round(self._max_width / (length + items * 3 + 1) * n))
- for n in maxi
- ]
- self._width = maxi
-
- def _check_align(self):
- """Check if alignment has been specified, set default one if not
- """
-
- if not hasattr(self, "_align"):
- self._align = ["l"] * self._row_size
- if not hasattr(self, "_valign"):
- self._valign = ["t"] * self._row_size
-
- def _draw_line(self, line, isheader=False):
- """Draw a line
-
- Loop over a single cell length, over all the cells
- """
-
- line = self._splitit(line, isheader)
- space = " "
- out = ""
- for i in range(len(line[0])):
- if self._has_border():
- out += "%s " % self._char_vert
- length = 0
- for cell, width, align in zip(line, self._width, self._align):
- length += 1
- cell_line = cell[i]
- fill = width - len(cell_line)
- if isheader:
- align = "c"
- if align == "r":
- out += "%s " % (fill * space + cell_line)
- elif align == "c":
- out += "%s " % (int(fill/2) * space + cell_line + int(fill/2 + fill%2) * space)
- else:
- out += "%s " % (cell_line + fill * space)
- if length < len(line):
- out += "%s " % [space, self._char_vert][self._has_vlines()]
- out += "%s\n" % ['', self._char_vert][self._has_border()]
- return out
-
- def _splitit(self, line, isheader):
- """Split each element of line to fit the column width
-
- Each element is turned into a list, result of the wrapping of the
- string to the desired width
- """
-
- line_wrapped = []
- for cell, width in zip(line, self._width):
- array = []
- for c in cell.split('\n'):
- if c.strip() == "":
- array.append("")
- else:
- array.extend(textwrap.wrap(c, width))
- line_wrapped.append(array)
- max_cell_lines = reduce(max, list(map(len, line_wrapped)))
- for cell, valign in zip(line_wrapped, self._valign):
- if isheader:
- valign = "t"
- if valign == "m":
- missing = max_cell_lines - len(cell)
- cell[:0] = [""] * int(missing / 2)
- cell.extend([""] * int(missing / 2 + missing % 2))
- elif valign == "b":
- cell[:0] = [""] * (max_cell_lines - len(cell))
- else:
- cell.extend([""] * (max_cell_lines - len(cell)))
- return line_wrapped
-
-
-if __name__ == '__main__':
- table = Texttable()
- table.set_cols_align(["l", "r", "c"])
- table.set_cols_valign(["t", "m", "b"])
- table.add_rows([["Name", "Age", "Nickname"],
- ["Mr\nXavier\nHuon", 32, "Xav'"],
- ["Mr\nBaptiste\nClement", 1, "Baby"],
- ["Mme\nLouise\nBourgeau", 28, "Lou\n \nLoue"]])
- print(table.draw() + "\n")
-
- table = Texttable()
- table.set_deco(Texttable.HEADER)
- table.set_cols_dtype(['t', # text
- 'f', # float (decimal)
- 'e', # float (exponent)
- 'i', # integer
- 'a']) # automatic
- table.set_cols_align(["l", "r", "r", "r", "l"])
- table.add_rows([["text", "float", "exp", "int", "auto"],
- ["abcd", "67", 654, 89, 128.001],
- ["efghijk", 67.5434, .654, 89.6, 12800000000000000000000.00023],
- ["lmn", 5e-78, 5e-78, 89.4, .000000000000128],
- ["opqrstu", .023, 5e+78, 92., 12800000000000000000000]])
- print(table.draw())
diff --git a/wally/timeseries.py b/wally/timeseries.py
deleted file mode 100644
index d322fff..0000000
--- a/wally/timeseries.py
+++ /dev/null
@@ -1,61 +0,0 @@
-import array
-import threading
-
-
-class SensorDatastore(object):
- def __init__(self, stime=None):
- self.lock = threading.Lock()
- self.stime = stime
-
- self.min_size = 60 * 60
- self.max_size = 60 * 61
-
- self.data = {
- 'testnodes:io': array.array("B"),
- 'testnodes:cpu': array.array("B"),
- }
-
- def get_values(self, name, start, end):
- assert end >= start
-
- if end == start:
- return []
-
- with self.lock:
- curr_arr = self.data[name]
- if self.stime is None:
- return []
-
- sidx = start - self.stime
- eidx = end - self.stime
-
- if sidx < 0 and eidx < 0:
- return [0] * (end - start)
- elif sidx < 0:
- return [0] * (-sidx) + curr_arr[:eidx]
- return curr_arr[sidx:eidx]
-
- def update_values(self, data_time, vals, add=False):
- with self.lock:
- if self.stime is None:
- self.stime = data_time
-
- for name, value in vals.items():
- curr_arr = self.data.setdefault(name, array.array("H"))
- curr_end_time = len(curr_arr) + self.stime
-
- dtime = data_time - curr_end_time
-
- if dtime > 0:
- curr_arr.extend([0] * dtime)
- curr_arr.append(value)
- elif dtime == 0:
- curr_arr.append(value)
- else:
- # dtime < 0
- sindex = len(curr_arr) + dtime
- if sindex > 0:
- if add:
- curr_arr[sindex] += value
- else:
-                            curr_arr[sindex] = value
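
A minimal usage sketch of the deleted class (timestamps and values are illustrative; only the definitions above are exercised):

    ds = SensorDatastore()
    ds.update_values(1000, {"testnodes:io": 5})
    ds.update_values(1001, {"testnodes:io": 7})
    print(list(ds.get_values("testnodes:io", 1000, 1002)))   # [5, 7]
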
diff --git a/wally/types.py b/wally/types.py
deleted file mode 100644
index 30db83f..0000000
--- a/wally/types.py
+++ /dev/null
@@ -1,8 +0,0 @@
-from typing import TypeVar, List, Union
-
-import numpy
-
-
-TNumber = TypeVar('TNumber', int, float)
-Number = Union[int, float]
-NumVector = Union[numpy.ndarray, List[int], List[float]]
diff --git a/wally/utils.py b/wally/utils.py
index a0f10e8..dfd0e8c 100644
--- a/wally/utils.py
+++ b/wally/utils.py
@@ -1,33 +1,19 @@
-import re
import os
import sys
-import math
-import time
import uuid
-import socket
import logging
import datetime
-import ipaddress
-import threading
import contextlib
-import subprocess
-from fractions import Fraction
-from typing import Any, Tuple, Union, List, Iterator, Iterable, Optional, IO, cast, TypeVar, Callable
-
-try:
- import psutil
-except ImportError:
- psutil = None
+from typing import Any, Tuple, Iterator, Iterable
try:
from petname import Generate as pet_generate
except ImportError:
- def pet_generate(x: str, y: str) -> str:
+ def pet_generate(_1: str, _2: str) -> str:
return str(uuid.uuid4())
-
-from .types import TNumber, Number
+from cephlib.common import run_locally, sec_to_str
logger = logging.getLogger("wally")
@@ -65,63 +51,6 @@
pass
-class Timeout(Iterable[float]):
- def __init__(self, timeout: int, message: str = None, min_tick: int = 1, no_exc: bool = False) -> None:
- self.end_time = time.time() + timeout
- self.message = message
- self.min_tick = min_tick
- self.prev_tick_at = time.time()
- self.no_exc = no_exc
-
- def tick(self) -> bool:
- current_time = time.time()
-
- if current_time > self.end_time:
- if self.message:
- msg = "Timeout: {}".format(self.message)
- else:
- msg = "Timeout"
-
- if self.no_exc:
- return False
-
- raise TimeoutError(msg)
-
- sleep_time = self.min_tick - (current_time - self.prev_tick_at)
- if sleep_time > 0:
- time.sleep(sleep_time)
- self.prev_tick_at = time.time()
- else:
- self.prev_tick_at = current_time
-
- return True
-
- def __iter__(self) -> Iterator[float]:
- return cast(Iterator[float], self)
-
- def __next__(self) -> float:
- if not self.tick():
- raise StopIteration()
- return self.end_time - time.time()
-
-
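
The removed Timeout helper was driven either through tick() or through iteration; a two-second demo loop, assuming nothing external:

    t = Timeout(2, message="demo wait", min_tick=1, no_exc=True)
    while t.tick():          # sleeps so ticks stay >= min_tick apart, returns False once 2s are up
        pass                 # poll whatever needs polling here

    for seconds_left in Timeout(2, no_exc=True):
        pass                 # same loop, written via the iterator protocol
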
-def greater_digit_pos(val: Number) -> int:
- return int(math.floor(math.log10(val))) + 1
-
-
-def round_digits(val: TNumber, num_digits: int = 3) -> TNumber:
- pow = 10 ** (greater_digit_pos(val) - num_digits)
- return type(val)(int(val / pow) * pow)
-
-
-def is_ip(data: str) -> bool:
- try:
- ipaddress.ip_address(data)
- return True
- except ValueError:
- return False
-
-
def log_block(message: str, exc_logger:logging.Logger = None) -> LogError:
logger.debug("Starts : " + message)
return LogError(message, exc_logger)
@@ -133,212 +62,6 @@
raise StopTestError(message)
-def parse_creds(creds: str) -> Tuple[str, str, str]:
- """Parse simple credentials format user[:passwd]@host"""
- user, passwd_host = creds.split(":", 1)
-
- if '@' not in passwd_host:
- passwd, host = passwd_host, None
- else:
- passwd, host = passwd_host.rsplit('@', 1)
-
- return user, passwd, host
-
-
-SMAP = dict(k=1024, m=1024 ** 2, g=1024 ** 3, t=1024 ** 4)
-
-
-def ssize2b(ssize: Union[str, int]) -> int:
- try:
- if isinstance(ssize, int):
- return ssize
-
- ssize = ssize.lower()
- if ssize[-1] in SMAP:
- return int(ssize[:-1]) * SMAP[ssize[-1]]
- return int(ssize)
- except (ValueError, TypeError, AttributeError):
-        raise ValueError("Unknown size format {!r}".format(ssize))
-
-
-RSMAP = [('K', 1024),
- ('M', 1024 ** 2),
- ('G', 1024 ** 3),
- ('T', 1024 ** 4)]
-
-
-def b2ssize(value: Union[int, float]) -> str:
- if isinstance(value, float) and value < 100:
- return b2ssize_10(value)
-
- value = int(value)
- if value < 1024:
- return str(value) + " "
-
- # make mypy happy
- scale = 1
- name = ""
-
- for name, scale in RSMAP:
- if value < 1024 * scale:
- if value % scale == 0:
- return "{} {}i".format(value // scale, name)
- else:
- return "{:.1f} {}i".format(float(value) / scale, name)
-
- return "{}{}i".format(value // scale, name)
-
-
-RSMAP_10 = [(' f', 0.001 ** 4),
- (' n', 0.001 ** 3),
- (' u', 0.001 ** 2),
- (' m', 0.001),
- (' ', 1),
- (' K', 1000),
- (' M', 1000 ** 2),
- (' G', 1000 ** 3),
- (' T', 1000 ** 4),
- (' P', 1000 ** 5),
- (' E', 1000 ** 6)]
-
-
-def has_next_digit_after_coma(x: float) -> bool:
- return x * 10 - int(x * 10) > 1
-
-
-def has_second_digit_after_coma(x: float) -> bool:
- return (x * 10 - int(x * 10)) * 10 > 1
-
-
-def b2ssize_10(value: Union[int, float]) -> str:
- # make mypy happy
- scale = 1
- name = " "
-
- if value == 0.0:
- return "0 "
-
- if value / RSMAP_10[0][1] < 1.0:
- return "{:.2e} ".format(value)
-
- for name, scale in RSMAP_10:
- cval = value / scale
- if cval < 1000:
- # detect how many digits after dot to show
- if cval > 100:
- return "{}{}".format(int(cval), name)
- if cval > 10:
- if has_next_digit_after_coma(cval):
- return "{:.1f}{}".format(cval, name)
- else:
- return "{}{}".format(int(cval), name)
- if cval >= 1:
- if has_second_digit_after_coma(cval):
- return "{:.2f}{}".format(cval, name)
- elif has_next_digit_after_coma(cval):
- return "{:.1f}{}".format(cval, name)
- return "{}{}".format(int(cval), name)
- raise AssertionError("Can't get here")
-
- return "{}{}".format(int(value // scale), name)
-
-
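
Quick sanity examples for the size helpers removed above (presumably relocated to cephlib; only the local definitions above are exercised here):

    print(ssize2b("4k"))          # 4096
    print(ssize2b(512))           # 512
    print(b2ssize(10 * 1024))     # '10 Ki'
    print(b2ssize(1536))          # '1.5 Ki'
    print(b2ssize_10(0.002))      # '2 m'
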
-def run_locally(cmd: Union[str, List[str]], input_data: str = "", timeout: int = 20) -> str:
- if isinstance(cmd, str):
- shell = True
- cmd_str = cmd
- else:
- shell = False
- cmd_str = " ".join(cmd)
-
- proc = subprocess.Popen(cmd,
- shell=shell,
- stdin=subprocess.PIPE,
- stdout=subprocess.PIPE,
- stderr=subprocess.PIPE)
- res = [] # type: List[Tuple[bytes, bytes]]
-
- def thread_func() -> None:
- rr = proc.communicate(input_data.encode("utf8"))
- res.extend(rr)
-
- thread = threading.Thread(target=thread_func,
- name="Local cmd execution")
- thread.daemon = True
- thread.start()
- thread.join(timeout)
-
- if thread.is_alive():
- if psutil is not None:
- parent = psutil.Process(proc.pid)
- for child in parent.children(recursive=True):
- child.kill()
- parent.kill()
- else:
- proc.kill()
-
- thread.join()
- raise RuntimeError("Local process timeout: " + cmd_str)
-
- stdout_data, stderr_data = zip(*res) # type: List[bytes], List[bytes]
-
- out = b"".join(stdout_data).decode("utf8")
- err = b"".join(stderr_data).decode("utf8")
-
- if 0 != proc.returncode:
- raise subprocess.CalledProcessError(proc.returncode,
- cmd_str, out + err)
-
- return out
-
-
-def get_ip_for_target(target_ip: str) -> str:
- if not is_ip(target_ip):
- target_ip = socket.gethostbyname(target_ip)
-
-    first_dig = int(target_ip.split(".")[0])
-    if first_dig == 127:
- return '127.0.0.1'
-
- data = run_locally('ip route get to'.split(" ") + [target_ip])
-
- rr1 = r'{0} via [.0-9]+ dev (?P<dev>.*?) src (?P<ip>[.0-9]+)$'
- rr1 = rr1.replace(" ", r'\s+')
- rr1 = rr1.format(target_ip.replace('.', r'\.'))
-
- rr2 = r'{0} dev (?P<dev>.*?) src (?P<ip>[.0-9]+)$'
- rr2 = rr2.replace(" ", r'\s+')
- rr2 = rr2.format(target_ip.replace('.', r'\.'))
-
- data_line = data.split("\n")[0].strip()
- res1 = re.match(rr1, data_line)
- res2 = re.match(rr2, data_line)
-
- if res1 is not None:
- return res1.group('ip')
-
- if res2 is not None:
- return res2.group('ip')
-
- raise OSError("Can't define interface for {0}".format(target_ip))
-
-
-def open_for_append_or_create(fname: str) -> IO[str]:
- if not os.path.exists(fname):
- return open(fname, "w")
-
- fd = open(fname, 'r+')
- fd.seek(0, os.SEEK_END)
- return fd
-
-
-def sec_to_str(seconds: int) -> str:
- h = seconds // 3600
- m = (seconds % 3600) // 60
- s = seconds % 60
- return "{}:{:02d}:{:02d}".format(h, m, s)
-
-
def yamable(data: Any) -> Any:
if isinstance(data, (tuple, list)):
return map(yamable, data)
@@ -352,16 +75,6 @@
return data
-def flatten(data: Iterable[Any]) -> List[Any]:
- res = []
- for i in data:
- if isinstance(i, (list, tuple, set)):
- res.extend(flatten(i))
- else:
- res.append(i)
- return res
-
-
def get_creds_openrc(path: str) -> Tuple[str, str, str, str, bool]:
fc = open(path).read()
@@ -369,7 +82,7 @@
msg = "Failed to get creads from openrc file"
with LogError(msg):
- data = run_locally(['/bin/bash'], input_data=fc + "\n" + echo)
+ data = run_locally(['/bin/bash'], input_data=(fc + "\n" + echo).encode('utf8')).decode("utf8")
msg = "Failed to get creads from openrc file: " + data
with LogError(msg):
@@ -383,19 +96,6 @@
return user, passwd, tenant, auth_url, insecure
-def which(program: str) -> Optional[str]:
- def is_exe(fpath):
- return os.path.isfile(fpath) and os.access(fpath, os.X_OK)
-
- for path in os.environ["PATH"].split(os.pathsep):
- path = path.strip('"')
- exe_file = os.path.join(path, program)
- if is_exe(exe_file):
- return exe_file
-
- return None
-
-
@contextlib.contextmanager
def empty_ctx(val: Any = None) -> Iterator[Any]:
yield val
@@ -414,17 +114,6 @@
return results_dir, run_uuid
-def to_ip(host_or_ip: str) -> str:
- # translate hostname to address
- try:
- ipaddress.ip_address(host_or_ip)
- return host_or_ip
- except ValueError:
- ip_addr = socket.gethostbyname(host_or_ip)
- logger.info("Will use ip_addr %r instead of hostname %r", ip_addr, host_or_ip)
- return ip_addr
-
-
def get_time_interval_printable_info(seconds: int) -> Tuple[str, str]:
exec_time_s = sec_to_str(seconds)
now_dt = datetime.datetime.now()
@@ -432,68 +121,6 @@
return exec_time_s, "{:%H:%M:%S}".format(end_dt)
-FM_FUNC_INPUT = TypeVar("FM_FUNC_INPUT")
-FM_FUNC_RES = TypeVar("FM_FUNC_RES")
-
-
-def flatmap(func: Callable[[FM_FUNC_INPUT], Iterable[FM_FUNC_RES]],
- inp_iter: Iterable[FM_FUNC_INPUT]) -> Iterator[FM_FUNC_RES]:
- for val in inp_iter:
- for res in func(val):
- yield res
-
-
-_coefs = {
- 'n': Fraction(1, 1000**3),
- 'u': Fraction(1, 1000**2),
- 'm': Fraction(1, 1000),
- 'K': 1000,
- 'M': 1000 ** 2,
- 'G': 1000 ** 3,
- 'Ki': 1024,
- 'Mi': 1024 ** 2,
- 'Gi': 1024 ** 3,
-}
-
-
-def split_unit(units: str) -> Tuple[Union[Fraction, int], str]:
- if len(units) > 2 and units[:2] in _coefs:
- return _coefs[units[:2]], units[2:]
- if len(units) > 1 and units[0] in _coefs:
- return _coefs[units[0]], units[1:]
- else:
- return 1, units
-
-
-conversion_cache = {}
-
-
-def unit_conversion_coef(from_unit: str, to_unit: str) -> Union[Fraction, int]:
- key = (from_unit, to_unit)
- if key in conversion_cache:
- return conversion_cache[key]
-
- f1, u1 = split_unit(from_unit)
- f2, u2 = split_unit(to_unit)
-
- assert u1 == u2, "Can't convert {!r} to {!r}".format(from_unit, to_unit)
-
- if isinstance(f1, int) and isinstance(f2, int):
- if f1 % f2 != 0:
- res = Fraction(f1, f2)
- else:
- res = f1 // f2
- else:
- res = f1 / f2
-
- if isinstance(res, Fraction) and cast(Fraction, res).denominator == 1:
- res = cast(Fraction, res).numerator
-
- conversion_cache[key] = res
-
- return res
-
-
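
And the removed unit-conversion helpers in action (calls are illustrative, using only the definitions above):

    print(unit_conversion_coef("KiB", "B"))     # 1024
    print(unit_conversion_coef("GiB", "MiB"))   # 1024
    print(unit_conversion_coef("ms", "s"))      # Fraction(1, 1000)
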
def shape2str(shape: Iterable[int]) -> str:
return "*".join(map(str, shape))