resume working
diff --git a/wally/common_types.py b/wally/common_types.py
index 884cd44..ef809ee 100644
--- a/wally/common_types.py
+++ b/wally/common_types.py
@@ -1,4 +1,5 @@
-from typing import NamedTuple
+import abc
+from typing import NamedTuple, Dict, Any
IP = str
IPAddr = NamedTuple("IPAddr", [("host", IP), ("port", int)])
diff --git a/wally/config.py b/wally/config.py
index 46669f0..cb47567 100644
--- a/wally/config.py
+++ b/wally/config.py
@@ -1,10 +1,13 @@
from typing import Any, Dict, Optional
-from .storage import IStorable
+from .result_classes import IStorable
-ConfigBlock = Any
+
+ConfigBlock = Dict[str, Any]
class Config(IStorable):
+ yaml_tag = 'config'
+
def __init__(self, dct: ConfigBlock) -> None:
# make mypy happy, set fake dict
self.__dict__['_dct'] = {}
@@ -25,17 +28,21 @@
# None, disabled, enabled, metadata, ignore_errors
self.discovery = None # type: Optional[str]
- self.logging = None # type: ConfigBlock
- self.ceph = None # type: ConfigBlock
- self.openstack = None # type: ConfigBlock
- self.fuel = None # type: ConfigBlock
- self.test = None # type: ConfigBlock
- self.sensors = None # type: ConfigBlock
+ self.logging = None # type: 'Config'
+ self.ceph = None # type: 'Config'
+ self.openstack = None # type: 'Config'
+ self.fuel = None # type: 'Config'
+ self.test = None # type: 'Config'
+ self.sensors = None # type: 'Config'
self._dct.clear()
self._dct.update(dct)
- def raw(self) -> ConfigBlock:
+ @classmethod
+ def fromraw(cls, data: Dict[str, Any]) -> 'Config':
+ return cls(data)
+
+ def raw(self) -> Dict[str, Any]:
return self._dct
def get(self, path: str, default: Any = None) -> Any:
diff --git a/wally/fuel.py b/wally/fuel.py
index 902752a..37a5f5e 100644
--- a/wally/fuel.py
+++ b/wally/fuel.py
@@ -42,47 +42,49 @@
logger.info("Skip FUEL discovery due to config setting")
return
- if 'all_nodes' in ctx.storage:
- logger.debug("Skip FUEL discovery, use previously discovered nodes")
- ctx.fuel_openstack_creds = ctx.storage['fuel_os_creds'] # type: ignore
- ctx.fuel_version = ctx.storage['fuel_version'] # type: ignore
- return
+ if "fuel_os_creds" in ctx.storage and 'fuel_version' in ctx.storage:
+ logger.debug("Skip FUEL credentials discovery, use previously discovered info")
+ ctx.fuel_openstack_creds = OSCreds(*cast(List, ctx.storage.get('fuel_os_creds')))
+ ctx.fuel_version = ctx.storage.get('fuel_version')
+ if 'all_nodes' in ctx.storage:
+ logger.debug("Skip FUEL nodes discovery, use data from DB")
+ return
+ elif discovery == 'metadata':
+ logger.debug("Skip FUEL nodes discovery due to discovery settings")
+ return
fuel = ctx.config.fuel
fuel_node_info = ctx.merge_node(fuel.ssh_creds, {'fuel_master'})
creds = dict(zip(("user", "passwd", "tenant"), parse_creds(fuel.creds)))
fuel_conn = KeystoneAuth(fuel.url, creds)
- # get cluster information from REST API
- if "fuel_os_creds" in ctx.storage and 'fuel_version' in ctx.storage:
- ctx.fuel_openstack_creds = ctx.storage['fuel_os_creds'] # type: ignore
- ctx.fuel_version = ctx.storage['fuel_version'] # type: ignore
- return
-
cluster_id = get_cluster_id(fuel_conn, fuel.openstack_env)
cluster = reflect_cluster(fuel_conn, cluster_id)
- ctx.fuel_version = FuelInfo(fuel_conn).get_version()
- ctx.storage["fuel_version"] = ctx.fuel_version
- logger.info("Found FUEL {0}".format(".".join(map(str, ctx.fuel_version))))
- openrc = cluster.get_openrc()
+ if ctx.fuel_version is None:
+ ctx.fuel_version = FuelInfo(fuel_conn).get_version()
+ ctx.storage.put(ctx.fuel_version, "fuel_version")
- if openrc:
- auth_url = cast(str, openrc['os_auth_url'])
- if ctx.fuel_version >= [8, 0] and auth_url.startswith("https://"):
- logger.warning("Fixing FUEL 8.0 AUTH url - replace https://->http://")
- auth_url = auth_url.replace("https", "http", 1)
+ logger.info("Found FUEL {0}".format(".".join(map(str, ctx.fuel_version))))
+ openrc = cluster.get_openrc()
- os_creds = OSCreds(name=cast(str, openrc['username']),
- passwd=cast(str, openrc['password']),
- tenant=cast(str, openrc['tenant_name']),
- auth_url=cast(str, auth_url),
- insecure=cast(bool, openrc['insecure']))
+ if openrc:
+ auth_url = cast(str, openrc['os_auth_url'])
+ if ctx.fuel_version >= [8, 0] and auth_url.startswith("https://"):
+ logger.warning("Fixing FUEL 8.0 AUTH url - replace https://->http://")
+ auth_url = auth_url.replace("https", "http", 1)
- ctx.fuel_openstack_creds = os_creds
- else:
- ctx.fuel_openstack_creds = None
- ctx.storage["fuel_os_creds"] = ctx.fuel_openstack_creds
+ os_creds = OSCreds(name=cast(str, openrc['username']),
+ passwd=cast(str, openrc['password']),
+ tenant=cast(str, openrc['tenant_name']),
+ auth_url=cast(str, auth_url),
+ insecure=cast(bool, openrc['insecure']))
+
+ ctx.fuel_openstack_creds = os_creds
+ else:
+ ctx.fuel_openstack_creds = None
+
+ ctx.storage.put(list(ctx.fuel_openstack_creds), "fuel_os_creds")
if discovery == 'metadata':
logger.debug("Skip FUEL nodes discovery due to discovery settings")
diff --git a/wally/hw_info.py b/wally/hw_info.py
index e81a5c1..aa53f8e 100644
--- a/wally/hw_info.py
+++ b/wally/hw_info.py
@@ -107,6 +107,11 @@
return str(self.hostname) + ":\n" + "\n".join(" " + i for i in res)
+class CephInfo:
+ def __init__(self) -> None:
+ pass
+
+
class SWInfo:
def __init__(self) -> None:
self.mtab = None # type: str
@@ -114,7 +119,12 @@
self.libvirt_version = None # type: Optional[str]
self.qemu_version = None # type: Optional[str]
self.OS_version = None # type: utils.OSRelease
- self.ceph_version = None # type: Optional[str]
+ self.ceph_info = None # type: Optional[CephInfo]
+
+
+def get_ceph_services_info(node: IRPCNode) -> CephInfo:
+ # TODO: use ceph-monitoring module
+ return CephInfo()
def get_sw_info(node: IRPCNode) -> SWInfo:
@@ -129,21 +139,17 @@
except OSError:
res.libvirt_version = None
- # try:
- # # dpkg -l ??
- # res.libvirt_version = node.run("virsh -v", nolog=True).strip()
- # except OSError:
- # res.libvirt_version = None
+ # dpkg -l ??
try:
res.qemu_version = node.run("qemu-system-x86_64 --version", nolog=True).strip()
except OSError:
res.qemu_version = None
- try:
- res.ceph_version = node.run("ceph --version", nolog=True).strip()
- except OSError:
- res.ceph_version = None
+ for role in ('ceph-osd', 'ceph-mon', 'ceph-mds'):
+ if role in node.info.roles:
+ res.ceph_info = get_ceph_services_info(node)
+ break
return res
diff --git a/wally/main.py b/wally/main.py
index 6396b6b..16d884f 100644
--- a/wally/main.py
+++ b/wally/main.py
@@ -1,6 +1,7 @@
import os
import time
import signal
+import pprint
import logging
import argparse
import functools
@@ -41,7 +42,8 @@
from .openstack import DiscoverOSStage
from .fuel import DiscoverFuelStage
from .run_test import (CollectInfoStage, ExplicitNodesStage, SaveNodesStage,
- RunTestsStage, ConnectStage, SleepStage, PrepareNodes)
+ RunTestsStage, ConnectStage, SleepStage, PrepareNodes,
+ LoadStoredNodesStage)
from .report import ConsoleReportStage, HtmlReportStage
from .sensors import StartSensorsStage, CollectSensorsStage
@@ -72,9 +74,9 @@
except Exception as exc:
logger.warning("Can't load folder {}. Error {}".format(full_path, exc))
- comment = cast(str, stor['info/comment'])
- run_uuid = cast(str, stor['info/run_uuid'])
- run_time = cast(float, stor['info/run_time'])
+ comment = cast(str, stor.get('info/comment'))
+ run_uuid = cast(str, stor.get('info/run_uuid'))
+ run_time = cast(float, stor.get('info/run_time'))
test_types = ""
results.append((run_time,
run_uuid,
@@ -135,6 +137,12 @@
test_parser = subparsers.add_parser('resume', help='resume tests')
test_parser.add_argument("storage_dir", help="Path to test directory")
+ # ---------------------------------------------------------------------
+ test_parser = subparsers.add_parser('db', help='resume tests')
+ test_parser.add_argument("cmd", choices=("show",), help="Command to execute")
+ test_parser.add_argument("params", nargs='*', help="Command params")
+ test_parser.add_argument("storage_dir", help="Storage path")
+
return parser.parse_args(argv[1:])
@@ -182,6 +190,19 @@
return Config(cfg_dict)
+def get_run_stages() -> List[Stage]:
+ return [DiscoverCephStage(),
+ DiscoverOSStage(),
+ DiscoverFuelStage(),
+ ExplicitNodesStage(),
+ StartSensorsStage(),
+ RunTestsStage(),
+ CollectSensorsStage(),
+ ConnectStage(),
+ SleepStage(),
+ PrepareNodes()]
+
+
def main(argv: List[str]) -> int:
if faulthandler is not None:
faulthandler.register(signal.SIGUSR1, all_threads=True)
@@ -195,7 +216,6 @@
if opts.subparser_name == 'test':
config = load_config(opts.config_file)
-
config.storage_url, config.run_uuid = utils.get_uniq_path_uuid(config.results_dir)
config.comment = opts.comment
config.keep_vm = opts.keep_vm
@@ -207,20 +227,9 @@
config.settings_dir = get_config_path(config, opts.settings_dir)
storage = make_storage(config.storage_url)
-
- storage['config'] = config # type: ignore
-
- stages.append(DiscoverCephStage())
- stages.append(DiscoverOSStage())
- stages.append(DiscoverFuelStage())
- stages.append(ExplicitNodesStage())
- stages.append(SaveNodesStage())
- stages.append(StartSensorsStage())
- stages.append(RunTestsStage())
- stages.append(CollectSensorsStage())
- stages.append(ConnectStage())
- stages.append(SleepStage())
- stages.append(PrepareNodes())
+ storage.put(config, 'config')
+ stages.extend(get_run_stages())
+ stages.extend(SaveNodesStage())
if not opts.dont_collect:
stages.append(CollectInfoStage())
@@ -229,15 +238,21 @@
if '--ssh-key-passwd' in argv2:
# don't save ssh key password to storage
argv2[argv2.index("--ssh-key-passwd") + 1] = "<removed from output>"
- storage['cli'] = argv2
+ storage.put(argv2, 'cli')
elif opts.subparser_name == 'resume':
+ opts.resumed = True
storage = make_storage(opts.storage_dir, existing=True)
config = storage.load(Config, 'config')
- # TODO: fix this
- # TODO: add node loading from storage
- # TODO: fill nodes conncreds with keys
- raise NotImplementedError("Resume in not fully implemented")
+ stages.extend(get_run_stages())
+ stages.append(LoadStoredNodesStage())
+ prev_opts = storage.get('cli')
+ if '--ssh-key-passwd' in prev_opts and opts.ssh_key_passwd:
+ prev_opts[prev_opts.index("--ssh-key-passwd") + 1] = opts.ssh_key_passwd
+
+ restored_opts = parse_args(prev_opts)
+ opts.__dict__.update(restored_opts.__dict__)
+ opts.subparser_name = 'resume'
elif opts.subparser_name == 'ls':
tab = texttable.Texttable(max_width=200)
@@ -259,6 +274,19 @@
# [x['io'][0], y['io'][0]]))
return 0
+ elif opts.subparser_name == 'db':
+ storage = make_storage(opts.storage_dir, existing=True)
+ if opts.cmd == 'show':
+ if len(opts.params) != 1:
+ print("'show' command requires parameter - key to show")
+ return 1
+ pprint.pprint(storage.get(opts.params[0]))
+ else:
+ print("Unknown/not_implemented command {!r}".format(opts.cmd))
+ return 1
+ return 0
+
+
report_stages = [] # type: List[Stage]
if not getattr(opts, "no_report", False):
report_stages.append(ConsoleReportStage())
@@ -276,7 +304,7 @@
log_config_file = find_cfg_file(log_config_file, opts.config_file)
setup_loggers(getattr(logging, str_level),
- log_fd=storage.get_stream('log', "w"),
+ log_fd=storage.get_fd('log', "w"),
config_file=log_config_file)
logger.info("All info would be stored into %r", config.storage_url)
@@ -292,6 +320,7 @@
# TODO: run only stages, which have config
failed = False
cleanup_stages = []
+
for stage in stages:
if stage.config_block is not None:
if stage.config_block not in ctx.config:
@@ -304,6 +333,8 @@
except (Exception, KeyboardInterrupt):
failed = True
break
+ ctx.storage.sync()
+ ctx.storage.sync()
logger.debug("Start cleanup")
cleanup_failed = False
@@ -313,12 +344,15 @@
stage.cleanup(ctx)
except:
cleanup_failed = True
+ ctx.storage.sync()
if not failed:
for report_stage in report_stages:
with log_stage(report_stage):
report_stage.run(ctx)
+ ctx.storage.sync()
+
logger.info("All info is stored into %r", config.storage_url)
if failed or cleanup_failed:
diff --git a/wally/node.py b/wally/node.py
index 6624bdc..e85ded0 100644
--- a/wally/node.py
+++ b/wally/node.py
@@ -163,9 +163,11 @@
def __repr__(self) -> str:
return str(self)
- def get_file_content(self, path: str) -> bytes:
+ def get_file_content(self, path: str, expanduser: bool = False) -> bytes:
logger.debug("GET %s from %s", path, self.info)
- res = self.conn.fs.get_file(self.conn.fs.expanduser(path))
+ if expanduser:
+ path = self.conn.fs.expanduser(path)
+ res = self.conn.fs.get_file(path)
logger.debug("Download %s bytes from remote file %s from %s", len(res), path, self.info)
return res
@@ -190,14 +192,21 @@
return out
- def copy_file(self, local_path: str, remote_path: str = None) -> str:
+ def copy_file(self, local_path: str, remote_path: str = None, expanduser: bool = False) -> str:
+ if expanduser:
+ remote_path = self.conn.fs.expanduser(remote_path)
+
data = open(local_path, 'rb').read()
return self.put_to_file(remote_path, data)
- def put_to_file(self, path: Optional[str], content: bytes) -> str:
+ def put_to_file(self, path: Optional[str], content: bytes, expanduser: bool = False) -> str:
+ if expanduser:
+ path = self.conn.fs.expanduser(path)
return self.conn.fs.store_file(path, content)
- def stat_file(self, path: str) -> Dict[str, int]:
+ def stat_file(self, path: str, expanduser: bool = False) -> Dict[str, int]:
+ if expanduser:
+ path = self.conn.fs.expanduser(path)
return self.conn.fs.file_stat(path)
def __exit__(self, x, y, z) -> bool:
diff --git a/wally/node_interfaces.py b/wally/node_interfaces.py
index caca8cc..d0fabac 100644
--- a/wally/node_interfaces.py
+++ b/wally/node_interfaces.py
@@ -1,20 +1,25 @@
import abc
-from typing import Any, Set, Optional, Dict, NamedTuple, Optional
+from typing import Any, Set, Dict, NamedTuple, Optional
from .ssh_utils import ConnCreds
from .common_types import IPAddr
+from .result_classes import IStorable
RPCCreds = NamedTuple("RPCCreds", [("addr", IPAddr), ("key_file", str), ("cert_file", str)])
-class NodeInfo:
+class NodeInfo(IStorable):
"""Node information object, result of discovery process or config parsing"""
- def __init__(self, ssh_creds: ConnCreds, roles: Set[str], params: Dict[str, Any] = None) -> None:
+ yaml_tag = 'node_info'
+
+ def __init__(self, ssh_creds: ConnCreds, roles: Set[str], params: Dict[str, Any] = None) -> None:
# ssh credentials
self.ssh_creds = ssh_creds
+
# credentials for RPC connection
self.rpc_creds = None # type: Optional[RPCCreds]
+
self.roles = roles
self.os_vm_id = None # type: Optional[int]
self.params = {} # type: Dict[str, Any]
@@ -30,6 +35,28 @@
def __repr__(self) -> str:
return str(self)
+ def raw(self) -> Dict[str, Any]:
+ dct = self.__dict__.copy()
+
+ if self.rpc_creds is not None:
+ dct['rpc_creds'] = list(self.rpc_creds)
+
+ dct['ssh_creds'] = self.ssh_creds.raw()
+ dct['roles'] = list(self.roles)
+ return dct
+
+ @classmethod
+ def fromraw(cls, data: Dict[str, Any]) -> 'NodeInfo':
+ data = data.copy()
+ if data['rpc_creds'] is not None:
+ data['rpc_creds'] = RPCCreds(*data['rpc_creds'])
+
+ data['ssh_creds'] = ConnCreds.fromraw(data['ssh_creds'])
+ data['roles'] = set(data['roles'])
+ obj = cls.__new__(cls)
+ obj.__dict__.update(data)
+ return obj
+
class ISSHHost(metaclass=abc.ABCMeta):
"""Minimal interface, required to setup RPC connection"""
diff --git a/wally/openstack.py b/wally/openstack.py
index 3b5ced5..264cfbb 100644
--- a/wally/openstack.py
+++ b/wally/openstack.py
@@ -36,10 +36,10 @@
def get_OS_credentials(ctx: TestRun) -> OSCreds:
if "openstack_openrc" in ctx.storage:
- return ctx.storage.load(OSCreds, "openstack_openrc")
+ return OSCreds(*cast(List, ctx.storage.get("openstack_openrc")))
- creds = None
- os_creds = None
+ creds = None # type: OSCreds
+ os_creds = None # type: OSCreds
force_insecure = False
cfg = ctx.config
@@ -80,7 +80,7 @@
logger.debug(("OS_CREDS: user={0.name} tenant={0.tenant} " +
"auth_url={0.auth_url} insecure={0.insecure}").format(creds))
- ctx.storage["openstack_openrc"] = creds # type: ignore
+ ctx.storage.put(list(creds), "openstack_openrc")
return creds
@@ -169,7 +169,7 @@
def run(self, ctx: TestRun) -> None:
if 'all_nodes' in ctx.storage:
- ctx.os_spawned_nodes_ids = ctx.storage['os_spawned_nodes_ids'] # type: ignore
+ ctx.os_spawned_nodes_ids = ctx.storage.get('os_spawned_nodes_ids')
logger.info("Skipping OS VMS discovery/spawn as all data found in storage")
return
@@ -205,7 +205,7 @@
ctx.nodes_info[nid] = info
ctx.os_spawned_nodes_ids.append(info.os_vm_id)
- ctx.storage['os_spawned_nodes_ids'] = ctx.os_spawned_nodes_ids # type: ignore
+ ctx.storage.put(ctx.os_spawned_nodes_ids, 'os_spawned_nodes_ids')
def cleanup(self, ctx: TestRun) -> None:
# keep nodes in case of error for future test restart
@@ -213,7 +213,7 @@
logger.info("Removing nodes")
clear_nodes(ctx.os_connection, ctx.os_spawned_nodes_ids)
- del ctx.storage['spawned_os_nodes']
+ ctx.storage.rm('spawned_os_nodes')
logger.info("OS spawned nodes has been successfully removed")
diff --git a/wally/process_results.py b/wally/process_results.py
new file mode 100644
index 0000000..f01b14e
--- /dev/null
+++ b/wally/process_results.py
@@ -0,0 +1,2 @@
+# put all result preprocessing here
+# selection, aggregation
diff --git a/wally/report.py b/wally/report.py
index d2bef7e..0c96280 100644
--- a/wally/report.py
+++ b/wally/report.py
@@ -1,11 +1,12 @@
import os
import csv
+import abc
import bisect
import logging
import itertools
import collections
from io import StringIO
-from typing import Dict, Any, Iterator, Tuple, cast
+from typing import Dict, Any, Iterator, Tuple, cast, List
try:
import numpy
@@ -18,7 +19,6 @@
import wally
from .utils import ssize2b
-from .statistic import round_3_digit
from .storage import Storage
from .stage import Stage, StepOrder
from .test_run_class import TestRun
@@ -33,30 +33,31 @@
def load_test_results(storage: Storage) -> Iterator[FullTestResult]:
- sensors_data = {} # type: Dict[Tuple[str, str, str], SensorInfo]
-
- mstorage = storage.sub_storage("metric")
- for _, node_id in mstorage.list():
- for _, dev_name in mstorage.list(node_id):
- for _, sensor_name in mstorage.list(node_id, dev_name):
- key = (node_id, dev_name, sensor_name)
- si = SensorInfo(*key)
- si.begin_time, si.end_time, si.data = storage[node_id, dev_name, sensor_name] # type: ignore
- sensors_data[key] = si
-
- rstorage = storage.sub_storage("result")
- for _, run_id in rstorage.list():
- ftr = FullTestResult()
- ftr.test_info = rstorage.load(TestInfo, run_id, "info")
- ftr.performance_data = {}
-
- p1 = "{}/measurement".format(run_id)
- for _, node_id in rstorage.list(p1):
- for _, measurement_name in rstorage.list(p1, node_id):
- perf_key = (node_id, measurement_name)
- ftr.performance_data[perf_key] = rstorage["{}/{}/{}".format(p1, *perf_key)] # type: ignore
-
- yield ftr
+ raise NotImplementedError()
+ # sensors_data = {} # type: Dict[Tuple[str, str, str], SensorInfo]
+ #
+ # mstorage = storage.sub_storage("metric")
+ # for _, node_id in mstorage.list():
+ # for _, dev_name in mstorage.list(node_id):
+ # for _, sensor_name in mstorage.list(node_id, dev_name):
+ # key = (node_id, dev_name, sensor_name)
+ # si = SensorInfo(*key)
+ # si.begin_time, si.end_time, si.data = storage[node_id, dev_name, sensor_name] # type: ignore
+ # sensors_data[key] = si
+ #
+ # rstorage = storage.sub_storage("result")
+ # for _, run_id in rstorage.list():
+ # ftr = FullTestResult()
+ # ftr.test_info = rstorage.load(TestInfo, run_id, "info")
+ # ftr.performance_data = {}
+ #
+ # p1 = "{}/measurement".format(run_id)
+ # for _, node_id in rstorage.list(p1):
+ # for _, measurement_name in rstorage.list(p1, node_id):
+ # perf_key = (node_id, measurement_name)
+ # ftr.performance_data[perf_key] = rstorage["{}/{}/{}".format(p1, *perf_key)] # type: ignore
+ #
+ # yield ftr
class ConsoleReportStage(Stage):
@@ -67,6 +68,7 @@
# TODO(koder): load data from storage
raise NotImplementedError("...")
+
class HtmlReportStage(Stage):
priority = StepOrder.REPORT
@@ -75,26 +77,86 @@
# TODO(koder): load data from storage
raise NotImplementedError("...")
-# class StoragePerfInfo:
-# def __init__(self, name: str, summary: Any, params, testnodes_count) -> None:
-# self.direct_iops_r_max = 0 # type: int
-# self.direct_iops_w_max = 0 # type: int
-#
-# # 64 used instead of 4k to faster feed caches
-# self.direct_iops_w64_max = 0 # type: int
-#
-# self.rws4k_10ms = 0 # type: int
-# self.rws4k_30ms = 0 # type: int
-# self.rws4k_100ms = 0 # type: int
-# self.bw_write_max = 0 # type: int
-# self.bw_read_max = 0 # type: int
-#
-# self.bw = None #
-# self.iops = None
-# self.lat = None
-# self.lat_50 = None
-# self.lat_95 = None
-#
+
+# TODO: needs to be revised; should use StatProps fields instead
+class StoragePerfSummary:
+ def __init__(self, name: str) -> None:
+ self.direct_iops_r_max = 0 # type: int
+ self.direct_iops_w_max = 0 # type: int
+
+ # 64 is used instead of 4k to feed caches faster
+ self.direct_iops_w64_max = 0 # type: int
+
+ self.rws4k_10ms = 0 # type: int
+ self.rws4k_30ms = 0 # type: int
+ self.rws4k_100ms = 0 # type: int
+ self.bw_write_max = 0 # type: int
+ self.bw_read_max = 0 # type: int
+
+ self.bw = None # type: float
+ self.iops = None # type: float
+ self.lat = None # type: float
+ self.lat_50 = None # type: float
+ self.lat_95 = None # type: float
+
+
+class HTMLBlock:
+ data = None # type: str
+ js_links = [] # type: List[str]
+ css_links = [] # type: List[str]
+
+
+class Reporter(metaclass=abc.ABCMeta):
+ @abc.abstractmethod
+ def get_divs(self, config, storage) -> Iterator[Tuple[str, str, HTMLBlock]]:
+ pass
+
+
+# Main performance report
+class PerformanceSummary(Reporter):
+ """Creates graph, which show how IOPS and Latency depend on QD"""
+
+
+# Main performance report
+class IOPS_QD(Reporter):
+ """Creates graph, which show how IOPS and Latency depend on QD"""
+
+
+# Linearization report
+class IOPS_Bsize(Reporter):
+ """Creates graphs, which show how IOPS and Latency depend on block size"""
+
+
+# IOPS/latency distribution
+class IOPSHist(Reporter):
+ """IOPS.latency distribution histogram"""
+
+
+# IOPS/latency over test time
+class IOPSTime(Reporter):
+ """IOPS/latency during test"""
+
+
+# Cluster load over test time
+class ClusterLoad(Reporter):
+ """Cluster load over test time"""
+
+
+# Node load over test time
+class NodeLoad(Reporter):
+ """Node load over test time"""
+
+
+# Ceph cluster summary
+class CephClusterSummary(Reporter):
+ """Ceph cluster summary"""
+
+
+# TODO: Resource consumption report
+# TODO: Ceph operation breakout report
+# TODO: Resource consumption for different type of test
+
+
#
# # disk_info = None
# # base = None
diff --git a/wally/result_classes.py b/wally/result_classes.py
index 19925f2..7244225 100644
--- a/wally/result_classes.py
+++ b/wally/result_classes.py
@@ -1,5 +1,6 @@
+import abc
import array
-from typing import Dict, List, Any, Tuple, Optional
+from typing import Dict, List, Any, Tuple, Optional, Union, Type
class TimeSerie:
@@ -81,3 +82,20 @@
# {(node_id, perf_metrics_name): values}
sensors_data = None # type: Dict[Tuple[str, str, str], SensorInfo]
+
+
+class IStorable(metaclass=abc.ABCMeta):
+ """Interface for types that can be stored"""
+
+ @abc.abstractmethod
+ def raw(self) -> Dict[str, Any]:
+ pass
+
+ @abc.abstractclassmethod
+ def fromraw(cls, data: Dict[str, Any]) -> 'IStorable':
+ pass
+
+
+Basic = Union[int, str, bytes, bool, None]
+Storable = Union[IStorable, Dict[str, Any], List[Any], int, str, bytes, bool, None]
+
diff --git a/wally/run_test.py b/wally/run_test.py
index 52803d1..8e8a4e9 100755
--- a/wally/run_test.py
+++ b/wally/run_test.py
@@ -7,14 +7,17 @@
from . import utils, ssh_utils, hw_info
from .config import ConfigBlock
from .node import setup_rpc, connect
-from .node_interfaces import NodeInfo, IRPCNode, ISSHHost
+from .node_interfaces import NodeInfo, IRPCNode
from .stage import Stage, StepOrder
+from .sensors import collect_sensors_data
from .suits.io.fio import IOPerfTest
from .suits.itest import TestInputConfig
from .suits.mysql import MysqlTest
from .suits.omgbench import OmgTest
from .suits.postgres import PgBenchTest
from .test_run_class import TestRun
+from .statistic import calc_stat_props
+from .utils import StopTestError
TOOL_TYPE_MAPPER = {
@@ -83,7 +86,11 @@
path = "rpc_logs/" + nid
node.conn.server.flush_logs()
log = node.get_file_content(node.rpc_log_file)
- ctx.storage.store_raw(log, path)
+ if path in ctx.storage:
+ previous = ctx.storage.get_raw(path)
+ else:
+ previous = b""
+ ctx.storage.put_raw(previous + log, path)
logger.debug("RPC log from node {} stored into storage::{}".format(nid, path))
with ctx.get_pool() as pool:
@@ -112,7 +119,7 @@
for (path, nid), future in futures.items():
try:
- ctx.storage[path] = future.result()
+ ctx.storage.put(future.result(), path)
except Exception:
logger.exception("During collecting hardware info from %s", nid)
raise utils.StopTestError()
@@ -126,7 +133,7 @@
for (path, nid), future in futures.items():
try:
- ctx.storage[path] = future.result()
+ ctx.storage.put(future.result(), path)
except Exception:
logger.exception("During collecting software info from %s", nid)
raise utils.StopTestError()
@@ -154,7 +161,7 @@
priority = StepOrder.CONNECT
def run(self, ctx: TestRun) -> None:
- ctx.storage['all_nodes'] = list(ctx.nodes_info.values()) # type: ignore
+ ctx.storage.put_list(ctx.nodes_info.values(), 'all_nodes')
class SleepStage(Stage):
@@ -226,7 +233,7 @@
if not test_nodes:
logger.error("No test nodes found")
- return
+ raise StopTestError()
for name, params in test_group.items():
vm_count = params.get('node_limit', None) # type: Optional[int]
@@ -250,8 +257,46 @@
storage=ctx.storage,
remote_dir=remote_dir)
- test_cls(test_cfg).run()
+ test_cls(test_cfg,
+ on_idle=lambda: collect_sensors_data(ctx, False)).run()
@classmethod
def validate_config(cls, cfg: ConfigBlock) -> None:
pass
+
+
+class CalcStatisticStage(Stage):
+ priority = StepOrder.TEST + 1
+
+ def run(self, ctx: TestRun) -> None:
+ results = {}
+ for name, summary, stor_path in ctx.storage.get("all_results"):
+ if name == 'fio':
+ test_info = ctx.storage.get(stor_path, "info")
+ for node in test_info['nodes']:
+ iops = ctx.storage.get_array(stor_path, node, 'iops_data')
+ bw = ctx.storage.get_array(stor_path, node, 'bw_data')
+ lat = ctx.storage.get_array(stor_path, node, 'lat_data')
+ results[summary] = (iops, bw, lat)
+
+ for name, (iops, bw, lat) in results.items():
+ print(" ------------------- IOPS -------------------")
+ print(calc_stat_props(iops)) # type: ignore
+ print(" ------------------- BW -------------------")
+ print(calc_stat_props(bw)) # type: ignore
+ # print(" ------------------- LAT -------------------")
+ # print(calc_stat_props(lat))
+
+
+class LoadStoredNodesStage(Stage):
+ priority = StepOrder.DISCOVER
+
+ def run(self, ctx: TestRun) -> None:
+ if 'all_nodes' in ctx.storage:
+ if ctx.nodes_info:
+ logger.error("Internal error: Some nodes already stored in " +
+ "nodes_info before LoadStoredNodesStage stage")
+ raise StopTestError()
+ ctx.nodes_info = {node.node_id(): node
+ for node in ctx.storage.load_list(NodeInfo, "all_nodes")}
+ logger.info("%s nodes loaded from database", len(ctx.nodes_info))
diff --git a/wally/sensors_rpc_plugin.py b/wally/sensors_rpc_plugin.py
index 55d8b84..be5e5db 100644
--- a/wally/sensors_rpc_plugin.py
+++ b/wally/sensors_rpc_plugin.py
@@ -3,6 +3,7 @@
import json
import time
import array
+import pprint
import logging
import threading
import traceback
@@ -38,6 +39,12 @@
def unpack_results(cls, device, metrics, data):
pass
+ def init(self):
+ pass
+
+ def stop(self):
+ pass
+
class ArraysSensor(Sensor):
typecode = 'L'
@@ -384,16 +391,21 @@
admin_socket = None
-def run_ceph_daemon_cmd(cluster, osd_id, *args):
- asok = "/var/run/ceph/{}-osd.{}.asok".format(cluster, osd_id)
- if admin_socket:
- return admin_socket(asok, args)
- else:
- return subprocess.check_output("ceph daemon {} {}".format(asok, " ".join(args)), shell=True)
-
-
@provides("ceph")
class CephSensor(ArraysSensor):
+
+ historic_duration = 2
+ historic_size = 200
+
+ def run_ceph_daemon_cmd(self, osd_id, *args):
+ asok = "/var/run/ceph/{}-osd.{}.asok".format(self.cluster, osd_id)
+ if admin_socket:
+ res = admin_socket(asok, args)
+ else:
+ res = subprocess.check_output("ceph daemon {} {}".format(asok, " ".join(args)), shell=True)
+
+ return res
+
def collect(self):
def get_val(dct, path):
if '/' in path:
@@ -402,10 +414,64 @@
return dct[path]
for osd_id in self.params['osds']:
- data = json.loads(run_ceph_daemon_cmd(self.params['cluster'], osd_id, 'perf', 'dump'))
+ data = json.loads(self.run_ceph_daemon_cmd(osd_id, 'perf', 'dump'))
for key_name in self.params['counters']:
self.add_data("osd{}".format(osd_id), key_name.replace("/", "."), get_val(data, key_name))
+ if 'historic' in self.params.get('sources', {}):
+ self.historic.setdefault(osd_id, []).append(self.run_ceph_daemon_cmd(osd_id, "dump_historic_ops"))
+
+ if 'in_flight' in self.params.get('sources', {}):
+ self.in_flight.setdefault(osd_id, []).append(self.run_ceph_daemon_cmd(osd_id, "dump_ops_in_flight"))
+
+ def set_osd_historic(self, duration, keep, osd_id):
+ data = json.loads(self.run_ceph_daemon_cmd(osd_id, "dump_historic_ops"))
+ self.run_ceph_daemon_cmd(osd_id, "config set osd_op_history_duration {}".format(duration))
+ self.run_ceph_daemon_cmd(osd_id, "config set osd_op_history_size {}".format(keep))
+ return (data["duration to keep"], data["num to keep"])
+
+ def init(self):
+ self.cluster = self.params['cluster']
+ self.prev_vals = {}
+ self.historic = {}
+ self.in_flight = {}
+
+ if 'historic' in self.params.get('sources', {}):
+ for osd_id in self.params['osds']:
+ self.prev_vals[osd_id] = self.set_osd_historic(self.historic_duration, self.historic_size, osd_id)
+
+ def stop(self):
+ for osd_id, (duration, keep) in self.prev_vals.items():
+ self.prev_vals[osd_id] = self.set_osd_historic(duration, keep, osd_id)
+
+ def get_updates(self):
+ res = super().get_updates()
+
+ for osd_id, data in self.historic.items():
+ res[("osd{}".format(osd_id), "historic")] = data
+
+ self.historic = {}
+
+ for osd_id, data in self.in_flight.items():
+ res[("osd{}".format(osd_id), "in_flight")] = data
+
+ self.in_flight = {}
+
+ return res
+
+ @classmethod
+ def unpack_results(cls, device, metrics, packed):
+ if metrics in ('historic', 'in_flight'):
+ return packed
+
+ arr = array.array(chr(packed[0]))
+ if sys.version_info >= (3, 0, 0):
+ arr.frombytes(packed[1:])
+ else:
+ arr.fromstring(packed[1:])
+
+ return arr
+
class SensorsData(object):
def __init__(self):
@@ -432,7 +498,6 @@
return curr
-# TODO(koder): a lot code here can be optimized and cached, but nobody cares (c)
def sensors_bg_thread(sensors_config, sdata):
try:
next_collect_at = time.time()
@@ -450,8 +515,8 @@
params["disallowed_prefixes"] = config["disallow"]
sdata.sensors[name] = SensorsMap[name](**params)
+ sdata.sensors[name].init()
- import pprint
logger.debug("sensors.config = %s", pprint.pformat(sensors_config))
logger.debug("Sensors map keys %s", ", ".join(sdata.sensors.keys()))
@@ -483,6 +548,10 @@
except Exception:
logger.exception("In sensor BG thread")
sdata.exception = traceback.format_exc()
+ finally:
+ for sensor in sdata.sensors.values():
+ sensor.stop()
+ sdata.sensors = None
sensors_thread = None
@@ -539,6 +608,7 @@
collected_at = sdata.collected_at
sdata.collected_at = array.array(sdata.collected_at.typecode)
+ # TODO: pack data before send
return res, collected_at.tostring()
diff --git a/wally/ssh_utils.py b/wally/ssh_utils.py
index c224bf4..e9e118d 100644
--- a/wally/ssh_utils.py
+++ b/wally/ssh_utils.py
@@ -1,5 +1,4 @@
import re
-import yaml
import getpass
import logging
from typing import List, Dict, Any
@@ -7,6 +6,7 @@
from . import utils
from .common_types import IPAddr
+from .result_classes import IStorable
logger = logging.getLogger("wally")
@@ -47,9 +47,7 @@
uri_reg_exprs.append(templ.format(**re_dct))
-class ConnCreds(yaml.YAMLObject): # type: ignore
- yaml_tag = '!ConnCreds'
-
+class ConnCreds(IStorable):
def __init__(self, host: str, user: str, passwd: str = None, port: str = '22',
key_file: str = None, key: bytes = None) -> None:
self.user = user
@@ -64,21 +62,18 @@
def __repr__(self) -> str:
return str(self)
- @classmethod
- def to_yaml(cls, dumper: Any, data: 'ConnCreds') -> Any:
- dict_representation = {
- 'user': data.user,
- 'host': data.addr.host,
- 'port': data.addr.port,
- 'passwd': data.passwd,
- 'key_file': data.key_file
+ def raw(self) -> Dict[str, Any]:
+ return {
+ 'user': self.user,
+ 'host': self.addr.host,
+ 'port': self.addr.port,
+ 'passwd': self.passwd,
+ 'key_file': self.key_file
}
- return dumper.represent_mapping(data.yaml_tag, dict_representation)
@classmethod
- def from_yaml(cls, loader: Any, node: Any) -> 'ConnCreds':
- dct = loader.construct_mapping(node)
- return cls(**dct)
+ def fromraw(cls, data) -> 'ConnCreds':
+ return cls(**data)
def parse_ssh_uri(uri: str) -> ConnCreds:
diff --git a/wally/statistic.py b/wally/statistic.py
index e2021e1..2263788 100644
--- a/wally/statistic.py
+++ b/wally/statistic.py
@@ -1,48 +1,119 @@
import math
import itertools
+import statistics
+from typing import Union, List, TypeVar, Callable, Iterable, Tuple, Any, cast, Dict
-try:
- from scipy import stats
- from numpy import array, linalg
- from scipy.optimize import leastsq
- from numpy.polynomial.chebyshev import chebfit, chebval
- no_numpy = False
-except ImportError:
- no_numpy = True
+import numpy
+from scipy import stats, optimize
+from numpy import linalg
+from numpy.polynomial.chebyshev import chebfit, chebval
-def average(data):
- return sum(data) / len(data)
+from .result_classes import IStorable
-def med_dev(vals):
- if len(vals) == 1:
- return vals[0], 0.0
-
- med = sum(vals) / len(vals)
- dev = ((sum(abs(med - i) ** 2.0 for i in vals) / (len(vals) - 1)) ** 0.5)
- return med, dev
+Number = Union[int, float]
+TNumber = TypeVar('TNumber', int, float)
-def round_3_digit(val):
- return round_deviation((val, val / 10.0))[0]
+DOUBLE_DELTA = 1e-8
-def round_deviation(med_dev):
- med, dev = med_dev
-
- if dev < 1E-7:
- return med_dev
-
- dev_div = 10.0 ** (math.floor(math.log10(dev)) - 1)
- dev = int(dev / dev_div) * dev_div
- med = int(med / dev_div) * dev_div
- return [type(med_dev[0])(med),
- type(med_dev[1])(dev)]
+average = statistics.mean
+dev = statistics.stdev  # sample standard deviation (not variance); stored as StatProps.deviation
-def groupby_globally(data, key_func):
- grouped = {}
+class StatProps(IStorable):
+ "Statistic properties for timeserie"
+
+ yaml_tag = 'stat'
+
+ def __init__(self, data: List[Number]) -> None:
+ self.average = None # type: float
+ self.deviation = None # type: float
+ self.confidence = None # type: float
+ self.confidence_level = None # type: float
+
+ self.perc_99 = None # type: float
+ self.perc_95 = None # type: float
+ self.perc_90 = None # type: float
+ self.perc_50 = None # type: float
+
+ self.min = None # type: Number
+ self.max = None # type: Number
+
+ # bin_center: bin_count
+ self.histo = None # type: Tuple[List[int], List[float]]
+ self.data = data
+
+ self.normtest = None # type: Any
+
+ def __str__(self) -> str:  # multi-line, human-readable summary of the computed properties
+ res = ["StatProps(num_el={}):".format(len(self.data)),
+ " distr = {0.average} ~ {0.deviation}".format(self),
+ " confidence({0.confidence_level}) = {0.confidence}".format(self),
+ " perc50={0.perc_50}".format(self),
+ " perc90={0.perc_90}".format(self),
+ " perc95={0.perc_95}".format(self),
+ " perc99={0.perc_99}".format(self),
+ " range {0.min} {0.max}".format(self),
+ " normtest = {0.normtest}".format(self)]
+ return "\n".join(res)
+
+ def __repr__(self) -> str:
+ return str(self)
+
+ def raw(self) -> Dict[str, Any]:
+ return self.__dict__.copy()
+
+ @classmethod
+ def fromraw(cls, data: Dict[str, Any]) -> 'StatProps':
+ res = cls.__new__(cls)
+ res.__dict__.update(data)
+ return res
+
+
+def greater_digit_pos(val: Number) -> int:
+ return int(math.floor(math.log10(val))) + 1
+
+
+def round_digits(val: TNumber, num_digits: int = 3) -> TNumber:
+ pow = 10 ** (greater_digit_pos(val) - num_digits)
+ return type(val)(int(val / pow) * pow)
+
+
+def calc_stat_props(data: List[Number], confidence: float = 0.95) -> StatProps:
+ "Calculate statistical properties of array of numbers"
+
+ res = StatProps(data)
+
+ if len(data) == 0:
+ raise ValueError("Input array is empty")
+
+ data = sorted(data)
+ res.average = average(data)
+ res.deviation = dev(data)
+ res.max = data[-1]
+ res.min = data[0]
+
+ res.perc_50 = numpy.percentile(data, 50)
+ res.perc_90 = numpy.percentile(data, 90)
+ res.perc_95 = numpy.percentile(data, 95)
+ res.perc_99 = numpy.percentile(data, 99)
+
+ res.confidence_level = confidence  # record the level the interval below is computed for
+ if len(data) >= 3:
+ res.confidence = stats.sem(data) * stats.t.ppf((1 + confidence) / 2, len(data) - 1)
+ else:
+ res.confidence = None
+
+ res.histo = numpy.histogram(data, 'auto')
+ res.normtest = stats.mstats.normaltest(data)
+ return res
+
+
+def groupby_globally(data: Iterable, key_func: Callable):
+ grouped = {} # type: ignore
grouped_iter = itertools.groupby(data, key_func)
for (bs, cache_tp, act, conc), curr_data_it in grouped_iter:
@@ -52,25 +123,19 @@
return grouped
-def approximate_curve(x, y, xnew, curved_coef):
+def approximate_curve(x: List[Number], y: List[float], xnew: List[Number], curved_coef: int) -> List[float]:
"""returns ynew - y values of some curve approximation"""
- if no_numpy:
- return None
-
- return chebval(xnew, chebfit(x, y, curved_coef))
+ return cast(List[float], chebval(xnew, chebfit(x, y, curved_coef)))
-def approximate_line(x, y, xnew, relative_dist=False):
- """ x, y - test data, xnew - dots, where we want find approximation
- if not relative_dist distance = y - newy
- returns ynew - y values of linear approximation"""
-
- if no_numpy:
- return None
-
- # convert to numpy.array (don't work without it)
- ox = array(x)
- oy = array(y)
+def approximate_line(x: List[Number], y: List[float], xnew: List[Number], relative_dist: bool = False) -> List[float]:
+ """
+ x, y - test data, xnew - dots, where we want find approximation
+ if not relative_dist distance = y - newy
+ returns ynew - y values of linear approximation
+ """
+ ox = numpy.array(x)
+ oy = numpy.array(y)
# set approximation function
def func_line(tpl, x):
@@ -89,108 +154,41 @@
oy[:2]))
# find line
- tpl_final, success = leastsq(error_func,
- tpl_initial[:],
- args=(ox, oy))
+ tpl_final, success = optimize.leastsq(error_func, tpl_initial[:], args=(ox, oy))
# if error
if success not in range(1, 5):
raise ValueError("No line for this dots")
# return new dots
- return func_line(tpl_final, array(xnew))
+ return func_line(tpl_final, numpy.array(xnew))
-def difference(y, ynew):
- """returns average and maximum relative and
- absolute differences between y and ynew
- result may contain None values for y = 0
- return value - tuple:
- [(abs dif, rel dif) * len(y)],
- (abs average, abs max),
- (rel average, rel max)"""
-
- abs_dlist = []
- rel_dlist = []
-
- for y1, y2 in zip(y, ynew):
- # absolute
- abs_dlist.append(y1 - y2)
-
- if y1 > 1E-6:
- rel_dlist.append(abs(abs_dlist[-1] / y1))
- else:
- raise ZeroDivisionError("{0!r} is too small".format(y1))
-
- da_avg = sum(abs_dlist) / len(abs_dlist)
- dr_avg = sum(rel_dlist) / len(rel_dlist)
-
- return (zip(abs_dlist, rel_dlist),
- (da_avg, max(abs_dlist)), (dr_avg, max(rel_dlist))
- )
-
-
-def calculate_distribution_properties(data):
- """chi, etc"""
-
-
-def minimal_measurement_count(data, max_diff, req_probability):
- """
- should returns amount of measurements to get results (avg and deviation)
- with error less, that max_diff in at least req_probability% cases
- """
-
-
-class StatProps(object):
- def __init__(self):
- self.average = None
- self.mediana = None
- self.perc_95 = None
- self.perc_5 = None
- self.deviation = None
- self.confidence = None
- self.min = None
- self.max = None
- self.raw = None
-
- def rounded_average_conf(self):
- return round_deviation((self.average, self.confidence))
-
- def rounded_average_dev(self):
- return round_deviation((self.average, self.deviation))
-
- def __str__(self):
- return "StatProps({0} ~ {1})".format(round_3_digit(self.average),
- round_3_digit(self.deviation))
-
- def __repr__(self):
- return str(self)
-
-
-def data_property(data, confidence=0.95):
- res = StatProps()
- if len(data) == 0:
- return res
-
- data = sorted(data)
- res.average, res.deviation = med_dev(data)
- res.max = data[-1]
- res.min = data[0]
-
- ln = len(data)
- if ln % 2 == 0:
- res.mediana = (data[ln / 2] + data[ln / 2 - 1]) / 2
- else:
- res.mediana = data[ln / 2]
-
- res.perc_95 = data[int((ln - 1) * 0.95)]
- res.perc_5 = data[int((ln - 1) * 0.05)]
-
- if not no_numpy and ln >= 3:
- res.confidence = stats.sem(data) * \
- stats.t.ppf((1 + confidence) / 2, ln - 1)
- else:
- res.confidence = res.deviation
-
- res.raw = data[:]
- return res
+# TODO: revise next
+# def difference(y, ynew):
+# """returns average and maximum relative and
+# absolute differences between y and ynew
+# result may contain None values for y = 0
+# return value - tuple:
+# [(abs dif, rel dif) * len(y)],
+# (abs average, abs max),
+# (rel average, rel max)"""
+#
+# abs_dlist = []
+# rel_dlist = []
+#
+# for y1, y2 in zip(y, ynew):
+# # absolute
+# abs_dlist.append(y1 - y2)
+#
+# if y1 > 1E-6:
+# rel_dlist.append(abs(abs_dlist[-1] / y1))
+# else:
+# raise ZeroDivisionError("{0!r} is too small".format(y1))
+#
+# da_avg = sum(abs_dlist) / len(abs_dlist)
+# dr_avg = sum(rel_dlist) / len(rel_dlist)
+#
+# return (zip(abs_dlist, rel_dlist),
+# (da_avg, max(abs_dlist)), (dr_avg, max(rel_dlist))
+# )
diff --git a/wally/storage.py b/wally/storage.py
index 6879dcf..2c0a26b 100644
--- a/wally/storage.py
+++ b/wally/storage.py
@@ -6,8 +6,8 @@
import abc
import array
import shutil
-from typing import Any, Iterator, TypeVar, Type, IO, Tuple, cast, List, Dict, Union, Iterable
-
+import sqlite3
+from typing import Any, TypeVar, Type, IO, Tuple, cast, List, Dict, Iterable
import yaml
try:
@@ -16,16 +16,7 @@
from yaml import Loader, Dumper # type: ignore
-class IStorable(metaclass=abc.ABCMeta):
- """Interface for type, which can be stored"""
-
-basic_types = {list, dict, tuple, set, type(None), int, str, bytes, bool, float}
-for btype in basic_types:
- # pylint: disable=E1101
- IStorable.register(btype) # type: ignore
-
-
-ObjClass = TypeVar('ObjClass')
+from .result_classes import Storable, IStorable
class ISimpleStorage(metaclass=abc.ABCMeta):
@@ -33,15 +24,19 @@
and can operate only on bytes"""
@abc.abstractmethod
- def __setitem__(self, path: str, value: bytes) -> None:
+ def put(self, value: bytes, path: str) -> None:
pass
@abc.abstractmethod
- def __getitem__(self, path: str) -> bytes:
+ def get(self, path: str) -> bytes:
pass
@abc.abstractmethod
- def __delitem__(self, path: str) -> None:
+ def rm(self, path: str) -> None:
+ pass
+
+ @abc.abstractmethod
+ def sync(self) -> None:
pass
@abc.abstractmethod
@@ -49,75 +44,153 @@
pass
@abc.abstractmethod
- def list(self, path: str) -> Iterator[Tuple[bool, str]]:
- pass
-
- @abc.abstractmethod
- def get_stream(self, path: str, mode: str = "rb+") -> IO:
+ def get_fd(self, path: str, mode: str = "rb+") -> IO:
pass
@abc.abstractmethod
def sub_storage(self, path: str) -> 'ISimpleStorage':
pass
- @abc.abstractmethod
- def clear(self, path: str) -> None:
- pass
-
class ISerializer(metaclass=abc.ABCMeta):
"""Interface for serialization class"""
@abc.abstractmethod
- def pack(self, value: IStorable) -> bytes:
+ def pack(self, value: Storable) -> bytes:
pass
@abc.abstractmethod
- def unpack(self, data: bytes) -> IStorable:
+ def unpack(self, data: bytes) -> Any:
pass
+class DBStorage(ISimpleStorage):
+
+ create_tb_sql = "CREATE TABLE IF NOT EXISTS wally_storage (key text, data blob, type text)"
+ insert_sql = "INSERT INTO wally_storage VALUES (?, ?, ?)"
+ update_sql = "UPDATE wally_storage SET data=?, type=? WHERE key=?"
+ select_sql = "SELECT data, type FROM wally_storage WHERE key=?"
+ contains_sql = "SELECT 1 FROM wally_storage WHERE key=?"
+ rm_sql = "DELETE FROM wally_storage WHERE key LIKE '{}%'"
+ list2_sql = "SELECT key, length(data), type FROM wally_storage"
+
+ def __init__(self, db_path: str = None, existing: bool = False,
+ prefix: str = None, db: sqlite3.Connection = None) -> None:
+
+ assert not prefix or "'" not in prefix, "Broken sql prefix {!r}".format(prefix)
+
+ if db_path:
+ self.existing = existing
+ if existing:
+ if not os.path.isfile(db_path):
+ raise IOError("No storage found at {!r}".format(db_path))
+
+ os.makedirs(os.path.dirname(db_path), exist_ok=True)
+ try:
+ self.db = sqlite3.connect(db_path)
+ except sqlite3.OperationalError as exc:
+ raise IOError("Can't open database at {!r}".format(db_path)) from exc
+
+ self.db.execute(self.create_tb_sql)
+ else:
+ if db is None:
+ raise ValueError("Either db or db_path parameter must be passed")
+ self.db = db
+
+ if prefix is None:
+ self.prefix = ""
+ elif not prefix.endswith('/'):
+ self.prefix = prefix + '/'
+ else:
+ self.prefix = prefix
+
+ def put(self, value: bytes, path: str) -> None:
+ c = self.db.cursor()
+ fpath = self.prefix + path
+ c.execute(self.contains_sql, (fpath,))
+ if len(c.fetchall()) == 0:
+ c.execute(self.insert_sql, (fpath, value, 'yaml'))
+ else:
+ c.execute(self.update_sql, (value, 'yaml', fpath))
+
+ def get(self, path: str) -> bytes:
+ c = self.db.cursor()
+ c.execute(self.select_sql, (self.prefix + path,))
+ res = cast(List[Tuple[bytes, str]], c.fetchall()) # type: List[Tuple[bytes, str]]
+ if not res:
+ raise KeyError(path)
+ assert len(res) == 1
+ val, tp = res[0]
+ assert tp == 'yaml'
+ return val
+
+ def rm(self, path: str) -> None:
+ c = self.db.cursor()
+ path = self.prefix + path
+ # bind the LIKE pattern as a parameter instead of formatting it into the SQL text
+ c.execute("DELETE FROM wally_storage WHERE key LIKE ?", (path + '%',))
+
+ def __contains__(self, path: str) -> bool:
+ c = self.db.cursor()
+ path = self.prefix + path
+ c.execute(self.contains_sql, (path,))  # path already carries the prefix; do not add it again
+ return len(c.fetchall()) != 0
+
+ def print_tree(self):
+ c = self.db.cursor()
+ c.execute(self.list2_sql)
+ data = list(c.fetchall())
+ data.sort()
+ print("------------------ DB ---------------------")
+ for key, data_ln, type in data:
+ print(key, data_ln, type)
+ print("------------------ END --------------------")
+
+ def get_fd(self, path: str, mode: str = "rb+") -> IO[bytes]:
+ raise NotImplementedError("SQLITE3 doesn't provide fd-like interface")
+
+ def sub_storage(self, path: str) -> 'DBStorage':
+ return self.__class__(prefix=self.prefix + path, db=self.db)
+
+ def sync(self):
+ self.db.commit()
+
+
+DB_REL_PATH = "__db__.db"
+
+
class FSStorage(ISimpleStorage):
"""Store all data in files on FS"""
def __init__(self, root_path: str, existing: bool) -> None:
self.root_path = root_path
self.existing = existing
- if existing:
- if not os.path.isdir(self.root_path):
- raise IOError("No storage found at {!r}".format(root_path))
def j(self, path: str) -> str:
return os.path.join(self.root_path, path)
- def __setitem__(self, path: str, value: bytes) -> None:
+ def put(self, value: bytes, path: str) -> None:
jpath = self.j(path)
os.makedirs(os.path.dirname(jpath), exist_ok=True)
with open(jpath, "wb") as fd:
fd.write(value)
- def __delitem__(self, path: str) -> None:
+ def get(self, path: str) -> bytes:
try:
- os.unlink(path)
- except FileNotFoundError:
- pass
+ with open(self.j(path), "rb") as fd:
+ return fd.read()
+ except FileNotFoundError as exc:
+ raise KeyError(path) from exc
- def __getitem__(self, path: str) -> bytes:
- with open(self.j(path), "rb") as fd:
- return fd.read()
+ def rm(self, path: str) -> None:
+ if os.path.isdir(self.j(path)):  # resolve against root_path, as put()/get() do
+ shutil.rmtree(self.j(path), ignore_errors=True)
+ elif os.path.exists(self.j(path)):
+ os.unlink(self.j(path))
def __contains__(self, path: str) -> bool:
return os.path.exists(self.j(path))
- def list(self, path: str = "") -> Iterator[Tuple[bool, str]]:
- jpath = self.j(path)
- if not os.path.exists(jpath):
- return
-
- for entry in os.scandir(jpath):
- if not entry.name in ('..', '.'):
- yield entry.is_file(), entry.name
-
- def get_stream(self, path: str, mode: str = "rb+") -> IO[bytes]:
+ def get_fd(self, path: str, mode: str = "rb+") -> IO[bytes]:
jpath = self.j(path)
if "cb" == mode:
@@ -140,77 +213,89 @@
def sub_storage(self, path: str) -> 'FSStorage':
return self.__class__(self.j(path), self.existing)
- def clear(self, path: str) -> None:
- if os.path.exists(path):
- shutil.rmtree(self.j(path))
+ def sync(self):
+ pass
class YAMLSerializer(ISerializer):
"""Serialize data to yaml"""
- def pack(self, value: Any) -> bytes:
- if type(value) not in basic_types:
- # for name, val in value.__dict__.items():
- # if type(val) not in basic_types:
- # raise ValueError(("Can't pack {!r}. Attribute {} has value {!r} (type: {}), but only" +
- # " basic types accepted as attributes").format(value, name, val, type(val)))
- value = value.__dict__
- return yaml.dump(value, Dumper=Dumper, encoding="utf8")
+ def pack(self, value: Storable) -> bytes:
+ try:
+ return yaml.dump(value, Dumper=Dumper, encoding="utf8")
+ except Exception as exc:
+ raise ValueError("Can't pickle object {!r} to yaml".format(type(value))) from exc
- def unpack(self, data: bytes) -> IStorable:
+ def unpack(self, data: bytes) -> Any:
return yaml.load(data, Loader=Loader)
+class SAFEYAMLSerializer(ISerializer):
+ """Serialize data to yaml"""
+ def pack(self, value: Storable) -> bytes:
+ try:
+ return yaml.safe_dump(value, encoding="utf8")
+ except Exception as exc:
+ raise ValueError("Can't pickle object {!r} to yaml".format(type(value))) from exc
+
+ def unpack(self, data: bytes) -> Any:
+ return yaml.safe_load(data)
+
+
+ObjClass = TypeVar('ObjClass', bound=IStorable)
+
+
class Storage:
"""interface for storage"""
- def __init__(self, storage: ISimpleStorage, serializer: ISerializer) -> None:
- self.storage = storage
+ def __init__(self, fs_storage: ISimpleStorage, db_storage: ISimpleStorage, serializer: ISerializer) -> None:
+ self.fs = fs_storage
+ self.db = db_storage
self.serializer = serializer
def sub_storage(self, *path: str) -> 'Storage':
- return self.__class__(self.storage.sub_storage("/".join(path)), self.serializer)
+ fpath = "/".join(path)
+ return self.__class__(self.fs.sub_storage(fpath), self.db.sub_storage(fpath), self.serializer)
- def __setitem__(self, path: Union[str, Iterable[str]], value: Any) -> None:
- if not isinstance(path, str):
- path = "/".join(path)
+ def put(self, value: Storable, *path: str) -> None:
+ dct_value = value.raw() if isinstance(value, IStorable) else value
+ serialized = self.serializer.pack(dct_value)
+ fpath = "/".join(path)
+ self.db.put(serialized, fpath)
+ self.fs.put(serialized, fpath)
- self.storage[path] = self.serializer.pack(cast(IStorable, value))
+ def put_list(self, value: Iterable[IStorable], *path: str) -> None:
+ serialized = self.serializer.pack([obj.raw() for obj in value])
+ fpath = "/".join(path)
+ self.db.put(serialized, fpath)
+ self.fs.put(serialized, fpath)
- def __getitem__(self, path: Union[str, Iterable[str]]) -> IStorable:
- if not isinstance(path, str):
- path = "/".join(path)
+ def get(self, *path: str) -> Any:
+ return self.serializer.unpack(self.db.get("/".join(path)))
- return self.serializer.unpack(self.storage[path])
+ def rm(self, *path: str) -> None:
+ fpath = "/".join(path)
+ self.fs.rm(fpath)
+ self.db.rm(fpath)
- def __delitem__(self, path: Union[str, Iterable[str]]) -> None:
- if not isinstance(path, str):
- path = "/".join(path)
- del self.storage[path]
+ def __contains__(self, path: str) -> bool:
+ return path in self.fs or path in self.db
- def __contains__(self, path: Union[str, Iterable[str]]) -> bool:
- if not isinstance(path, str):
- path = "/".join(path)
- return path in self.storage
-
- def store_raw(self, val: bytes, *path: str) -> None:
- self.storage["/".join(path)] = val
-
- def clear(self, *path: str) -> None:
- self.storage.clear("/".join(path))
+ def put_raw(self, val: bytes, *path: str) -> None:
+ self.fs.put(val, "/".join(path))
def get_raw(self, *path: str) -> bytes:
- return self.storage["/".join(path)]
+ return self.fs.get("/".join(path))
- def list(self, *path: str) -> Iterator[Tuple[bool, str]]:
- return self.storage.list("/".join(path))
+ def get_fd(self, path: str, mode: str = "r") -> IO:
+ return self.fs.get_fd(path, mode)
- def set_array(self, value: array.array, *path: str) -> None:
- with self.get_stream("/".join(path), "wb") as fd:
+ def put_array(self, value: array.array, *path: str) -> None:
+ with self.get_fd("/".join(path), "wb") as fd:
value.tofile(fd) # type: ignore
def get_array(self, typecode: str, *path: str) -> array.array:
res = array.array(typecode)
path_s = "/".join(path)
- with self.get_stream(path_s, "rb") as fd:
+ with self.get_fd(path_s, "rb") as fd:
fd.seek(0, os.SEEK_END)
size = fd.tell()
fd.seek(0, os.SEEK_SET)
@@ -220,55 +305,33 @@
return res
def append(self, value: array.array, *path: str) -> None:
- with self.get_stream("/".join(path), "cb") as fd:
+ with self.get_fd("/".join(path), "cb") as fd:
fd.seek(0, os.SEEK_END)
value.tofile(fd) # type: ignore
- def construct(self, path: str, raw_val: Dict, obj_class: Type[ObjClass]) -> ObjClass:
- "Internal function, used to construct user type from raw unpacked value"
- if obj_class in (int, str, dict, list, None):
- raise ValueError("Can't load into build-in value - {!r} into type {}")
-
- if not isinstance(raw_val, dict):
- raise ValueError("Can't load path {!r} into python type. Raw value not dict".format(path))
-
- if not all(isinstance(key, str) for key in raw_val.keys()):
- raise ValueError("Can't load path {!r} into python type.".format(path) +
- "Raw not all keys in raw value is strings")
-
- obj = obj_class.__new__(obj_class) # type: ObjClass
- obj.__dict__.update(raw_val)
- return obj
-
def load_list(self, obj_class: Type[ObjClass], *path: str) -> List[ObjClass]:
path_s = "/".join(path)
- raw_val = self[path_s]
+ raw_val = cast(List[Dict[str, Any]], self.get(path_s))
assert isinstance(raw_val, list)
- return [self.construct(path_s, val, obj_class) for val in cast(list, raw_val)]
+ return [obj_class.fromraw(val) for val in raw_val]
def load(self, obj_class: Type[ObjClass], *path: str) -> ObjClass:
path_s = "/".join(path)
- return self.construct(path_s, cast(Dict, self[path_s]), obj_class)
+ return obj_class.fromraw(self.get(path_s))
- def get_stream(self, path: str, mode: str = "r") -> IO:
- return self.storage.get_stream(path, mode)
-
- def get(self, path: Union[str, Iterable[str]], default: Any = None) -> Any:
- if not isinstance(path, str):
- path = "/".join(path)
-
- try:
- return self[path]
- except Exception:
- return default
+ def sync(self) -> None:
+ self.db.sync()
+ self.fs.sync()
def __enter__(self) -> 'Storage':
return self
def __exit__(self, x: Any, y: Any, z: Any) -> None:
- return
+ self.sync()
def make_storage(url: str, existing: bool = False) -> Storage:
- return Storage(FSStorage(url, existing), YAMLSerializer())
+ return Storage(FSStorage(url, existing),
+ DBStorage(os.path.join(url, DB_REL_PATH), existing),  # DBStorage holds the "existing" check
+ SAFEYAMLSerializer())
diff --git a/wally/storage_structure.txt b/wally/storage_structure.txt
index 307c069..460214c 100644
--- a/wally/storage_structure.txt
+++ b/wally/storage_structure.txt
@@ -1,5 +1,6 @@
config: Config - full configuration
all_nodes: List[NodeInfo] - all nodes
+all_results: List[Tuple[str, str, str]] - (test_type, test_summary, result_path)
cli: List[str] - cli options
spawned_nodes_ids: List[int] - list of openstack VM, spawned for test
@@ -14,7 +15,7 @@
# test results
result/{descr}_{id}/info : TestInfo - test information: name, cluster config, test parameters, etc.
-result/{descr}_{id}/measurement/{node}/{name}_raw : bytes - raw log
+result/{descr}_{id}/measurement/{node}/{name}_raw : bytes - raw log, where name is one of {'bw', 'iops', 'lat'}
result/{descr}_{id}/measurement/{node}/{name}_data - List[uint64] - measurements data.
result/{descr}_{id}/measurement/{node}/{name}_meta - Dict[str, Any] - measurements metadata.
diff --git a/wally/suits/io/fio.py b/wally/suits/io/fio.py
index ee34af4..fb18165 100644
--- a/wally/suits/io/fio.py
+++ b/wally/suits/io/fio.py
@@ -1,7 +1,7 @@
import array
import os.path
import logging
-from typing import cast, Dict
+from typing import cast, Any
import wally
@@ -12,7 +12,7 @@
from .fio_task_parser import execution_time, fio_cfg_compile, FioJobSection, FioParams, get_log_files, get_test_summary
from . import rpc_plugin
from .fio_hist import expected_lat_bins
-from ...storage import Storage
+
logger = logging.getLogger("wally")
@@ -21,6 +21,7 @@
soft_runcycle = 5 * 60
retry_time = 30
configs_dir = os.path.dirname(__file__) # type: str
+ name = 'fio'
def __init__(self, *args, **kwargs) -> None:
super().__init__(*args, **kwargs)
@@ -136,8 +137,9 @@
node.put_to_file(self.remote_task_file, str(iter_config).encode("utf8"))
# TODO: get a link to substorage as a parameter
- def run_iteration(self, node: IRPCNode, iter_config: IterationConfig, substorage: Storage) -> NodeTestResults:
- exec_time = execution_time(cast(FioJobSection, iter_config))
+ def run_iteration(self, node: IRPCNode, iter_config: IterationConfig, stor_prefix: str) -> NodeTestResults:
+ f_iter_config = cast(FioJobSection, iter_config)
+ exec_time = execution_time(f_iter_config)
fio_cmd_templ = "cd {exec_folder}; " + \
"{fio_path} --output-format=json --output={out_file} --alloc-size=262144 {job_file}"
@@ -151,12 +153,10 @@
if must_be_empty:
logger.error("Unexpected fio output: %r", must_be_empty)
- res = NodeTestResults(self.__class__.__name__,
- node.info.node_id(),
- get_test_summary(cast(FioJobSection, iter_config)))
+ res = NodeTestResults(self.__class__.__name__, node.info.node_id(), get_test_summary(f_iter_config))
res.extra_logs['fio'] = node.get_file_content(self.remote_output_file)
- substorage.store_raw(res.extra_logs['fio'], "fio_raw")
+ self.store_data(res.extra_logs['fio'], "raw", stor_prefix, "fio_raw")
node.conn.fs.unlink(self.remote_output_file)
files = [name for name in node.conn.fs.listdir(self.exec_folder)]
@@ -164,7 +164,7 @@
expected_time_delta = 1000 # 1000ms == 1s
max_time_diff = 50 # 50ms - 5%
- for name, path in get_log_files(cast(FioJobSection, iter_config)):
+ for name, path in get_log_files(f_iter_config):
log_files = [fname for fname in files if fname.startswith(path)]
if len(log_files) != 1:
logger.error("Found %s files, match log pattern %s(%s) - %s",
@@ -173,7 +173,7 @@
fname = os.path.join(self.exec_folder, log_files[0])
raw_result = node.get_file_content(fname) # type: bytes
- substorage.store_raw(raw_result, "{}_raw".format(name))
+ self.store_data(raw_result, "raw", stor_prefix, "{}_raw".format(name))
node.conn.fs.unlink(fname)
try:
@@ -222,7 +222,10 @@
step=expected_time_delta,
data=parsed)
- substorage.set_array(parsed, "{}_data".format(name)) # type: ignore
- substorage["{}_meta".format(name)] = res.series[name].meta() # type: ignore
+ self.store_data(parsed, "array", stor_prefix, "{}_data".format(name))
+ self.store_data(res.series[name].meta(), "yaml", stor_prefix, "{}_meta".format(name))
return res
+
+ def format_for_console(self, data: Any) -> str:
+ raise NotImplementedError()
diff --git a/wally/suits/io/fio_task_parser.py b/wally/suits/io/fio_task_parser.py
index 8390e3a..6790c97 100644
--- a/wally/suits/io/fio_task_parser.py
+++ b/wally/suits/io/fio_task_parser.py
@@ -11,6 +11,7 @@
from collections import OrderedDict
+from ...result_classes import IStorable
from ..itest import IterationConfig
from ...utils import sec_to_str, ssize2b
@@ -37,7 +38,9 @@
("vm_count", int)])
-class FioJobSection(IterationConfig):
+class FioJobSection(IterationConfig, IStorable):
+ yaml_tag = 'fio_job'
+
def __init__(self, name: str) -> None:
self.name = name
self.vals = OrderedDict() # type: Dict[str, Any]
@@ -67,6 +70,20 @@
return res
+ def raw(self) -> Dict[str, Any]:
+ return {
+ 'name': self.name,
+ 'vals': list(map(list, self.vals.items())),
+ 'summary': self.summary
+ }
+
+ @classmethod
+ def fromraw(cls, data: Dict[str, Any]) -> 'FioJobSection':
+ obj = cls(data['name'])
+ obj.summary = data['summary']
+ obj.vals.update(data['vals'])
+ return obj
+
class ParseError(ValueError):
def __init__(self, msg: str, fname: str, lineno: int, line_cont:Optional[str] = "") -> None:
diff --git a/wally/suits/io/rrd.cfg b/wally/suits/io/rrd.cfg
index 1075aea..d32d6a8 100644
--- a/wally/suits/io/rrd.cfg
+++ b/wally/suits/io/rrd.cfg
@@ -1,8 +1,14 @@
[global]
include defaults_qd.cfg
-ramp_time=5
-runtime=5
+ramp_time=0
+runtime=4
[test_{TEST_SUMM}]
blocksize=60k
rw=randread
+iodepth=1
+
+[test_{TEST_SUMM}]
+iodepth=16
+blocksize=60k
+rw=randread
diff --git a/wally/suits/itest.py b/wally/suits/itest.py
index aae475c..78986f6 100644
--- a/wally/suits/itest.py
+++ b/wally/suits/itest.py
@@ -3,14 +3,15 @@
import logging
import os.path
import datetime
-from typing import Dict, Any, List, Optional, Tuple, cast
+from typing import Dict, Any, List, Optional, Callable, cast
from concurrent.futures import ThreadPoolExecutor, wait
from ..utils import Barrier, StopTestError, sec_to_str
from ..node_interfaces import IRPCNode
from ..storage import Storage
-from ..result_classes import NodeTestResults, TimeSerie
+from ..result_classes import NodeTestResults, IStorable
+from queue import Queue
logger = logging.getLogger("wally")
@@ -45,22 +46,23 @@
self.remote_dir = remote_dir
-class IterationConfig:
+class IterationConfig(IStorable):
name = None # type: str
summary = None # type: str
-class PerfTest:
+class PerfTest(metaclass=abc.ABCMeta):
"""Base class for all tests"""
name = None # type: str
max_retry = 3
retry_time = 30
- def __init__(self, config: TestInputConfig) -> None:
+ def __init__(self, config: TestInputConfig, on_idle: Callable[[], None] = None) -> None:
self.config = config
self.stop_requested = False
self.nodes = self.config.nodes # type: List[IRPCNode]
self.sorted_nodes_ids = sorted(node.info.node_id() for node in self.nodes)
+ self.on_idle = on_idle
def request_stop(self) -> None:
self.stop_requested = True
@@ -85,29 +87,22 @@
max_time_diff = 5
max_rel_time_diff = 0.05
- def __init__(self, config: TestInputConfig) -> None:
- PerfTest.__init__(self, config)
+ def __init__(self, *args, **kwargs) -> None:
+ PerfTest.__init__(self, *args, **kwargs)
self.iterations_configs = [None] # type: List[Optional[IterationConfig]]
+ self.storage_q = Queue() # type: Any
@abc.abstractmethod
def get_expected_runtime(self, iter_cfg: IterationConfig) -> Optional[int]:
pass
def get_not_done_stages(self, storage: Storage) -> Dict[int, IterationConfig]:
- done_stages = [int(name_id.split("_")[1])
- for is_leaf, name_id in storage.list('result')
- if not is_leaf]
- if len(done_stages) == 0:
- start_run_id = 0
- else:
- start_run_id = max(done_stages) + 1
+ not_done = {} # type: Dict[int, IterationConfig]
- not_in_storage = {} # type: Dict[int, IterationConfig]
-
- for run_id, iteration_config in enumerate(self.iterations_configs[start_run_id:], start_run_id):
+ for run_id, iteration_config in enumerate(self.iterations_configs):
info_path = "result/{}/info".format(run_id)
if info_path in storage:
- info = cast(Dict[str, Any], storage[info_path]) # type: Dict[str, Any]
+ info = cast(Dict[str, Any], storage.get(info_path)) # type: Dict[str, Any]
assert isinstance(info, dict), \
"Broken storage at path {}. Expect test info dict, obtain {!r}".format(info_path, info)
@@ -120,7 +115,7 @@
expected_config = {
'name': self.name,
'iteration_name': iter_name,
- 'iteration_config': iteration_config,
+ 'iteration_config': iteration_config.raw(),
'params': self.config.params,
'nodes': self.sorted_nodes_ids
}
@@ -132,8 +127,9 @@
logger.info("Test iteration {} found in storage and will be skipped".format(iter_name))
else:
- not_in_storage[run_id] = iteration_config
- return not_in_storage
+ not_done[run_id] = iteration_config
+
+ return not_done
def run(self) -> None:
not_in_storage = self.get_not_done_stages(self.config.storage)
@@ -162,6 +158,7 @@
iter_name = "Unnamed" if iteration_config is None else iteration_config.name
logger.info("Run test iteration %s", iter_name)
+ current_result_path = "result/{}_{}".format(iteration_config.summary, run_id)
results = [] # type: List[NodeTestResults]
for idx in range(self.max_retry):
logger.debug("Prepare iteration %s", iter_name)
@@ -175,12 +172,8 @@
try:
futures = []
for node in self.nodes:
- path = "result/{}_{}/measurement/{}".format(iteration_config.summary,
- run_id,
- node.info.node_id())
- self.config.storage.clear(path)
- mstorage = self.config.storage.sub_storage(path)
- futures.append(pool.submit(self.run_iteration, node, iteration_config, mstorage))
+ path = "{}/measurement/{}".format(current_result_path, node.info.node_id())
+ futures.append(pool.submit(self.run_iteration, node, iteration_config, path))
results = [fut.result() for fut in futures]
break
@@ -196,11 +189,7 @@
stop_times = [] # type: List[int]
# TODO: FIX result processing - NodeTestResults
- result = None # type: NodeTestResults
for result in results:
- mstorage = self.config.storage.sub_storage("result/{}_{}/measurement/{}"
- .format(result.summary, run_id, result.node_id))
- serie = None # type: TimeSerie
for name, serie in result.series.items():
start_times.append(serie.start_at)
stop_times.append(serie.step * len(serie.data))
@@ -224,14 +213,43 @@
test_config = {
'name': self.name,
'iteration_name': iter_name,
- 'iteration_config': iteration_config,
+ 'iteration_config': iteration_config.raw(),
'params': self.config.params,
'nodes': self.sorted_nodes_ids,
'begin_time': min_start_time,
'end_time': max_stop_time
}
- self.config.storage["result", str(run_id), "info"] = test_config # type: ignore
+ self.process_storage_queue()
+ self.config.storage.put(test_config, "result", str(run_id), "info")
+
+ if "all_results" in self.config.storage:
+ all_results = self.config.storage.get("all_results")
+ else:
+ all_results = []
+
+ all_results.append([self.name, iteration_config.summary, current_result_path])
+ self.config.storage.put(all_results, "all_results")
+ self.config.storage.sync()
+
+ if self.on_idle is not None:
+ self.on_idle()
+
+ def store_data(self, val: Any, type: str, prefix: str, *path: str) -> None:
+     """Queue a value to be written to storage by process_storage_queue().
+
+     Nothing is written here; the item is only appended to self.storage_q.
+
+     Args:
+         val: the value to store.
+         type: storage kind tag; process_storage_queue() handles
+             'raw', 'yaml' and 'array'.
+         prefix: first storage path component.
+         *path: remaining storage path components.
+     """
+     queued_item = (val, type, prefix, path)
+     self.storage_q.put(queued_item)
+
+ def process_storage_queue(self) -> None:
+     """Flush every item queued by store_data() into self.config.storage.
+
+     Each queued item is a (value, type, prefix, path) tuple; the type tag
+     selects the storage call: 'raw' -> put_raw, 'yaml' -> put,
+     'array' -> put_array.
+
+     Raises:
+         StopTestError: if a queued item carries an unknown type tag.
+     """
+     while not self.storage_q.empty():
+         value, val_type, subpath, val_path = self.storage_q.get()
+         if val_type == 'raw':
+             self.config.storage.put_raw(value, subpath, *val_path)
+         elif val_type == 'yaml':
+             self.config.storage.put(value, subpath, *val_path)
+         elif val_type == 'array':
+             self.config.storage.put_array(value, subpath, *val_path)
+         else:
+             # Report the offending type tag (not the storage path), since
+             # the unknown tag is what this branch is diagnosing.
+             logger.error("Internal logic error - unknown data stop type {!r}".format(val_type))
+             raise StopTestError()
@abc.abstractmethod
def config_node(self, node: IRPCNode) -> None:
@@ -242,7 +260,7 @@
pass
@abc.abstractmethod
- def run_iteration(self, node: IRPCNode, iter_config: IterationConfig, substorage: Storage) -> NodeTestResults:
+ def run_iteration(self, node: IRPCNode, iter_config: IterationConfig, stor_prefix: str) -> NodeTestResults:
pass
@@ -269,7 +287,7 @@
def prepare_iteration(self, node: IRPCNode, iter_config: IterationConfig) -> None:
pass
- def run_iteration(self, node: IRPCNode, iter_config: IterationConfig, substorage: Storage) -> NodeTestResults:
+ def run_iteration(self, node: IRPCNode, iter_config: IterationConfig, stor_prefix: str) -> NodeTestResults:
# TODO: have to store logs
cmd = self.join_remote(self.run_script)
cmd += ' ' + self.config.params.get('run_opts', '')