some work on statistic code
diff --git a/configs-examples/local_lxc_ceph.yaml b/configs-examples/local_lxc_ceph.yaml
index 78b4866..d5b1403 100644
--- a/configs-examples/local_lxc_ceph.yaml
+++ b/configs-examples/local_lxc_ceph.yaml
@@ -12,5 +12,5 @@
load: rrd
params:
FILENAME: /tmp/fl.bin
- FILESIZE: 400M
+ FILESIZE: 4G
diff --git a/v2_plans.md b/v2_plans.md
index aff1bf8..ccacc40 100644
--- a/v2_plans.md
+++ b/v2_plans.md
@@ -1,3 +1,23 @@
+* TODO next
+ * all integral sensors gap interpolation
+ * run sensors in thread pool, optimize communication with ceph, can run first OSD request for
+ data validation only on start. Each sensor should collect only one portion of data. During
+ start it should scan all available sources and tell upper code to create separate funcs for them.
+ All funcs should run in separate threads
+ * run test with sensor on large and small file
+ * Move test load code to io.fio file
+ * Revise structures and types location in files, structures names,
+ add dot file for classes and function dependencies
+ * Load latency into 2D numpy.array, same for everything else
+ * Latency statistic - mostly the same as iops, but no average, dispersion and conf interval
+ * Start generating first report images and put them into simple document
+ - iops over time
+ - bw over time
+ - 50ppc + 95ppc Lat over time with boxplots in same graph for selected points
+ * Statistic in background?
+ * UT, which runs a test with a cluster predefined in yaml (cluster and config created separately, not with tests)
+ and checks that result storage works as expected
+
* Code:
* Allow to cleanup all uncleaned from previous run 'wally cleanup PATH'
* RPC reconnect in case of errors
@@ -8,12 +28,11 @@
- perf
- [bcc](https://github.com/iovisor/bcc)
- ceph sensors
- - run sensors in thread pool
* Config revised:
* Result config then validated
* Add sync 4k write with small set of thcount
* Flexible SSH connection creds - use agent, default ssh settings or part of config
- * Remove created temporary files
+ * Remove created temporary files - create all tempfiles via func from .utils, which track them
* Use ceph-monitoring from wally
* Remove warm-up time from fio. Use warm-up detection to select real test time,
also fio/OS log files should be used to get test results, not directly
diff --git a/wally/config.py b/wally/config.py
index cb47567..7554fb8 100644
--- a/wally/config.py
+++ b/wally/config.py
@@ -6,8 +6,6 @@
class Config(IStorable):
- yaml_tag = 'config'
-
def __init__(self, dct: ConfigBlock) -> None:
# make mypy happy, set fake dict
self.__dict__['_dct'] = {}
diff --git a/wally/main.py b/wally/main.py
index 16d884f..0de6791 100644
--- a/wally/main.py
+++ b/wally/main.py
@@ -44,6 +44,7 @@
from .run_test import (CollectInfoStage, ExplicitNodesStage, SaveNodesStage,
RunTestsStage, ConnectStage, SleepStage, PrepareNodes,
LoadStoredNodesStage)
+from .process_results import CalcStatisticStage
from .report import ConsoleReportStage, HtmlReportStage
from .sensors import StartSensorsStage, CollectSensorsStage
@@ -228,8 +229,9 @@
storage = make_storage(config.storage_url)
storage.put(config, 'config')
+
stages.extend(get_run_stages())
- stages.extend(SaveNodesStage())
+ stages.append(SaveNodesStage())
if not opts.dont_collect:
stages.append(CollectInfoStage())
@@ -264,8 +266,12 @@
return 0
elif opts.subparser_name == 'report':
+ if getattr(opts, "no_report", False):
+ print(" --no-report option can't be used with 'report' cmd")
+ return 1
storage = make_storage(opts.data_dir, existing=True)
- config.settings_dir = get_config_path(config, opts.settings_dir)
+ config = storage.load(Config, 'config')
+ stages.append(LoadStoredNodesStage())
elif opts.subparser_name == 'compare':
# x = run_test.load_data_from_path(opts.data_path1)
@@ -286,9 +292,9 @@
return 1
return 0
-
report_stages = [] # type: List[Stage]
if not getattr(opts, "no_report", False):
+ report_stages.append(CalcStatisticStage())
report_stages.append(ConsoleReportStage())
report_stages.append(HtmlReportStage())
@@ -349,7 +355,12 @@
if not failed:
for report_stage in report_stages:
with log_stage(report_stage):
- report_stage.run(ctx)
+ try:
+ report_stage.run(ctx)
+ except utils.StopTestError:
+ logger.error("Report stage %s requested stop execution", report_stage.name())
+ failed = True
+ break
ctx.storage.sync()
diff --git a/wally/node.py b/wally/node.py
index e85ded0..d4da52a 100644
--- a/wally/node.py
+++ b/wally/node.py
@@ -196,7 +196,7 @@
if expanduser:
remote_path = self.conn.fs.expanduser(remote_path)
- data = open(local_path, 'rb').read()
+ data = open(local_path, 'rb').read() # type: bytes
return self.put_to_file(remote_path, data)
def put_to_file(self, path: Optional[str], content: bytes, expanduser: bool = False) -> str:
diff --git a/wally/node_interfaces.py b/wally/node_interfaces.py
index d0fabac..bc07bb3 100644
--- a/wally/node_interfaces.py
+++ b/wally/node_interfaces.py
@@ -10,9 +10,6 @@
class NodeInfo(IStorable):
"""Node information object, result of discovery process or config parsing"""
-
- yaml_tag = 'node_info'
-
def __init__(self, ssh_creds: ConnCreds, roles: Set[str], params: Dict[str, Any] = None) -> None:
# ssh credentials
self.ssh_creds = ssh_creds
diff --git a/wally/openstack.py b/wally/openstack.py
index 264cfbb..8aa9913 100644
--- a/wally/openstack.py
+++ b/wally/openstack.py
@@ -35,8 +35,9 @@
def get_OS_credentials(ctx: TestRun) -> OSCreds:
- if "openstack_openrc" in ctx.storage:
- return OSCreds(*cast(List, ctx.storage.get("openstack_openrc")))
+ stored = ctx.storage.get("openstack_openrc", None)
+ if stored is not None:
+ return OSCreds(*cast(List, stored))
creds = None # type: OSCreds
os_creds = None # type: OSCreds
diff --git a/wally/process_results.py b/wally/process_results.py
index f01b14e..0e8cb1c 100644
--- a/wally/process_results.py
+++ b/wally/process_results.py
@@ -1,2 +1,53 @@
# put all result preprocessing here
# selection, aggregation
+
+from .stage import Stage, StepOrder
+from .test_run_class import TestRun
+from .statistic import calc_norm_stat_props, NormStatProps
+from .result_classes import NormStatProps
+
+class CalcStatisticStage(Stage):
+ priority = StepOrder.TEST + 1
+
+ def run(self, ctx: TestRun) -> None:
+ results = {}
+
+ for is_file, name in ctx.storage.list("result"):
+ if is_file:
+ continue
+
+ path = "result/{}".format(name)
+ info = ctx.storage.get("result/{}/info".format(name))
+
+ if info['test'] == 'fio':
+ for node in info['nodes']:
+ data_path = "{}/measurement/{}".format(path, node)
+
+ iops = ctx.storage.get_array('Q', data_path, 'iops_data')
+ iops_stat_path = "{}/iops_stat".format(data_path)
+ if iops_stat_path in ctx.storage:
+ iops_stat= ctx.storage.load(NormStatProps, iops_stat_path)
+ else:
+ iops_stat = calc_norm_stat_props(iops)
+ ctx.storage.put(iops_stat, iops_stat_path)
+
+ bw = ctx.storage.get_array('Q', data_path, 'bw_data')
+ bw_stat_path = "{}/bw_stat".format(data_path)
+ if bw_stat_path in ctx.storage:
+ bw_stat = ctx.storage.load(NormStatProps, bw_stat_path)
+ else:
+ bw_stat = calc_norm_stat_props(bw)
+ ctx.storage.put(bw_stat, bw_stat_path)
+
+ lat = ctx.storage.get_array('L', data_path, 'lat_data')
+ lat_stat = None
+
+ results[name] = (iops, iops_stat, bw, bw_stat, lat, lat_stat)
+
+ for name, (iops, iops_stat, bw, bw_stat, lat, lat_stat) in results.items():
+ print(" ------------------- IOPS -------------------")
+ print(iops_stat) # type: ignore
+ print(" ------------------- BW -------------------")
+ print(bw_stat) # type: ignore
+ # print(" ------------------- LAT -------------------")
+ # print(calc_stat_props(lat))
diff --git a/wally/report.py b/wally/report.py
index 0c96280..58dcf56 100644
--- a/wally/report.py
+++ b/wally/report.py
@@ -1,65 +1,25 @@
-import os
-import csv
import abc
-import bisect
import logging
-import itertools
-import collections
-from io import StringIO
from typing import Dict, Any, Iterator, Tuple, cast, List
-try:
- import numpy
- import scipy
- import matplotlib
- matplotlib.use('svg')
- import matplotlib.pyplot as plt
-except ImportError:
- plt = None
+import numpy
+import scipy
+import matplotlib
+import matplotlib.pyplot as plt
-import wally
+
+matplotlib.use('svg')
+
+
from .utils import ssize2b
-from .storage import Storage
from .stage import Stage, StepOrder
from .test_run_class import TestRun
-from .result_classes import TestInfo, FullTestResult, SensorInfo
-from .suits.io.fio_task_parser import (get_test_sync_mode,
- get_test_summary,
- parse_all_in_1,
- abbv_name_to_full)
+from .result_classes import NormStatProps
logger = logging.getLogger("wally")
-def load_test_results(storage: Storage) -> Iterator[FullTestResult]:
- raise NotImplementedError()
- # sensors_data = {} # type: Dict[Tuple[str, str, str], SensorInfo]
- #
- # mstorage = storage.sub_storage("metric")
- # for _, node_id in mstorage.list():
- # for _, dev_name in mstorage.list(node_id):
- # for _, sensor_name in mstorage.list(node_id, dev_name):
- # key = (node_id, dev_name, sensor_name)
- # si = SensorInfo(*key)
- # si.begin_time, si.end_time, si.data = storage[node_id, dev_name, sensor_name] # type: ignore
- # sensors_data[key] = si
- #
- # rstorage = storage.sub_storage("result")
- # for _, run_id in rstorage.list():
- # ftr = FullTestResult()
- # ftr.test_info = rstorage.load(TestInfo, run_id, "info")
- # ftr.performance_data = {}
- #
- # p1 = "{}/measurement".format(run_id)
- # for _, node_id in rstorage.list(p1):
- # for _, measurement_name in rstorage.list(p1, node_id):
- # perf_key = (node_id, measurement_name)
- # ftr.performance_data[perf_key] = rstorage["{}/{}/{}".format(p1, *perf_key)] # type: ignore
- #
- # yield ftr
-
-
class ConsoleReportStage(Stage):
priority = StepOrder.REPORT
@@ -135,6 +95,8 @@
# IOPS/latency over test time
class IOPSTime(Reporter):
"""IOPS/latency during test"""
+ def get_divs(self, config, storage) -> Iterator[Tuple[str, str, HTMLBlock]]:
+ pass
# Cluster load over test time
diff --git a/wally/result_classes.py b/wally/result_classes.py
index 7244225..e306525 100644
--- a/wally/result_classes.py
+++ b/wally/result_classes.py
@@ -1,6 +1,12 @@
import abc
import array
-from typing import Dict, List, Any, Tuple, Optional, Union, Type
+from typing import Dict, List, Any, Optional
+
+import numpy
+from scipy import stats
+
+
+from .utils import IStorable, Number, round_digits
class TimeSerie:
@@ -73,29 +79,78 @@
self.extra_logs = {} # type: Dict[str, bytes]
-class FullTestResult:
- test_info = None # type: TestInfo
+class NormStatProps(IStorable):
+ "Statistic properties for timeserie"
+ def __init__(self, data: List[Number]) -> None:
+ self.average = None # type: float
+ self.deviation = None # type: float
+ self.confidence = None # type: float
+ self.confidence_level = None # type: float
- # TODO(koder): array.array or numpy.array?
- # {(node_id, perf_metrics_name): values}
- performance_data = None # type: Dict[Tuple[str, str], List[int]]
+ self.perc_99 = None # type: float
+ self.perc_95 = None # type: float
+ self.perc_90 = None # type: float
+ self.perc_50 = None # type: float
- # {(node_id, perf_metrics_name): values}
- sensors_data = None # type: Dict[Tuple[str, str, str], SensorInfo]
+ self.min = None # type: Number
+ self.max = None # type: Number
+ # bin_center: bin_count
+ self.bins_populations = None # type: List[int]
+ self.bins_edges = None # type: List[float]
+ self.data = data
-class IStorable(metaclass=abc.ABCMeta):
- """Interface for type, which can be stored"""
+ self.normtest = None # type: Any
- @abc.abstractmethod
+ def __str__(self) -> str:
+ res = ["StatProps(size = {}):".format(len(self.data)),
+ " distr = {} ~ {}".format(round_digits(self.average), round_digits(self.deviation)),
+ " confidence({0.confidence_level}) = {1}".format(self, round_digits(self.confidence)),
+ " perc_50 = {}".format(round_digits(self.perc_50)),
+ " perc_90 = {}".format(round_digits(self.perc_90)),
+ " perc_95 = {}".format(round_digits(self.perc_95)),
+ " perc_99 = {}".format(round_digits(self.perc_99)),
+ " range {} {}".format(round_digits(self.min), round_digits(self.max)),
+ " normtest = {0.normtest}".format(self)]
+ return "\n".join(res)
+
+ def __repr__(self) -> str:
+ return str(self)
+
def raw(self) -> Dict[str, Any]:
- pass
+ data = self.__dict__.copy()
+ data['nortest'] = (data['nortest'].statistic, data['nortest'].pvalue)
+ data['bins_edges'] = list(self.bins_edges)
+ return data
- @abc.abstractclassmethod
- def fromraw(cls, data: Dict[str, Any]) -> 'IStorable':
- pass
+ @classmethod
+ def fromraw(cls, data: Dict[str, Any]) -> 'NormStatProps':
+ data['nortest'] = stats.mstats.NormaltestResult(data['nortest'].statistic, data['nortest'].pvalue)
+ data['bins_edges'] = numpy.array(data['bins_edges'])
+ res = cls.__new__(cls)
+ res.__dict__.update(data)
+ return res
-Basic = Union[int, str, bytes, bool, None]
-Storable = Union[IStorable, Dict[str, Any], List[Any], int, str, bytes, bool, None]
+class ProcessedTestResults:
+ def __init__(self, info: Dict[str, Any],
+ metrics: Dict[str, NormStatProps]) -> None:
+ self.test = info['test']
+ self.profile = info['profile']
+ self.suite = info['suite']
+ self.name = "{0.suite}.{0.test}.{0.profile}".format(self)
+ self.info = info
+ self.metrics = metrics # mapping {metrics_name: StatProps}
+
+
+# class FullTestResult:
+# test_info = None # type: TestInfo
+#
+# # TODO(koder): array.array or numpy.array?
+# # {(node_id, perf_metrics_name): values}
+# performance_data = None # type: Dict[Tuple[str, str], List[int]]
+#
+# # {(node_id, perf_metrics_name): values}
+# sensors_data = None # type: Dict[Tuple[str, str, str], SensorInfo]
+
diff --git a/wally/run_test.py b/wally/run_test.py
index 8e8a4e9..2cdf0e8 100755
--- a/wally/run_test.py
+++ b/wally/run_test.py
@@ -16,7 +16,6 @@
from .suits.omgbench import OmgTest
from .suits.postgres import PgBenchTest
from .test_run_class import TestRun
-from .statistic import calc_stat_props
from .utils import StopTestError
@@ -87,10 +86,9 @@
node.conn.server.flush_logs()
log = node.get_file_content(node.rpc_log_file)
if path in ctx.storage:
- previous = ctx.storage.get_raw(path)
+ ctx.storage.append_raw(log, path)
else:
- previous = b""
- ctx.storage.put_raw(previous + log, path)
+ ctx.storage.put_raw(log, path)
logger.debug("RPC log from node {} stored into storage::{}".format(nid, path))
with ctx.get_pool() as pool:
@@ -265,29 +263,6 @@
pass
-class CalcStatisticStage(Stage):
- priority = StepOrder.TEST + 1
-
- def run(self, ctx: TestRun) -> None:
- results = {}
- for name, summary, stor_path in ctx.storage.get("all_results"):
- if name == 'fio':
- test_info = ctx.storage.get(stor_path, "info")
- for node in test_info['nodes']:
- iops = ctx.storage.get_array(stor_path, node, 'iops_data')
- bw = ctx.storage.get_array(stor_path, node, 'bw_data')
- lat = ctx.storage.get_array(stor_path, node, 'lat_data')
- results[summary] = (iops, bw, lat)
-
- for name, (iops, bw, lat) in results.items():
- print(" ------------------- IOPS -------------------")
- print(calc_stat_props(iops)) # type: ignore
- print(" ------------------- BW -------------------")
- print(calc_stat_props(bw)) # type: ignore
- # print(" ------------------- LAT -------------------")
- # print(calc_stat_props(lat))
-
-
class LoadStoredNodesStage(Stage):
priority = StepOrder.DISCOVER
diff --git a/wally/statistic.py b/wally/statistic.py
index 2263788..2f68ca5 100644
--- a/wally/statistic.py
+++ b/wally/statistic.py
@@ -1,4 +1,5 @@
import math
+import logging
import itertools
import statistics
from typing import Union, List, TypeVar, Callable, Iterable, Tuple, Any, cast, Dict
@@ -9,83 +10,22 @@
from numpy.polynomial.chebyshev import chebfit, chebval
-from .result_classes import IStorable
+from .result_classes import NormStatProps
+from .utils import Number
-Number = Union[int, float]
-TNumber = TypeVar('TNumber', int, float)
-
-
+logger = logging.getLogger("wally")
DOUBLE_DELTA = 1e-8
average = statistics.mean
-dev = statistics.variance
+dev = lambda x: math.sqrt(statistics.variance(x))
-class StatProps(IStorable):
- "Statistic properties for timeserie"
-
- yaml_tag = 'stat'
-
- def __init__(self, data: List[Number]) -> None:
- self.average = None # type: float
- self.deviation = None # type: float
- self.confidence = None # type: float
- self.confidence_level = None # type: float
-
- self.perc_99 = None # type: float
- self.perc_95 = None # type: float
- self.perc_90 = None # type: float
- self.perc_50 = None # type: float
-
- self.min = None # type: Number
- self.max = None # type: Number
-
- # bin_center: bin_count
- self.histo = None # type: Tuple[List[int], List[float]]
- self.data = data
-
- self.normtest = None # type: Any
-
- def __str__(self) -> str:
- res = ["StatProps(num_el={}):".format(len(self.data)),
- " distr = {0.average} ~ {0.deviation}".format(self),
- " confidence({0.confidence_level}) = {0.confidence}".format(self),
- " perc50={0.perc50}".format(self),
- " perc90={0.perc90}".format(self),
- " perc95={0.perc95}".format(self),
- " perc95={0.perc99}".format(self),
- " range {0.min} {0.max}".format(self),
- " nurmtest = {0.nortest}".format(self)]
- return "\n".join(res)
-
- def __repr__(self) -> str:
- return str(self)
-
- def raw(self) -> Dict[str, Any]:
- return self.__dict__.copy()
-
- @classmethod
- def fromraw(cls, data: Dict[str, Any]) -> 'StatProps':
- res = cls.__new__(cls)
- res.__dict__.update(data)
- return res
-
-
-def greater_digit_pos(val: Number) -> int:
- return int(math.floor(math.log10(val))) + 1
-
-
-def round_digits(val: TNumber, num_digits: int = 3) -> TNumber:
- pow = 10 ** (greater_digit_pos(val) - num_digits)
- return type(val)(int(val / pow) * pow)
-
-
-def calc_stat_props(data: List[Number], confidence: float = 0.95) -> StatProps:
+def calc_norm_stat_props(data: List[Number], confidence: float = 0.95) -> NormStatProps:
"Calculate statistical properties of array of numbers"
- res = StatProps(data)
+ res = NormStatProps(data)
if len(data) == 0:
raise ValueError("Input array is empty")
@@ -93,6 +33,7 @@
data = sorted(data)
res.average = average(data)
res.deviation = dev(data)
+
res.max = data[-1]
res.min = data[0]
@@ -107,8 +48,13 @@
else:
res.confidence = None
- res.histo = numpy.histogram(data, 'auto')
- res.normtest = stats.mstats.normaltest(data)
+ res.bin_populations, res.bin_edges = numpy.histogram(data, 'auto')
+
+ try:
+ res.normtest = stats.mstats.normaltest(data)
+ except Exception as exc:
+ logger.warning("stats.mstats.normaltest failed with error: %s", exc)
+
return res
diff --git a/wally/storage.py b/wally/storage.py
index 2c0a26b..d33f8e5 100644
--- a/wally/storage.py
+++ b/wally/storage.py
@@ -7,7 +7,7 @@
import array
import shutil
import sqlite3
-from typing import Any, TypeVar, Type, IO, Tuple, cast, List, Dict, Iterable
+from typing import Any, TypeVar, Type, IO, Tuple, cast, List, Dict, Iterable, Iterator
import yaml
try:
@@ -51,6 +51,10 @@
def sub_storage(self, path: str) -> 'ISimpleStorage':
pass
+ @abc.abstractmethod
+ def list(self, path: str) -> Iterator[Tuple[bool, str]]:
+ pass
+
class ISerializer(metaclass=abc.ABCMeta):
"""Interface for serialization class"""
@@ -145,15 +149,18 @@
print(key, data_ln, type)
print("------------------ END --------------------")
- def get_fd(self, path: str, mode: str = "rb+") -> IO[bytes]:
- raise NotImplementedError("SQLITE3 doesn't provide fd-like interface")
-
def sub_storage(self, path: str) -> 'DBStorage':
return self.__class__(prefix=self.prefix + path, db=self.db)
def sync(self):
self.db.commit()
+ def get_fd(self, path: str, mode: str = "rb+") -> IO[bytes]:
+ raise NotImplementedError("SQLITE3 doesn't provide fd-like interface")
+
+ def list(self, path: str) -> Iterator[Tuple[bool, str]]:
+ raise NotImplementedError("SQLITE3 doesn't provide list method")
+
DB_REL_PATH = "__db__.db"
@@ -164,6 +171,7 @@
def __init__(self, root_path: str, existing: bool) -> None:
self.root_path = root_path
self.existing = existing
+ self.ignored = {self.j(DB_REL_PATH), '.', '..'}
def j(self, path: str) -> str:
return os.path.join(self.root_path, path)
@@ -196,11 +204,10 @@
if "cb" == mode:
create_on_fail = True
mode = "rb+"
+ os.makedirs(os.path.dirname(jpath), exist_ok=True)
else:
create_on_fail = False
- os.makedirs(os.path.dirname(jpath), exist_ok=True)
-
try:
fd = open(jpath, mode)
except IOError:
@@ -216,6 +223,14 @@
def sync(self):
pass
+ def list(self, path: str) -> Iterator[Tuple[bool, str]]:
+ for fobj in os.scandir(self.j(path)):
+ if fobj.path not in self.ignored:
+ if fobj.is_dir():
+ yield False, fobj.name
+ else:
+ yield True, fobj.name
+
class YAMLSerializer(ISerializer):
"""Serialize data to yaml"""
@@ -244,6 +259,10 @@
ObjClass = TypeVar('ObjClass', bound=IStorable)
+class _Raise:
+ pass
+
+
class Storage:
"""interface for storage"""
def __init__(self, fs_storage: ISimpleStorage, db_storage: ISimpleStorage, serializer: ISerializer) -> None:
@@ -268,8 +287,15 @@
self.db.put(serialized, fpath)
self.fs.put(serialized, fpath)
- def get(self, *path: str) -> Any:
- return self.serializer.unpack(self.db.get("/".join(path)))
+ def get(self, path: str, default: Any = _Raise) -> Any:
+ try:
+ vl = self.db.get(path)
+ except:
+ if default is _Raise:
+ raise
+ return default
+
+ return self.serializer.unpack(vl)
def rm(self, *path: str) -> None:
fpath = "/".join(path)
@@ -285,6 +311,11 @@
def get_raw(self, *path: str) -> bytes:
return self.fs.get("/".join(path))
+ def append_raw(self, value: bytes, *path: str) -> None:
+ with self.fs.get_fd("/".join(path), "rb+") as fd:
+ fd.seek(offset=0, whence=os.SEEK_END)
+ fd.write(value)
+
def get_fd(self, path: str, mode: str = "r") -> IO:
return self.fs.get_fd(path, mode)
@@ -329,6 +360,9 @@
def __exit__(self, x: Any, y: Any, z: Any) -> None:
self.sync()
+ def list(self, *path: str) -> Iterator[Tuple[bool, str]]:
+ return self.fs.list("/".join(path))
+
def make_storage(url: str, existing: bool = False) -> Storage:
return Storage(FSStorage(url, existing),
diff --git a/wally/storage_structure.txt b/wally/storage_structure.txt
index 460214c..bc4e77e 100644
--- a/wally/storage_structure.txt
+++ b/wally/storage_structure.txt
@@ -1,6 +1,5 @@
config: Config - full configuration
all_nodes: List[NodeInfo] - all nodes
-all_results: List[Tuple[str, str, str]] - (test_type, test_summary, result_path)
cli: List[str] - cli options
spawned_nodes_ids: List[int] - list of openstack VM, spawned for test
@@ -14,7 +13,17 @@
info/run_time : float - run unix time
# test results
-result/{descr}_{id}/info : TestInfo - test information: name, cluster config, test parameters, etc.
+result/{descr}_{id}/info :
+ test_config = {
+ 'suite': 'io', 'test': self.name, 'profile': self.load_profile_name,
+ 'iteration_name': iter_name,
+ 'iteration_config': iteration_config.raw(),
+ 'params': self.config.params,
+ 'nodes': self.sorted_nodes_ids,
+ 'begin_time': min_start_time,
+ 'end_time': max_stop_time
+ }
+
result/{descr}_{id}/measurement/{node}/{name}_raw : bytes - raw log, where name from {'bw', 'iops', 'lat'}
result/{descr}_{id}/measurement/{node}/{name}_data - List[uint64] - measurements data.
result/{descr}_{id}/measurement/{node}/{name}_meta - Dict[str, Any] - measurements metadata.
diff --git a/wally/suits/io/fio.py b/wally/suits/io/fio.py
index fb18165..c32dba2 100644
--- a/wally/suits/io/fio.py
+++ b/wally/suits/io/fio.py
@@ -35,7 +35,6 @@
self.force_prefill = get('force_prefill', False) # type: bool
self.load_profile_name = self.config.params['load'] # type: str
- self.name = "io." + self.load_profile_name
if os.path.isfile(self.load_profile_name):
self.load_profile_path = self.load_profile_name # type: str
@@ -193,12 +192,14 @@
try:
time_ms_s, val_s, _, *rest = line.split(",")
time_ms = int(time_ms_s.strip())
- if prev_ts and abs(time_ms - prev_ts - expected_time_delta) > max_time_diff:
- logger.warning("Too large gap in {} log at {} - {}ms"
- .format(time_ms, name, time_ms - prev_ts))
- else:
+
+ if not prev_ts:
prev_ts = time_ms - expected_time_delta
load_start_at = time_ms
+ elif abs(time_ms - prev_ts - expected_time_delta) > max_time_diff:
+ logger.warning("Too large gap in {} log at {} - {}ms"
+ .format(name, time_ms, time_ms - prev_ts))
+
if name == 'lat':
vals = [int(i.strip()) for i in rest]
@@ -213,6 +214,7 @@
except ValueError:
logger.exception("Error during parse %s fio log file in line %s: %r", name, idx, line)
raise StopTestError()
+
prev_ts += expected_time_delta
res.series[name] = TimeSerie(name=name,
diff --git a/wally/suits/io/fio_task_parser.py b/wally/suits/io/fio_task_parser.py
index 6790c97..bdcf4a3 100644
--- a/wally/suits/io/fio_task_parser.py
+++ b/wally/suits/io/fio_task_parser.py
@@ -39,8 +39,6 @@
class FioJobSection(IterationConfig, IStorable):
- yaml_tag = 'fio_job'
-
def __init__(self, name: str) -> None:
self.name = name
self.vals = OrderedDict() # type: Dict[str, Any]
diff --git a/wally/suits/io/rrd.cfg b/wally/suits/io/rrd.cfg
index d32d6a8..850c9a3 100644
--- a/wally/suits/io/rrd.cfg
+++ b/wally/suits/io/rrd.cfg
@@ -1,7 +1,7 @@
[global]
include defaults_qd.cfg
ramp_time=0
-runtime=4
+runtime=600
[test_{TEST_SUMM}]
blocksize=60k
diff --git a/wally/suits/itest.py b/wally/suits/itest.py
index 78986f6..446ad69 100644
--- a/wally/suits/itest.py
+++ b/wally/suits/itest.py
@@ -86,6 +86,7 @@
# used_max_diff = max((min_run_time * max_rel_time_diff), max_time_diff)
max_time_diff = 5
max_rel_time_diff = 0.05
+ load_profile_name = None # type: str
def __init__(self, *args, **kwargs) -> None:
PerfTest.__init__(self, *args, **kwargs)
@@ -138,7 +139,9 @@
logger.info("All test iteration in storage already. Skip test")
return
- logger.debug("Run test {} on nodes {}.".format(self.name, ",".join(self.sorted_nodes_ids)))
+ logger.debug("Run test io.{} with profile {!r} on nodes {}.".format(self.name,
+ self.load_profile_name,
+ ",".join(self.sorted_nodes_ids)))
logger.debug("Prepare nodes")
with ThreadPoolExecutor(len(self.nodes)) as pool:
@@ -211,7 +214,9 @@
.format(self.name, iter_name, max_start_time - min_start_time, self.max_time_diff))
test_config = {
- 'name': self.name,
+ 'suite': 'io',
+ 'test': self.name,
+ 'profile': self.load_profile_name,
'iteration_name': iter_name,
'iteration_config': iteration_config.raw(),
'params': self.config.params,
@@ -221,15 +226,7 @@
}
self.process_storage_queue()
- self.config.storage.put(test_config, "result", str(run_id), "info")
-
- if "all_results" in self.config.storage:
- all_results = self.config.storage.get("all_results")
- else:
- all_results = []
-
- all_results.append([self.name, iteration_config.summary, current_result_path])
- self.config.storage.put(all_results, "all_results")
+ self.config.storage.put(test_config, current_result_path, "info")
self.config.storage.sync()
if self.on_idle is not None:
diff --git a/wally/utils.py b/wally/utils.py
index dd9cdd5..13cd675 100644
--- a/wally/utils.py
+++ b/wally/utils.py
@@ -1,6 +1,8 @@
import re
import os
+import abc
import sys
+import math
import time
import uuid
import socket
@@ -12,8 +14,8 @@
import collections
from .node_interfaces import IRPCNode
-from typing import (Any, Tuple, Union, List, Iterator, Dict, Callable, Iterable, Optional,
- IO, Sequence, NamedTuple, cast)
+from typing import (Any, Tuple, Union, List, Iterator, Dict, Iterable, Optional,
+ IO, Sequence, NamedTuple, cast, TypeVar)
try:
import psutil
@@ -28,14 +30,8 @@
logger = logging.getLogger("wally")
-
-
-def is_ip(data: str) -> bool:
- try:
- ipaddress.ip_address(data)
- return True
- except ValueError:
- return False
+TNumber = TypeVar('TNumber', int, float)
+Number = Union[int, float]
class StopTestError(RuntimeError):
@@ -63,27 +59,20 @@
raise StopTestError(self.message) from value
-def log_block(message: str, exc_logger:logging.Logger = None) -> LogError:
- logger.debug("Starts : " + message)
- return LogError(message, exc_logger)
+class IStorable(metaclass=abc.ABCMeta):
+ """Interface for type, which can be stored"""
+
+ @abc.abstractmethod
+ def raw(self) -> Dict[str, Any]:
+ pass
+
+ @abc.abstractclassmethod
+ def fromraw(cls, data: Dict[str, Any]) -> 'IStorable':
+ pass
-def check_input_param(is_ok: bool, message: str) -> None:
- if not is_ok:
- logger.error(message)
- raise StopTestError(message)
-
-
-def parse_creds(creds: str) -> Tuple[str, str, str]:
- """Parse simple credentials format user[:passwd]@host"""
- user, passwd_host = creds.split(":", 1)
-
- if '@' not in passwd_host:
- passwd, host = passwd_host, None
- else:
- passwd, host = passwd_host.rsplit('@', 1)
-
- return user, passwd, host
+Basic = Union[int, str, bytes, bool, None]
+Storable = Union[IStorable, Dict[str, Any], List[Any], int, str, bytes, bool, None]
class TaskFinished(Exception):
@@ -116,6 +105,86 @@
self.exited = True
+class Timeout(Iterable[float]):
+ def __init__(self, timeout: int, message: str = None, min_tick: int = 1, no_exc: bool = False) -> None:
+ self.end_time = time.time() + timeout
+ self.message = message
+ self.min_tick = min_tick
+ self.prev_tick_at = time.time()
+ self.no_exc = no_exc
+
+ def tick(self) -> bool:
+ current_time = time.time()
+
+ if current_time > self.end_time:
+ if self.message:
+ msg = "Timeout: {}".format(self.message)
+ else:
+ msg = "Timeout"
+
+ if self.no_exc:
+ return False
+
+ raise TimeoutError(msg)
+
+ sleep_time = self.min_tick - (current_time - self.prev_tick_at)
+ if sleep_time > 0:
+ time.sleep(sleep_time)
+ self.prev_tick_at = time.time()
+ else:
+ self.prev_tick_at = current_time
+
+ return True
+
+ def __iter__(self) -> Iterator[float]:
+ return cast(Iterator[float], self)
+
+ def __next__(self) -> float:
+ if not self.tick():
+ raise StopIteration()
+ return self.end_time - time.time()
+
+
+def greater_digit_pos(val: Number) -> int:
+ return int(math.floor(math.log10(val))) + 1
+
+
+def round_digits(val: TNumber, num_digits: int = 3) -> TNumber:
+ pow = 10 ** (greater_digit_pos(val) - num_digits)
+ return type(val)(int(val / pow) * pow)
+
+
+def is_ip(data: str) -> bool:
+ try:
+ ipaddress.ip_address(data)
+ return True
+ except ValueError:
+ return False
+
+
+def log_block(message: str, exc_logger:logging.Logger = None) -> LogError:
+ logger.debug("Starts : " + message)
+ return LogError(message, exc_logger)
+
+
+def check_input_param(is_ok: bool, message: str) -> None:
+ if not is_ok:
+ logger.error(message)
+ raise StopTestError(message)
+
+
+def parse_creds(creds: str) -> Tuple[str, str, str]:
+ """Parse simple credentials format user[:passwd]@host"""
+ user, passwd_host = creds.split(":", 1)
+
+ if '@' not in passwd_host:
+ passwd, host = passwd_host, None
+ else:
+ passwd, host = passwd_host.rsplit('@', 1)
+
+ return user, passwd, host
+
+
SMAP = dict(k=1024, m=1024 ** 2, g=1024 ** 3, t=1024 ** 4)
@@ -399,46 +468,6 @@
return results_dir, run_uuid
-class Timeout(Iterable[float]):
- def __init__(self, timeout: int, message: str = None, min_tick: int = 1, no_exc: bool = False) -> None:
- self.end_time = time.time() + timeout
- self.message = message
- self.min_tick = min_tick
- self.prev_tick_at = time.time()
- self.no_exc = no_exc
-
- def tick(self) -> bool:
- current_time = time.time()
-
- if current_time > self.end_time:
- if self.message:
- msg = "Timeout: {}".format(self.message)
- else:
- msg = "Timeout"
-
- if self.no_exc:
- return False
-
- raise TimeoutError(msg)
-
- sleep_time = self.min_tick - (current_time - self.prev_tick_at)
- if sleep_time > 0:
- time.sleep(sleep_time)
- self.prev_tick_at = time.time()
- else:
- self.prev_tick_at = current_time
-
- return True
-
- def __iter__(self) -> Iterator[float]:
- return cast(Iterator[float], self)
-
- def __next__(self) -> float:
- if not self.tick():
- raise StopIteration()
- return self.end_time - time.time()
-
-
def to_ip(host_or_ip: str) -> str:
# translate hostname to address
try: