2.0 is on the way
diff --git a/wally/discover/discover.py b/wally/discover/discover.py
index e0d7dab..35ddc62 100644
--- a/wally/discover/discover.py
+++ b/wally/discover/discover.py
@@ -13,6 +13,7 @@
from ..node_interfaces import NodeInfo
from ..node import connect, setup_rpc
from ..ssh_utils import parse_ssh_uri
+from ..test_run_class import TestRun
logger = logging.getLogger("wally.discover")
@@ -39,7 +40,10 @@
DiscoveryResult = NamedTuple("DiscoveryResult", [("os_creds", Optional[OSCreds]), ("nodes", List[NodeInfo])])
-def discover(discover_list: List[str], clusters_info: ConfigBlock, discover_nodes: bool = True) -> DiscoveryResult:
+def discover(ctx: TestRun,
+ discover_list: List[str],
+ clusters_info: ConfigBlock,
+ discover_nodes: bool = True) -> DiscoveryResult:
"""Discover nodes in clusters"""
new_nodes = [] # type: List[NodeInfo]
@@ -52,21 +56,7 @@
continue
cluster_info = clusters_info["openstack"] # type: ConfigBlock
-
- conn = cluster_info['connection'] # type: ConfigBlock
- if not conn:
- logger.error("No connection provided for %s. Skipping", cluster)
- continue
-
- user, passwd, tenant = parse_creds(conn['creds'])
-
- auth_data = dict(auth_url=conn['auth_url'],
- username=user,
- api_key=passwd,
- project_id=tenant) # type: Dict[str, str]
-
- logger.debug("Discovering openstack nodes with connection details: %r", conn)
- new_nodes.extend(openstack.discover_openstack_nodes(auth_data, cluster_info))
+ new_nodes.extend(openstack.discover_openstack_nodes(ctx.os_connection, cluster_info))
elif cluster == "fuel" or cluster == "fuel_openrc_only":
if cluster == "fuel_openrc_only":
@@ -74,15 +64,17 @@
fuel_node_info = NodeInfo(parse_ssh_uri(clusters_info['fuel']['ssh_creds']), {'fuel_master'})
try:
- fuel_rpc_conn = setup_rpc(connect(fuel_node_info))
+ fuel_rpc_conn = setup_rpc(connect(fuel_node_info), ctx.rpc_code)
except AuthenticationException:
raise StopTestError("Wrong fuel credentials")
except Exception:
logger.exception("While connection to FUEL")
raise StopTestError("Failed to connect to FUEL")
+ # TODO(koder): keep the FUEL rpc connection in the context? Maybe open it at a higher stack level?
with fuel_rpc_conn:
- nodes, fuel_info = fuel.discover_fuel_nodes(fuel_rpc_conn, clusters_info['fuel'], discover_nodes)
+ nodes, fuel_info = fuel.discover_fuel_nodes(
+ fuel_rpc_conn, ctx.fuel_conn, clusters_info['fuel'], discover_nodes)
new_nodes.extend(nodes)
if fuel_info.openrc:
@@ -105,7 +97,7 @@
conf = clusters_info["ceph"].get("conf")
key = clusters_info["ceph"].get("key")
info = NodeInfo(parse_ssh_uri(root_node_uri), set())
- with setup_rpc(connect(info)) as ceph_root_conn:
+ with setup_rpc(connect(info), ctx.rpc_code) as ceph_root_conn:
new_nodes.extend(ceph.discover_ceph_nodes(ceph_root_conn, cluster=cluster, conf=conf, key=key))
else:
logger.warning("Skip ceph cluster discovery")
diff --git a/wally/discover/fuel.py b/wally/discover/fuel.py
index 119cbd0..7b4f90f 100644
--- a/wally/discover/fuel.py
+++ b/wally/discover/fuel.py
@@ -1,12 +1,12 @@
import logging
import socket
-from typing import Dict, Any, Tuple, List, NamedTuple, Union
+from typing import Dict, Any, Tuple, List, NamedTuple, Union, cast
from urllib.parse import urlparse
from .. import fuel_rest_api
from ..node_interfaces import NodeInfo, IRPCNode
from ..ssh_utils import ConnCreds
-from ..utils import parse_creds, check_input_param
+from ..utils import check_input_param
logger = logging.getLogger("wally.discover")
@@ -18,50 +18,41 @@
def discover_fuel_nodes(fuel_master_node: IRPCNode,
+ fuel_conn: fuel_rest_api.Connection,
fuel_data: Dict[str, Any],
discover_nodes: bool = True) -> Tuple[List[NodeInfo], FuelNodeInfo]:
"""Discover nodes in fuel cluster, get openrc for selected cluster"""
- # parse FUEL REST credentials
- username, tenant_name, password = parse_creds(fuel_data['creds'])
- creds = {"username": username,
- "tenant_name": tenant_name,
- "password": password}
-
- # connect to FUEL
- conn = fuel_rest_api.KeystoneAuth(fuel_data['url'], creds, headers=None)
msg = "openstack_env should be provided in fuel config"
check_input_param('openstack_env' in fuel_data, msg)
# get cluster information from REST API
- cluster_id = fuel_rest_api.get_cluster_id(conn, fuel_data['openstack_env'])
- cluster = fuel_rest_api.reflect_cluster(conn, cluster_id)
- version = fuel_rest_api.FuelInfo(conn).get_version()
+ cluster_id = fuel_rest_api.get_cluster_id(fuel_conn, fuel_data['openstack_env'])
+ cluster = fuel_rest_api.reflect_cluster(fuel_conn, cluster_id)
+ version = fuel_rest_api.FuelInfo(fuel_conn).get_version()
if not discover_nodes:
logger.warning("Skip fuel cluster discovery")
- return [], FuelNodeInfo(version, None, cluster.get_openrc())
+ return [], FuelNodeInfo(version, None, cluster.get_openrc()) # type: ignore
- fuel_nodes = list(cluster.get_nodes())
-
- logger.info("Found FUEL {0}".format(".".join(map(str, version))))
-
- network = 'fuelweb_admin' if version >= [6, 0] else 'admin'
-
- fuel_host = urlparse(fuel_data['url']).hostname
- fuel_ip = socket.gethostbyname(fuel_host)
- fuel_ext_iface = fuel_master_node.get_interface(fuel_ip)
+ logger.info("Found fuel {0}".format(".".join(map(str, version))))
# get FUEL master key to connect to cluster nodes via ssh
logger.debug("Downloading fuel master key")
fuel_key = fuel_master_node.get_file_content('/root/.ssh/id_rsa')
+ network = 'fuelweb_admin' if version >= [6, 0] else 'admin'
+ fuel_ip = socket.gethostbyname(fuel_conn.host)
+ fuel_ext_iface = fuel_master_node.get_interface(fuel_ip)
+
nodes = []
- for fuel_node in fuel_nodes:
+ for fuel_node in list(cluster.get_nodes()):
ip = str(fuel_node.get_ip(network))
- nodes.append(NodeInfo(ConnCreds(ip, "root", key=fuel_key), roles=set(fuel_node.get_roles())))
+ creds = ConnCreds(ip, "root", key=fuel_key)
+ nodes.append(NodeInfo(creds, roles=set(fuel_node.get_roles())))
logger.debug("Found {} fuel nodes for env {}".format(len(nodes), fuel_data['openstack_env']))
- return nodes, FuelNodeInfo(version, fuel_ext_iface, cluster.get_openrc())
+ return nodes, FuelNodeInfo(version, fuel_ext_iface,
+ cast(Dict[str, Union[str, bool]], cluster.get_openrc()))
diff --git a/wally/discover/openstack.py b/wally/discover/openstack.py
index 88e8656..f590359 100644
--- a/wally/discover/openstack.py
+++ b/wally/discover/openstack.py
@@ -1,13 +1,11 @@
import socket
import logging
-from typing import Dict, Any, List
-
-
-from novaclient.client import Client
+from typing import Dict, Any, List, Optional, cast
from ..node_interfaces import NodeInfo
from ..config import ConfigBlock
-from ..utils import parse_creds
+from ..ssh_utils import ConnCreds
+from ..start_vms import OSConnection, NovaClient
logger = logging.getLogger("wally.discover")
@@ -24,73 +22,45 @@
raise ValueError("VM {} has no floating ip".format(vm))
-def get_ssh_url(user: str, password: str, ip: str, key: str) -> str:
- """Get ssh connection URL from parts"""
-
- if password is not None:
- assert key is None, "Both key and password provided"
- return "ssh://{}:{}@{}".format(user, password, ip)
- else:
- assert key is not None, "None of key/password provided"
- return "ssh://{}@{}::{}".format(user, ip, key)
-
-
-def discover_vms(client: Client, search_opts: Dict) -> List[NodeInfo]:
+def discover_vms(client: NovaClient, search_data: str) -> List[NodeInfo]:
"""Discover virtual machines"""
- user, password, key = parse_creds(search_opts.pop('auth'))
-
- servers = client.servers.list(search_opts=search_opts)
+ name, user, key_file = search_data.split(",")
+ servers = client.servers.list(search_opts={"name": name})
logger.debug("Found %s openstack vms" % len(servers))
nodes = [] # type: List[NodeInfo]
for server in servers:
ip = get_floating_ip(server)
- nodes.append(NodeInfo(get_ssh_url(user, password, ip, key), roles={"test_vm"}))
+ creds = ConnCreds(host=ip, user=user, key_file=key_file)
+ nodes.append(NodeInfo(creds, roles={"test_vm"}))
+
return nodes
-def discover_services(client: Client, opts: Dict[str, Any]) -> List[NodeInfo]:
+def discover_openstack_nodes(conn: OSConnection, conf: ConfigBlock) -> List[NodeInfo]:
"""Discover openstack services for given cluster"""
- user, password, key = parse_creds(opts.pop('auth'))
+ os_nodes_auth = conf['auth'] # type: str
- services = []
- if opts['service'] == "all":
- services = client.services.list()
+ if os_nodes_auth.count(":") == 2:
+ user, password, key_file = os_nodes_auth.split(":") # type: str, Optional[str], Optional[str]
+ if not password:
+ password = None
else:
- if isinstance(opts['service'], str):
- opts['service'] = [opts['service']]
+ user, password = os_nodes_auth.split(":")
+ key_file = None
- for s in opts['service']:
- services.extend(client.services.list(binary=s))
-
- host_services_mapping = {} # type: Dict[str, [str]]
+ services = conn.nova.services.list() # type: List[Any]
+ host_services_mapping = {} # type: Dict[str, List[str]]
for service in services:
- ip = socket.gethostbyname(service.host)
+ ip = cast(str, socket.gethostbyname(service.host))
- host_services_mapping.get(ip, []).append(service.binary)
+ host_services_mapping.setdefault(ip, []).append(service.binary)  # setdefault stores the new list; .get() appended to a discarded one
- logger.debug("Found %s openstack service nodes" %
- len(host_services_mapping))
+ logger.debug("Found %s openstack service nodes" % len(host_services_mapping))
nodes = [] # type: List[NodeInfo]
for host, services in host_services_mapping.items():
- ssh_url = get_ssh_url(user, password, host, key)
- nodes.append(NodeInfo(ssh_url, services))
+ creds = ConnCreds(host=host, user=user, passwd=password, key_file=key_file)
+ nodes.append(NodeInfo(creds, set(services)))
return nodes
-
-
-def discover_openstack_nodes(conn_details: Dict[str, str], conf: ConfigBlock) -> List[NodeInfo]:
- """Discover vms running in openstack
- conn_details - dict with openstack connection details -
- auth_url, api_key (password), username
- conf - test configuration object
- """
- client = Client(version='1.1', **conn_details)
-
- if conf.get('discover'):
- services_to_discover = conf['discover'].get('nodes')
- if services_to_discover:
- return discover_services(client, services_to_discover)
-
- return []
diff --git a/wally/fuel_rest_api.py b/wally/fuel_rest_api.py
index 2addaf4..6117606 100644
--- a/wally/fuel_rest_api.py
+++ b/wally/fuel_rest_api.py
@@ -2,9 +2,10 @@
import abc
import json
import logging
-import urllib.request
import urllib.parse
-from typing import Dict, Any, Iterator, Match, List, Callable
+import urllib.error
+import urllib.request
+from typing import Dict, Any, Iterator, List, Callable, cast
from functools import partial
import netaddr
@@ -16,13 +17,14 @@
class Connection(metaclass=abc.ABCMeta):
+ host = None # type: str
+
@abc.abstractmethod
def do(self, method: str, path: str, params: Dict = None) -> Dict:
pass
- @abc.abstractmethod
def get(self, path: str, params: Dict = None) -> Dict:
- pass
+ return self.do("GET", path, params)
class Urllib2HTTP(Connection):
@@ -47,7 +49,7 @@
else:
self.headers = headers
- def do(self, method: str, path: str, params: Dict = None) -> Dict:
+ def do(self, method: str, path: str, params: Dict = None) -> Any:
if path.startswith('/'):
url = self.root_url + path
else:
@@ -62,17 +64,17 @@
logger.debug("HTTP: {0} {1}".format(method.upper(), url))
request = urllib.request.Request(url,
- data=data_json,
+ data=data_json.encode("utf8") if data_json is not None else None,  # data_json may be None (see the check below)
headers=self.headers)
if data_json is not None:
request.add_header('Content-Type', 'application/json')
- request.get_method = lambda: method.upper()
+ request.get_method = lambda: method.upper() # type: ignore
response = urllib.request.urlopen(request)
+ code = response.code # type: ignore
- logger.debug("HTTP Responce: {0}".format(response.code))
-
- if response.code < 200 or response.code > 209:
+ logger.debug("HTTP Responce: {0}".format(code))
+ if code < 200 or code > 209:
raise IndexError(url)
content = response.read()
@@ -80,9 +82,9 @@
- if '' == content:
+ if not content:  # content is bytes here, so comparing it against '' never matched
return None
- return json.loads(content)
+ return json.loads(content.decode("utf8"))
- def __getattr__(self, name: str):
+ def __getattr__(self, name: str) -> Any:
if name in self.allowed_methods:
return partial(self.do, name)
raise AttributeError(name)
@@ -107,11 +109,11 @@
'Cant establish connection to keystone with url %s',
self.keystone_url)
- def do(self, method: str, path: str, params: Dict[str, str] = None) -> Dict[str, Any]:
+ def do(self, method: str, path: str, params: Dict[str, str] = None) -> Any:
"""Do request. If gets 401 refresh token"""
try:
return super(KeystoneAuth, self).do(method, path, params)
- except urllib.request.HTTPError as e:
+ except urllib.error.HTTPError as e:
if e.code == 401:
logger.warning(
'Authorization failure: {0}'.format(e.read()))
@@ -121,17 +123,17 @@
raise
-def get_inline_param_list(url: str) -> Iterator[Match]:
+def get_inline_param_list(url: str) -> Iterator[str]:
format_param_rr = re.compile(r"\{([a-zA-Z_]+)\}")
for match in format_param_rr.finditer(url):
yield match.group(1)
class RestObj:
- name = None
- id = None
+ name = None # type: str
+ id = None # type: int
- def __init__(self, conn, **kwargs) -> None:
+ def __init__(self, conn: Connection, **kwargs: Any) -> None:
self.__dict__.update(kwargs)
self.__connection__ = conn
@@ -148,8 +150,8 @@
return getattr(self, item)
-def make_call(method: str, url: str) -> Callable[[Any, Any], Dict]:
- def closure(obj: Any, entire_obj: Any = None, **data) -> Dict:
+def make_call(method: str, url: str) -> Callable:
+ def closure(obj: Any, entire_obj: Any = None, **data: Any) -> Any:
inline_params_vals = {}
for name in get_inline_param_list(url):
if name in data:
@@ -167,9 +169,12 @@
return closure
-PUT = partial(make_call, 'put')
-GET = partial(make_call, 'get')
-DELETE = partial(make_call, 'delete')
+RequestMethod = Callable[[str], Callable]
+
+
+PUT = cast(RequestMethod, partial(make_call, 'put')) # type: RequestMethod
+GET = cast(RequestMethod, partial(make_call, 'get')) # type: RequestMethod
+DELETE = cast(RequestMethod, partial(make_call, 'delete')) # type: RequestMethod
# ------------------------------- ORM ----------------------------------------
@@ -270,14 +275,13 @@
super(Cluster, self).__init__(*dt, **mp)
self.nodes = NodeList([Node(self.__connection__, **node) for node in
self._get_nodes()])
- self.network_roles = {}
def check_exists(self) -> bool:
"""Check if cluster exists"""
try:
self.get_status()
return True
- except urllib.request.HTTPError as err:
+ except urllib.error.HTTPError as err:
if err.code == 404:
return False
raise
diff --git a/wally/main.py b/wally/main.py
index 360506e..3e1fcb3 100644
--- a/wally/main.py
+++ b/wally/main.py
@@ -162,6 +162,7 @@
storage['config'] = config # type: ignore
stages.extend([
+ run_test.clouds_connect_stage,
run_test.discover_stage,
run_test.reuse_vms_stage,
log_nodes_statistic_stage,
diff --git a/wally/meta_info.py b/wally/meta_info.py
index e0e2b30..7dc3901 100644
--- a/wally/meta_info.py
+++ b/wally/meta_info.py
@@ -1,4 +1,4 @@
-from typing import Any, Dict, Union, List
+from typing import Any, Dict, List
from .fuel_rest_api import KeystoneAuth, FuelInfo
@@ -21,13 +21,13 @@
return lab_data
-def collect_lab_data(url: str, cred: Dict[str, str]) -> Dict[str, Union[List[Dict[str, str]], str]]:
+def collect_lab_data(url: str, cred: Dict[str, str]) -> Dict[str, Any]:
finfo = FuelInfo(KeystoneAuth(url, cred))
nodes = [] # type: List[Dict[str, str]]
- result = {}
+ result = {} # type: Dict[str, Any]
- for node in finfo.get_nodes():
+ for node in finfo.get_nodes(): # type: ignore
node_info = {
'name': node['name'],
'processors': [],
@@ -51,4 +51,5 @@
result['nodes'] = nodes
result['fuel_version'] = finfo.get_version()
result['total_info'] = total_lab_info(nodes)
+
return result
diff --git a/wally/node_interfaces.py b/wally/node_interfaces.py
index e75c2a3..ac83267 100644
--- a/wally/node_interfaces.py
+++ b/wally/node_interfaces.py
@@ -54,6 +54,7 @@
class IRPCNode(metaclass=abc.ABCMeta):
"""Remote filesystem interface"""
info = None # type: NodeInfo
+ conn = None # type: Any
@abc.abstractmethod
def run(self, cmd: str, timeout: int = 60, nolog: bool = False) -> str:
diff --git a/wally/report.py b/wally/report.py
index ed3c362..88c97b7 100644
--- a/wally/report.py
+++ b/wally/report.py
@@ -5,7 +5,7 @@
import itertools
import collections
from io import StringIO
-from typing import Dict
+from typing import Dict, Any, Iterator, Tuple, cast
try:
import numpy
@@ -19,6 +19,8 @@
import wally
from .utils import ssize2b
from .statistic import round_3_digit
+from .storage import Storage
+from .result_classes import TestInfo, FullTestResult, SensorInfo
from .suits.io.fio_task_parser import (get_test_sync_mode,
get_test_summary,
parse_all_in_1,
@@ -28,811 +30,1546 @@
logger = logging.getLogger("wally.report")
-class DiskInfo:
- def __init__(self):
- self.direct_iops_r_max = 0
- self.direct_iops_w_max = 0
+def load_test_results(storage: Storage) -> Iterator[FullTestResult]:
+ sensors_data = {} # type: Dict[Tuple[str, str, str], SensorInfo]
- # 64 used instead of 4k to faster feed caches
- self.direct_iops_w64_max = 0
+ for _, node_id in storage.list("metric"):
+ for _, dev_name in storage.list("metric", node_id):
+ for _, sensor_name in storage.list("metric", node_id, dev_name):
+ key = (node_id, dev_name, sensor_name)
+ si = SensorInfo(*key)
+ si.begin_time, si.end_time, si.data = storage["metric/{}/{}/{}".format(*key)] # type: ignore
+ sensors_data[key] = si
- self.rws4k_10ms = 0
- self.rws4k_30ms = 0
- self.rws4k_100ms = 0
- self.bw_write_max = 0
- self.bw_read_max = 0
+ for _, run_id in storage.list("result"):
+ path = "result/" + run_id
+ ftr = FullTestResult()
+ ftr.info = storage.load(TestInfo, path, "info")
+ ftr.performance_data = {}
+ p1 = "result/{}/measurement".format(run_id)
+ for _, node_id in storage.list(p1):
+ for _, measurement_name in storage.list(p1, node_id):
+ perf_key = (node_id, measurement_name)
+ ftr.performance_data[perf_key] = storage["{}/{}/{}".format(p1, *perf_key)] # type: ignore
-report_funcs = []
+ yield ftr
-class Attrmapper(object):
- def __init__(self, dct: Dict):
- self.__dct = dct
-
- def __getattr__(self, name):
- try:
- return self.__dct[name]
- except KeyError:
- raise AttributeError(name)
-
-
-class PerfInfo(object):
- def __init__(self, name, summary, intervals, params, testnodes_count):
- self.name = name
- self.bw = None
- self.iops = None
- self.lat = None
- self.lat_50 = None
- self.lat_95 = None
-
- self.raw_bw = []
- self.raw_iops = []
- self.raw_lat = []
-
- self.params = params
- self.intervals = intervals
- self.testnodes_count = testnodes_count
- self.summary = summary
- self.p = Attrmapper(self.params.vals)
-
- self.sync_mode = get_test_sync_mode(self.params)
- self.concurence = self.params.vals.get('numjobs', 1)
-
-
-# disk_info = None
-# base = None
-# linearity = None
-
-
-def group_by_name(test_data):
- name_map = collections.defaultdict(lambda: [])
-
- for data in test_data:
- name_map[(data.name, data.summary())].append(data)
-
- return name_map
-
-
-def report(name, required_fields):
- def closure(func):
- report_funcs.append((required_fields.split(","), name, func))
- return func
- return closure
-
-
-def get_test_lcheck_params(pinfo):
- res = [{
- 's': 'sync',
- 'd': 'direct',
- 'a': 'async',
- 'x': 'sync direct'
- }[pinfo.sync_mode]]
-
- res.append(pinfo.p.rw)
-
- return " ".join(res)
-
-
-def get_emb_data_svg(plt):
- sio = StringIO()
- plt.savefig(sio, format='svg')
- img_start = "<!-- Created with matplotlib (http://matplotlib.org/) -->"
- return sio.getvalue().split(img_start, 1)[1]
-
-
-def get_template(templ_name):
- very_root_dir = os.path.dirname(os.path.dirname(wally.__file__))
- templ_dir = os.path.join(very_root_dir, 'report_templates')
- templ_file = os.path.join(templ_dir, templ_name)
- return open(templ_file, 'r').read()
-
-
-def group_by(data, func):
- if len(data) < 2:
- yield data
- return
-
- ndata = [(func(dt), dt) for dt in data]
- ndata.sort(key=func)
- pkey, dt = ndata[0]
- curr_list = [dt]
-
- for key, val in ndata[1:]:
- if pkey != key:
- yield curr_list
- curr_list = [val]
- else:
- curr_list.append(val)
- pkey = key
-
- yield curr_list
-
-
-@report('linearity', 'linearity_test')
-def linearity_report(processed_results, lab_info, comment):
- labels_and_data_mp = collections.defaultdict(lambda: [])
- vls = {}
-
- # plot io_time = func(bsize)
- for res in processed_results.values():
- if res.name.startswith('linearity_test'):
- iotimes = [1000. / val for val in res.iops.raw]
-
- op_summ = get_test_summary(res.params)[:3]
-
- labels_and_data_mp[op_summ].append(
- [res.p.blocksize, res.iops.raw, iotimes])
-
- cvls = res.params.vals.copy()
- del cvls['blocksize']
- del cvls['rw']
-
- cvls.pop('sync', None)
- cvls.pop('direct', None)
- cvls.pop('buffered', None)
-
- if op_summ not in vls:
- vls[op_summ] = cvls
- else:
- assert cvls == vls[op_summ]
-
- all_labels = None
- _, ax1 = plt.subplots()
- for name, labels_and_data in labels_and_data_mp.items():
- labels_and_data.sort(key=lambda x: ssize2b(x[0]))
-
- labels, _, iotimes = zip(*labels_and_data)
-
- if all_labels is None:
- all_labels = labels
- else:
- assert all_labels == labels
-
- plt.boxplot(iotimes)
- if len(labels_and_data) > 2 and \
- ssize2b(labels_and_data[-2][0]) >= 4096:
-
- xt = range(1, len(labels) + 1)
-
- def io_time(sz, bw, initial_lat):
- return sz / bw + initial_lat
-
- x = numpy.array(map(ssize2b, labels))
- y = numpy.array([sum(dt) / len(dt) for dt in iotimes])
- popt, _ = scipy.optimize.curve_fit(io_time, x, y, p0=(100., 1.))
-
- y1 = io_time(x, *popt)
- plt.plot(xt, y1, linestyle='--',
- label=name + ' LS linear approx')
-
- for idx, (sz, _, _) in enumerate(labels_and_data):
- if ssize2b(sz) >= 4096:
- break
-
- bw = (x[-1] - x[idx]) / (y[-1] - y[idx])
- lat = y[-1] - x[-1] / bw
- y2 = io_time(x, bw, lat)
- plt.plot(xt, y2, linestyle='--',
- label=abbv_name_to_full(name) +
- ' (4k & max) linear approx')
-
- plt.setp(ax1, xticklabels=labels)
-
- plt.xlabel("Block size")
- plt.ylabel("IO time, ms")
-
- plt.subplots_adjust(top=0.85)
- plt.legend(bbox_to_anchor=(0.5, 1.15),
- loc='upper center',
- prop={'size': 10}, ncol=2)
- plt.grid()
- iotime_plot = get_emb_data_svg(plt)
- plt.clf()
-
- # plot IOPS = func(bsize)
- _, ax1 = plt.subplots()
-
- for name, labels_and_data in labels_and_data_mp.items():
- labels_and_data.sort(key=lambda x: ssize2b(x[0]))
- _, data, _ = zip(*labels_and_data)
- plt.boxplot(data)
- avg = [float(sum(arr)) / len(arr) for arr in data]
- xt = range(1, len(data) + 1)
- plt.plot(xt, avg, linestyle='--',
- label=abbv_name_to_full(name) + " avg")
-
- plt.setp(ax1, xticklabels=labels)
- plt.xlabel("Block size")
- plt.ylabel("IOPS")
- plt.legend(bbox_to_anchor=(0.5, 1.15),
- loc='upper center',
- prop={'size': 10}, ncol=2)
- plt.grid()
- plt.subplots_adjust(top=0.85)
-
- iops_plot = get_emb_data_svg(plt)
-
- res = set(get_test_lcheck_params(res) for res in processed_results.values())
- ncount = list(set(res.testnodes_count for res in processed_results.values()))
- conc = list(set(res.concurence for res in processed_results.values()))
-
- assert len(conc) == 1
- assert len(ncount) == 1
-
- descr = {
- 'vm_count': ncount[0],
- 'concurence': conc[0],
- 'oper_descr': ", ".join(res).capitalize()
- }
-
- params_map = {'iotime_vs_size': iotime_plot,
- 'iops_vs_size': iops_plot,
- 'descr': descr}
-
- return get_template('report_linearity.html').format(**params_map)
-
-
-@report('lat_vs_iops', 'lat_vs_iops')
-def lat_vs_iops(processed_results, lab_info, comment):
- lat_iops = collections.defaultdict(lambda: [])
- requsted_vs_real = collections.defaultdict(lambda: {})
-
- for res in processed_results.values():
- if res.name.startswith('lat_vs_iops'):
- lat_iops[res.concurence].append((res.lat,
- 0,
- res.iops.average,
- res.iops.deviation))
- # lat_iops[res.concurence].append((res.lat.average / 1000.0,
- # res.lat.deviation / 1000.0,
- # res.iops.average,
- # res.iops.deviation))
- requested_iops = res.p.rate_iops * res.concurence
- requsted_vs_real[res.concurence][requested_iops] = \
- (res.iops.average, res.iops.deviation)
-
- colors = ['red', 'green', 'blue', 'orange', 'magenta', "teal"]
- colors_it = iter(colors)
- for conc, lat_iops in sorted(lat_iops.items()):
- lat, dev, iops, iops_dev = zip(*lat_iops)
- plt.errorbar(iops, lat, xerr=iops_dev, yerr=dev, fmt='ro',
- label=str(conc) + " threads",
- color=next(colors_it))
-
- plt.xlabel("IOPS")
- plt.ylabel("Latency, ms")
- plt.grid()
- plt.legend(loc=0)
- plt_iops_vs_lat = get_emb_data_svg(plt)
- plt.clf()
-
- colors_it = iter(colors)
- for conc, req_vs_real in sorted(requsted_vs_real.items()):
- req, real = zip(*sorted(req_vs_real.items()))
- iops, dev = zip(*real)
- plt.errorbar(req, iops, yerr=dev, fmt='ro',
- label=str(conc) + " threads",
- color=next(colors_it))
- plt.xlabel("Requested IOPS")
- plt.ylabel("Get IOPS")
- plt.grid()
- plt.legend(loc=0)
- plt_iops_vs_requested = get_emb_data_svg(plt)
-
- res1 = processed_results.values()[0]
- params_map = {'iops_vs_lat': plt_iops_vs_lat,
- 'iops_vs_requested': plt_iops_vs_requested,
- 'oper_descr': get_test_lcheck_params(res1).capitalize()}
-
- return get_template('report_iops_vs_lat.html').format(**params_map)
-
-
-def render_all_html(comment, info, lab_description, images, templ_name):
- data = info.__dict__.copy()
- for name, val in data.items():
- if not name.startswith('__'):
- if val is None:
- if name in ('direct_iops_w64_max', 'direct_iops_w_max'):
- data[name] = ('-', '-', '-')
- else:
- data[name] = '-'
- elif isinstance(val, (int, float, long)):
- data[name] = round_3_digit(val)
-
- data['bw_read_max'] = (data['bw_read_max'][0] // 1024,
- data['bw_read_max'][1],
- data['bw_read_max'][2])
-
- data['bw_write_max'] = (data['bw_write_max'][0] // 1024,
- data['bw_write_max'][1],
- data['bw_write_max'][2])
-
- images.update(data)
- templ = get_template(templ_name)
- return templ.format(lab_info=lab_description,
- comment=comment,
- **images)
-
-
-def io_chart(title, concurence,
- latv, latv_min, latv_max,
- iops_or_bw, iops_or_bw_err,
- legend,
- log_iops=False,
- log_lat=False,
- boxplots=False,
- latv_50=None,
- latv_95=None,
- error2=None):
-
- matplotlib.rcParams.update({'font.size': 10})
- points = " MiBps" if legend == 'BW' else ""
- lc = len(concurence)
- width = 0.35
- xt = range(1, lc + 1)
-
- op_per_vm = [v / (vm * th) for v, (vm, th) in zip(iops_or_bw, concurence)]
- fig, p1 = plt.subplots()
- xpos = [i - width / 2 for i in xt]
-
- p1.bar(xpos, iops_or_bw,
- width=width,
- color='y',
- label=legend)
-
- err1_leg = None
- for pos, y, err in zip(xpos, iops_or_bw, iops_or_bw_err):
- err1_leg = p1.errorbar(pos + width / 2,
- y,
- err,
- color='magenta')
-
- err2_leg = None
- if error2 is not None:
- for pos, y, err in zip(xpos, iops_or_bw, error2):
- err2_leg = p1.errorbar(pos + width / 2 + 0.08,
- y,
- err,
- lw=2,
- alpha=0.5,
- color='teal')
-
- p1.grid(True)
- p1.plot(xt, op_per_vm, '--', label=legend + "/thread", color='black')
- handles1, labels1 = p1.get_legend_handles_labels()
-
- handles1 += [err1_leg]
- labels1 += ["95% conf"]
-
- if err2_leg is not None:
- handles1 += [err2_leg]
- labels1 += ["95% dev"]
-
- p2 = p1.twinx()
-
- if latv_50 is None:
- p2.plot(xt, latv_max, label="lat max")
- p2.plot(xt, latv, label="lat avg")
- p2.plot(xt, latv_min, label="lat min")
- else:
- p2.plot(xt, latv_50, label="lat med")
- p2.plot(xt, latv_95, label="lat 95%")
-
- plt.xlim(0.5, lc + 0.5)
- plt.xticks(xt, ["{0} * {1}".format(vm, th) for (vm, th) in concurence])
- p1.set_xlabel("VM Count * Thread per VM")
- p1.set_ylabel(legend + points)
- p2.set_ylabel("Latency ms")
- plt.title(title)
- handles2, labels2 = p2.get_legend_handles_labels()
-
- plt.legend(handles1 + handles2, labels1 + labels2,
- loc='center left', bbox_to_anchor=(1.1, 0.81))
-
- if log_iops:
- p1.set_yscale('log')
-
- if log_lat:
- p2.set_yscale('log')
-
- plt.subplots_adjust(right=0.68)
-
- return get_emb_data_svg(plt)
-
-
-def make_plots(processed_results, plots):
- """
- processed_results: [PerfInfo]
- plots = [(test_name_prefix:str, fname:str, description:str)]
- """
- files = {}
- for name_pref, fname, desc in plots:
- chart_data = []
-
- for res in processed_results:
- summ = res.name + "_" + res.summary
- if summ.startswith(name_pref):
- chart_data.append(res)
-
- if len(chart_data) == 0:
- raise ValueError("Can't found any date for " + name_pref)
-
- use_bw = ssize2b(chart_data[0].p.blocksize) > 16 * 1024
-
- chart_data.sort(key=lambda x: x.params['vals']['numjobs'])
-
- lat = None
- lat_min = None
- lat_max = None
-
- lat_50 = [x.lat_50 for x in chart_data]
- lat_95 = [x.lat_95 for x in chart_data]
-
- lat_diff_max = max(x.lat_95 / x.lat_50 for x in chart_data)
- lat_log_scale = (lat_diff_max > 10)
-
- testnodes_count = x.testnodes_count
- concurence = [(testnodes_count, x.concurence)
- for x in chart_data]
-
- if use_bw:
- data = [x.bw.average / 1000 for x in chart_data]
- data_conf = [x.bw.confidence / 1000 for x in chart_data]
- data_dev = [x.bw.deviation * 2.5 / 1000 for x in chart_data]
- name = "BW"
- else:
- data = [x.iops.average for x in chart_data]
- data_conf = [x.iops.confidence for x in chart_data]
- data_dev = [x.iops.deviation * 2 for x in chart_data]
- name = "IOPS"
-
- fc = io_chart(title=desc,
- concurence=concurence,
-
- latv=lat,
- latv_min=lat_min,
- latv_max=lat_max,
-
- iops_or_bw=data,
- iops_or_bw_err=data_conf,
-
- legend=name,
- log_lat=lat_log_scale,
-
- latv_50=lat_50,
- latv_95=lat_95,
-
- error2=data_dev)
- files[fname] = fc
-
- return files
-
-
-def find_max_where(processed_results, sync_mode, blocksize, rw, iops=True):
- result = None
- attr = 'iops' if iops else 'bw'
- for measurement in processed_results:
- ok = measurement.sync_mode == sync_mode
- ok = ok and (measurement.p.blocksize == blocksize)
- ok = ok and (measurement.p.rw == rw)
-
- if ok:
- field = getattr(measurement, attr)
-
- if result is None:
- result = field
- elif field.average > result.average:
- result = field
-
- return result
-
-
-def get_disk_info(processed_results):
- di = DiskInfo()
- di.direct_iops_w_max = find_max_where(processed_results,
- 'd', '4k', 'randwrite')
- di.direct_iops_r_max = find_max_where(processed_results,
- 'd', '4k', 'randread')
-
- di.direct_iops_w64_max = find_max_where(processed_results,
- 'd', '64k', 'randwrite')
-
- for sz in ('16m', '64m'):
- di.bw_write_max = find_max_where(processed_results,
- 'd', sz, 'randwrite', False)
- if di.bw_write_max is not None:
- break
-
- if di.bw_write_max is None:
- for sz in ('1m', '2m', '4m', '8m'):
- di.bw_write_max = find_max_where(processed_results,
- 'd', sz, 'write', False)
- if di.bw_write_max is not None:
- break
-
- for sz in ('16m', '64m'):
- di.bw_read_max = find_max_where(processed_results,
- 'd', sz, 'randread', False)
- if di.bw_read_max is not None:
- break
-
- if di.bw_read_max is None:
- di.bw_read_max = find_max_where(processed_results,
- 'd', '1m', 'read', False)
-
- rws4k_iops_lat_th = []
- for res in processed_results:
- if res.sync_mode in 'xs' and res.p.blocksize == '4k':
- if res.p.rw != 'randwrite':
- continue
- rws4k_iops_lat_th.append((res.iops.average,
- res.lat,
- # res.lat.average,
- res.concurence))
-
- rws4k_iops_lat_th.sort(key=lambda x: x[2])
-
- latv = [lat for _, lat, _ in rws4k_iops_lat_th]
-
- for tlat in [10, 30, 100]:
- pos = bisect.bisect_left(latv, tlat)
- if 0 == pos:
- setattr(di, 'rws4k_{}ms'.format(tlat), 0)
- elif pos == len(latv):
- iops3, _, _ = rws4k_iops_lat_th[-1]
- iops3 = int(round_3_digit(iops3))
- setattr(di, 'rws4k_{}ms'.format(tlat), ">=" + str(iops3))
- else:
- lat1 = latv[pos - 1]
- lat2 = latv[pos]
-
- iops1, _, th1 = rws4k_iops_lat_th[pos - 1]
- iops2, _, th2 = rws4k_iops_lat_th[pos]
-
- th_lat_coef = (th2 - th1) / (lat2 - lat1)
- th3 = th_lat_coef * (tlat - lat1) + th1
-
- th_iops_coef = (iops2 - iops1) / (th2 - th1)
- iops3 = th_iops_coef * (th3 - th1) + iops1
- iops3 = int(round_3_digit(iops3))
- setattr(di, 'rws4k_{}ms'.format(tlat), iops3)
-
- hdi = DiskInfo()
-
- def pp(x):
- med, conf = x.rounded_average_conf()
- conf_perc = int(float(conf) / med * 100)
- dev_perc = int(float(x.deviation) / med * 100)
- return (round_3_digit(med), conf_perc, dev_perc)
-
- hdi.direct_iops_r_max = pp(di.direct_iops_r_max)
-
- if di.direct_iops_w_max is not None:
- hdi.direct_iops_w_max = pp(di.direct_iops_w_max)
- else:
- hdi.direct_iops_w_max = None
-
- if di.direct_iops_w64_max is not None:
- hdi.direct_iops_w64_max = pp(di.direct_iops_w64_max)
- else:
- hdi.direct_iops_w64_max = None
-
- hdi.bw_write_max = pp(di.bw_write_max)
- hdi.bw_read_max = pp(di.bw_read_max)
-
- hdi.rws4k_10ms = di.rws4k_10ms if 0 != di.rws4k_10ms else None
- hdi.rws4k_30ms = di.rws4k_30ms if 0 != di.rws4k_30ms else None
- hdi.rws4k_100ms = di.rws4k_100ms if 0 != di.rws4k_100ms else None
- return hdi
-
-
-@report('hdd', 'hdd')
-def make_hdd_report(processed_results, lab_info, comment):
- plots = [
- ('hdd_rrd4k', 'rand_read_4k', 'Random read 4k direct IOPS'),
- ('hdd_rwx4k', 'rand_write_4k', 'Random write 4k sync IOPS')
- ]
- perf_infos = [res.disk_perf_info() for res in processed_results]
- images = make_plots(perf_infos, plots)
- di = get_disk_info(perf_infos)
- return render_all_html(comment, di, lab_info, images, "report_hdd.html")
-
-
-@report('cinder_iscsi', 'cinder_iscsi')
-def make_cinder_iscsi_report(processed_results, lab_info, comment):
- plots = [
- ('cinder_iscsi_rrd4k', 'rand_read_4k', 'Random read 4k direct IOPS'),
- ('cinder_iscsi_rwx4k', 'rand_write_4k', 'Random write 4k sync IOPS')
- ]
- perf_infos = [res.disk_perf_info() for res in processed_results]
- try:
- images = make_plots(perf_infos, plots)
- except ValueError:
- plots = [
- ('cinder_iscsi_rrd4k', 'rand_read_4k', 'Random read 4k direct IOPS'),
- ('cinder_iscsi_rws4k', 'rand_write_4k', 'Random write 4k sync IOPS')
- ]
- images = make_plots(perf_infos, plots)
- di = get_disk_info(perf_infos)
-
- return render_all_html(comment, di, lab_info, images, "report_cinder_iscsi.html")
-
-
-@report('ceph', 'ceph')
-def make_ceph_report(processed_results, lab_info, comment):
- plots = [
- ('ceph_rrd4k', 'rand_read_4k', 'Random read 4k direct IOPS'),
- ('ceph_rws4k', 'rand_write_4k', 'Random write 4k sync IOPS'),
- ('ceph_rrd16m', 'rand_read_16m', 'Random read 16m direct MiBps'),
- ('ceph_rwd16m', 'rand_write_16m',
- 'Random write 16m direct MiBps'),
- ]
-
- perf_infos = [res.disk_perf_info() for res in processed_results]
- images = make_plots(perf_infos, plots)
- di = get_disk_info(perf_infos)
- return render_all_html(comment, di, lab_info, images, "report_ceph.html")
-
-
-@report('mixed', 'mixed')
-def make_mixed_report(processed_results, lab_info, comment):
- #
- # IOPS(X% read) = 100 / ( X / IOPS_W + (100 - X) / IOPS_R )
- #
-
- perf_infos = [res.disk_perf_info() for res in processed_results]
- mixed = collections.defaultdict(lambda: [])
-
- is_ssd = False
- for res in perf_infos:
- if res.name.startswith('mixed'):
- if res.name.startswith('mixed-ssd'):
- is_ssd = True
- mixed[res.concurence].append((res.p.rwmixread,
- res.lat,
- 0,
- # res.lat.average / 1000.0,
- # res.lat.deviation / 1000.0,
- res.iops.average,
- res.iops.deviation))
-
- if len(mixed) == 0:
- raise ValueError("No mixed load found")
-
- fig, p1 = plt.subplots()
- p2 = p1.twinx()
-
- colors = ['red', 'green', 'blue', 'orange', 'magenta', "teal"]
- colors_it = iter(colors)
- for conc, mix_lat_iops in sorted(mixed.items()):
- mix_lat_iops = sorted(mix_lat_iops)
- read_perc, lat, dev, iops, iops_dev = zip(*mix_lat_iops)
- p1.errorbar(read_perc, iops, color=next(colors_it),
- yerr=iops_dev, label=str(conc) + " th")
-
- p2.errorbar(read_perc, lat, color=next(colors_it),
- ls='--', yerr=dev, label=str(conc) + " th lat")
-
- if is_ssd:
- p1.set_yscale('log')
- p2.set_yscale('log')
-
- p1.set_xlim(-5, 105)
-
- read_perc = set(read_perc)
- read_perc.add(0)
- read_perc.add(100)
- read_perc = sorted(read_perc)
-
- plt.xticks(read_perc, map(str, read_perc))
-
- p1.grid(True)
- p1.set_xlabel("% of reads")
- p1.set_ylabel("Mixed IOPS")
- p2.set_ylabel("Latency, ms")
-
- handles1, labels1 = p1.get_legend_handles_labels()
- handles2, labels2 = p2.get_legend_handles_labels()
- plt.subplots_adjust(top=0.85)
- plt.legend(handles1 + handles2, labels1 + labels2,
- bbox_to_anchor=(0.5, 1.15),
- loc='upper center',
- prop={'size': 12}, ncol=3)
- plt.show()
-
-
-def make_load_report(idx, results_dir, fname):
- dpath = os.path.join(results_dir, "io_" + str(idx))
- files = sorted(os.listdir(dpath))
- gf = lambda x: "_".join(x.rsplit(".", 1)[0].split('_')[:3])
-
- for key, group in itertools.groupby(files, gf):
- fname = os.path.join(dpath, key + ".fio")
-
- cfgs = list(parse_all_in_1(open(fname).read(), fname))
-
- fname = os.path.join(dpath, key + "_lat.log")
-
- curr = []
- arrays = []
-
- with open(fname) as fd:
- for offset, lat, _, _ in csv.reader(fd):
- offset = int(offset)
- lat = int(lat)
- if len(curr) > 0 and curr[-1][0] > offset:
- arrays.append(curr)
- curr = []
- curr.append((offset, lat))
- arrays.append(curr)
- conc = int(cfgs[0].vals.get('numjobs', 1))
-
- if conc != 5:
- continue
-
- assert len(arrays) == len(cfgs) * conc
-
- garrays = [[(0, 0)] for _ in range(conc)]
-
- for offset in range(len(cfgs)):
- for acc, new_arr in zip(garrays, arrays[offset * conc:(offset + 1) * conc]):
- last = acc[-1][0]
- for off, lat in new_arr:
- acc.append((off / 1000. + last, lat / 1000.))
-
- for cfg, arr in zip(cfgs, garrays):
- plt.plot(*zip(*arr[1:]))
- plt.show()
- exit(1)
-
-
-def make_io_report(dinfo, comment, path, lab_info=None):
- lab_info = {
- "total_disk": "None",
- "total_memory": "None",
- "nodes_count": "None",
- "processor_count": "None"
- }
-
- try:
- res_fields = sorted(v.name for v in dinfo)
-
- found = False
- for fields, name, func in report_funcs:
- for field in fields:
- pos = bisect.bisect_left(res_fields, field)
-
- if pos == len(res_fields):
- break
-
- if not res_fields[pos].startswith(field):
- break
- else:
- found = True
- hpath = path.format(name)
-
- try:
- report = func(dinfo, lab_info, comment)
- except:
- logger.exception("Diring {0} report generation".format(name))
- continue
-
- if report is not None:
- try:
- with open(hpath, "w") as fd:
- fd.write(report)
- except:
- logger.exception("Diring saving {0} report".format(name))
- continue
- logger.info("Report {0} saved into {1}".format(name, hpath))
- else:
- logger.warning("No report produced by {0!r}".format(name))
-
- if not found:
- logger.warning("No report generator found for this load")
-
- except Exception as exc:
- import traceback
- traceback.print_exc()
- logger.error("Failed to generate html report:" + str(exc))
+# class StoragePerfInfo:
+# def __init__(self, name: str, summary: Any, params, testnodes_count) -> None:
+# self.direct_iops_r_max = 0 # type: int
+# self.direct_iops_w_max = 0 # type: int
+#
+#         # 64k is used instead of 4k to fill caches faster
+# self.direct_iops_w64_max = 0 # type: int
+#
+# self.rws4k_10ms = 0 # type: int
+# self.rws4k_30ms = 0 # type: int
+# self.rws4k_100ms = 0 # type: int
+# self.bw_write_max = 0 # type: int
+# self.bw_read_max = 0 # type: int
+#
+# self.bw = None #
+# self.iops = None
+# self.lat = None
+# self.lat_50 = None
+# self.lat_95 = None
+#
+#
+# # disk_info = None
+# # base = None
+# # linearity = None
+#
+#
+# def group_by_name(test_data):
+# name_map = collections.defaultdict(lambda: [])
+#
+# for data in test_data:
+# name_map[(data.name, data.summary())].append(data)
+#
+# return name_map
+#
+#
+# def report(name, required_fields):
+# def closure(func):
+# report_funcs.append((required_fields.split(","), name, func))
+# return func
+# return closure
+#
+#
+# def get_test_lcheck_params(pinfo):
+# res = [{
+# 's': 'sync',
+# 'd': 'direct',
+# 'a': 'async',
+# 'x': 'sync direct'
+# }[pinfo.sync_mode]]
+#
+# res.append(pinfo.p.rw)
+#
+# return " ".join(res)
+#
+#
+# def get_emb_data_svg(plt):
+# sio = StringIO()
+# plt.savefig(sio, format='svg')
+# img_start = "<!-- Created with matplotlib (http://matplotlib.org/) -->"
+# return sio.getvalue().split(img_start, 1)[1]
+#
+#
+# def get_template(templ_name):
+# very_root_dir = os.path.dirname(os.path.dirname(wally.__file__))
+# templ_dir = os.path.join(very_root_dir, 'report_templates')
+# templ_file = os.path.join(templ_dir, templ_name)
+# return open(templ_file, 'r').read()
+#
+#
+# def group_by(data, func):
+# if len(data) < 2:
+# yield data
+# return
+#
+# ndata = [(func(dt), dt) for dt in data]
+# ndata.sort(key=func)
+# pkey, dt = ndata[0]
+# curr_list = [dt]
+#
+# for key, val in ndata[1:]:
+# if pkey != key:
+# yield curr_list
+# curr_list = [val]
+# else:
+# curr_list.append(val)
+# pkey = key
+#
+# yield curr_list
+#
+#
+# @report('linearity', 'linearity_test')
+# def linearity_report(processed_results, lab_info, comment):
+# labels_and_data_mp = collections.defaultdict(lambda: [])
+# vls = {}
+#
+# # plot io_time = func(bsize)
+# for res in processed_results.values():
+# if res.name.startswith('linearity_test'):
+# iotimes = [1000. / val for val in res.iops.raw]
+#
+# op_summ = get_test_summary(res.params)[:3]
+#
+# labels_and_data_mp[op_summ].append(
+# [res.p.blocksize, res.iops.raw, iotimes])
+#
+# cvls = res.params.vals.copy()
+# del cvls['blocksize']
+# del cvls['rw']
+#
+# cvls.pop('sync', None)
+# cvls.pop('direct', None)
+# cvls.pop('buffered', None)
+#
+# if op_summ not in vls:
+# vls[op_summ] = cvls
+# else:
+# assert cvls == vls[op_summ]
+#
+# all_labels = None
+# _, ax1 = plt.subplots()
+# for name, labels_and_data in labels_and_data_mp.items():
+# labels_and_data.sort(key=lambda x: ssize2b(x[0]))
+#
+# labels, _, iotimes = zip(*labels_and_data)
+#
+# if all_labels is None:
+# all_labels = labels
+# else:
+# assert all_labels == labels
+#
+# plt.boxplot(iotimes)
+# if len(labels_and_data) > 2 and \
+# ssize2b(labels_and_data[-2][0]) >= 4096:
+#
+# xt = range(1, len(labels) + 1)
+#
+# def io_time(sz, bw, initial_lat):
+# return sz / bw + initial_lat
+#
+# x = numpy.array(map(ssize2b, labels))
+# y = numpy.array([sum(dt) / len(dt) for dt in iotimes])
+# popt, _ = scipy.optimize.curve_fit(io_time, x, y, p0=(100., 1.))
+#
+# y1 = io_time(x, *popt)
+# plt.plot(xt, y1, linestyle='--',
+# label=name + ' LS linear approx')
+#
+# for idx, (sz, _, _) in enumerate(labels_and_data):
+# if ssize2b(sz) >= 4096:
+# break
+#
+# bw = (x[-1] - x[idx]) / (y[-1] - y[idx])
+# lat = y[-1] - x[-1] / bw
+# y2 = io_time(x, bw, lat)
+# plt.plot(xt, y2, linestyle='--',
+# label=abbv_name_to_full(name) +
+# ' (4k & max) linear approx')
+#
+# plt.setp(ax1, xticklabels=labels)
+#
+# plt.xlabel("Block size")
+# plt.ylabel("IO time, ms")
+#
+# plt.subplots_adjust(top=0.85)
+# plt.legend(bbox_to_anchor=(0.5, 1.15),
+# loc='upper center',
+# prop={'size': 10}, ncol=2)
+# plt.grid()
+# iotime_plot = get_emb_data_svg(plt)
+# plt.clf()
+#
+# # plot IOPS = func(bsize)
+# _, ax1 = plt.subplots()
+#
+# for name, labels_and_data in labels_and_data_mp.items():
+# labels_and_data.sort(key=lambda x: ssize2b(x[0]))
+# _, data, _ = zip(*labels_and_data)
+# plt.boxplot(data)
+# avg = [float(sum(arr)) / len(arr) for arr in data]
+# xt = range(1, len(data) + 1)
+# plt.plot(xt, avg, linestyle='--',
+# label=abbv_name_to_full(name) + " avg")
+#
+# plt.setp(ax1, xticklabels=labels)
+# plt.xlabel("Block size")
+# plt.ylabel("IOPS")
+# plt.legend(bbox_to_anchor=(0.5, 1.15),
+# loc='upper center',
+# prop={'size': 10}, ncol=2)
+# plt.grid()
+# plt.subplots_adjust(top=0.85)
+#
+# iops_plot = get_emb_data_svg(plt)
+#
+# res = set(get_test_lcheck_params(res) for res in processed_results.values())
+# ncount = list(set(res.testnodes_count for res in processed_results.values()))
+# conc = list(set(res.concurence for res in processed_results.values()))
+#
+# assert len(conc) == 1
+# assert len(ncount) == 1
+#
+# descr = {
+# 'vm_count': ncount[0],
+# 'concurence': conc[0],
+# 'oper_descr': ", ".join(res).capitalize()
+# }
+#
+# params_map = {'iotime_vs_size': iotime_plot,
+# 'iops_vs_size': iops_plot,
+# 'descr': descr}
+#
+# return get_template('report_linearity.html').format(**params_map)
+#
+#
+# @report('lat_vs_iops', 'lat_vs_iops')
+# def lat_vs_iops(processed_results, lab_info, comment):
+# lat_iops = collections.defaultdict(lambda: [])
+# requsted_vs_real = collections.defaultdict(lambda: {})
+#
+# for res in processed_results.values():
+# if res.name.startswith('lat_vs_iops'):
+# lat_iops[res.concurence].append((res.lat,
+# 0,
+# res.iops.average,
+# res.iops.deviation))
+# # lat_iops[res.concurence].append((res.lat.average / 1000.0,
+# # res.lat.deviation / 1000.0,
+# # res.iops.average,
+# # res.iops.deviation))
+# requested_iops = res.p.rate_iops * res.concurence
+# requsted_vs_real[res.concurence][requested_iops] = \
+# (res.iops.average, res.iops.deviation)
+#
+# colors = ['red', 'green', 'blue', 'orange', 'magenta', "teal"]
+# colors_it = iter(colors)
+# for conc, lat_iops in sorted(lat_iops.items()):
+# lat, dev, iops, iops_dev = zip(*lat_iops)
+# plt.errorbar(iops, lat, xerr=iops_dev, yerr=dev, fmt='ro',
+# label=str(conc) + " threads",
+# color=next(colors_it))
+#
+# plt.xlabel("IOPS")
+# plt.ylabel("Latency, ms")
+# plt.grid()
+# plt.legend(loc=0)
+# plt_iops_vs_lat = get_emb_data_svg(plt)
+# plt.clf()
+#
+# colors_it = iter(colors)
+# for conc, req_vs_real in sorted(requsted_vs_real.items()):
+# req, real = zip(*sorted(req_vs_real.items()))
+# iops, dev = zip(*real)
+# plt.errorbar(req, iops, yerr=dev, fmt='ro',
+# label=str(conc) + " threads",
+# color=next(colors_it))
+# plt.xlabel("Requested IOPS")
+# plt.ylabel("Get IOPS")
+# plt.grid()
+# plt.legend(loc=0)
+# plt_iops_vs_requested = get_emb_data_svg(plt)
+#
+# res1 = processed_results.values()[0]
+# params_map = {'iops_vs_lat': plt_iops_vs_lat,
+# 'iops_vs_requested': plt_iops_vs_requested,
+# 'oper_descr': get_test_lcheck_params(res1).capitalize()}
+#
+# return get_template('report_iops_vs_lat.html').format(**params_map)
+#
+#
+# def render_all_html(comment, info, lab_description, images, templ_name):
+# data = info.__dict__.copy()
+# for name, val in data.items():
+# if not name.startswith('__'):
+# if val is None:
+# if name in ('direct_iops_w64_max', 'direct_iops_w_max'):
+# data[name] = ('-', '-', '-')
+# else:
+# data[name] = '-'
+# elif isinstance(val, (int, float, long)):
+# data[name] = round_3_digit(val)
+#
+# data['bw_read_max'] = (data['bw_read_max'][0] // 1024,
+# data['bw_read_max'][1],
+# data['bw_read_max'][2])
+#
+# data['bw_write_max'] = (data['bw_write_max'][0] // 1024,
+# data['bw_write_max'][1],
+# data['bw_write_max'][2])
+#
+# images.update(data)
+# templ = get_template(templ_name)
+# return templ.format(lab_info=lab_description,
+# comment=comment,
+# **images)
+#
+#
+# def io_chart(title, concurence,
+# latv, latv_min, latv_max,
+# iops_or_bw, iops_or_bw_err,
+# legend,
+# log_iops=False,
+# log_lat=False,
+# boxplots=False,
+# latv_50=None,
+# latv_95=None,
+# error2=None):
+#
+# matplotlib.rcParams.update({'font.size': 10})
+# points = " MiBps" if legend == 'BW' else ""
+# lc = len(concurence)
+# width = 0.35
+# xt = range(1, lc + 1)
+#
+# op_per_vm = [v / (vm * th) for v, (vm, th) in zip(iops_or_bw, concurence)]
+# fig, p1 = plt.subplots()
+# xpos = [i - width / 2 for i in xt]
+#
+# p1.bar(xpos, iops_or_bw,
+# width=width,
+# color='y',
+# label=legend)
+#
+# err1_leg = None
+# for pos, y, err in zip(xpos, iops_or_bw, iops_or_bw_err):
+# err1_leg = p1.errorbar(pos + width / 2,
+# y,
+# err,
+# color='magenta')
+#
+# err2_leg = None
+# if error2 is not None:
+# for pos, y, err in zip(xpos, iops_or_bw, error2):
+# err2_leg = p1.errorbar(pos + width / 2 + 0.08,
+# y,
+# err,
+# lw=2,
+# alpha=0.5,
+# color='teal')
+#
+# p1.grid(True)
+# p1.plot(xt, op_per_vm, '--', label=legend + "/thread", color='black')
+# handles1, labels1 = p1.get_legend_handles_labels()
+#
+# handles1 += [err1_leg]
+# labels1 += ["95% conf"]
+#
+# if err2_leg is not None:
+# handles1 += [err2_leg]
+# labels1 += ["95% dev"]
+#
+# p2 = p1.twinx()
+#
+# if latv_50 is None:
+# p2.plot(xt, latv_max, label="lat max")
+# p2.plot(xt, latv, label="lat avg")
+# p2.plot(xt, latv_min, label="lat min")
+# else:
+# p2.plot(xt, latv_50, label="lat med")
+# p2.plot(xt, latv_95, label="lat 95%")
+#
+# plt.xlim(0.5, lc + 0.5)
+# plt.xticks(xt, ["{0} * {1}".format(vm, th) for (vm, th) in concurence])
+# p1.set_xlabel("VM Count * Thread per VM")
+# p1.set_ylabel(legend + points)
+# p2.set_ylabel("Latency ms")
+# plt.title(title)
+# handles2, labels2 = p2.get_legend_handles_labels()
+#
+# plt.legend(handles1 + handles2, labels1 + labels2,
+# loc='center left', bbox_to_anchor=(1.1, 0.81))
+#
+# if log_iops:
+# p1.set_yscale('log')
+#
+# if log_lat:
+# p2.set_yscale('log')
+#
+# plt.subplots_adjust(right=0.68)
+#
+# return get_emb_data_svg(plt)
+#
+#
+# def make_plots(processed_results, plots):
+# """
+# processed_results: [PerfInfo]
+# plots = [(test_name_prefix:str, fname:str, description:str)]
+# """
+# files = {}
+# for name_pref, fname, desc in plots:
+# chart_data = []
+#
+# for res in processed_results:
+# summ = res.name + "_" + res.summary
+# if summ.startswith(name_pref):
+# chart_data.append(res)
+#
+# if len(chart_data) == 0:
+# raise ValueError("Can't found any date for " + name_pref)
+#
+# use_bw = ssize2b(chart_data[0].p.blocksize) > 16 * 1024
+#
+# chart_data.sort(key=lambda x: x.params['vals']['numjobs'])
+#
+# lat = None
+# lat_min = None
+# lat_max = None
+#
+# lat_50 = [x.lat_50 for x in chart_data]
+# lat_95 = [x.lat_95 for x in chart_data]
+#
+# lat_diff_max = max(x.lat_95 / x.lat_50 for x in chart_data)
+# lat_log_scale = (lat_diff_max > 10)
+#
+# testnodes_count = x.testnodes_count
+# concurence = [(testnodes_count, x.concurence)
+# for x in chart_data]
+#
+# if use_bw:
+# data = [x.bw.average / 1000 for x in chart_data]
+# data_conf = [x.bw.confidence / 1000 for x in chart_data]
+# data_dev = [x.bw.deviation * 2.5 / 1000 for x in chart_data]
+# name = "BW"
+# else:
+# data = [x.iops.average for x in chart_data]
+# data_conf = [x.iops.confidence for x in chart_data]
+# data_dev = [x.iops.deviation * 2 for x in chart_data]
+# name = "IOPS"
+#
+# fc = io_chart(title=desc,
+# concurence=concurence,
+#
+# latv=lat,
+# latv_min=lat_min,
+# latv_max=lat_max,
+#
+# iops_or_bw=data,
+# iops_or_bw_err=data_conf,
+#
+# legend=name,
+# log_lat=lat_log_scale,
+#
+# latv_50=lat_50,
+# latv_95=lat_95,
+#
+# error2=data_dev)
+# files[fname] = fc
+#
+# return files
+#
+#
+# def find_max_where(processed_results, sync_mode, blocksize, rw, iops=True):
+# result = None
+# attr = 'iops' if iops else 'bw'
+# for measurement in processed_results:
+# ok = measurement.sync_mode == sync_mode
+# ok = ok and (measurement.p.blocksize == blocksize)
+# ok = ok and (measurement.p.rw == rw)
+#
+# if ok:
+# field = getattr(measurement, attr)
+#
+# if result is None:
+# result = field
+# elif field.average > result.average:
+# result = field
+#
+# return result
+#
+#
+# def get_disk_info(processed_results):
+# di = DiskInfo()
+# di.direct_iops_w_max = find_max_where(processed_results,
+# 'd', '4k', 'randwrite')
+# di.direct_iops_r_max = find_max_where(processed_results,
+# 'd', '4k', 'randread')
+#
+# di.direct_iops_w64_max = find_max_where(processed_results,
+# 'd', '64k', 'randwrite')
+#
+# for sz in ('16m', '64m'):
+# di.bw_write_max = find_max_where(processed_results,
+# 'd', sz, 'randwrite', False)
+# if di.bw_write_max is not None:
+# break
+#
+# if di.bw_write_max is None:
+# for sz in ('1m', '2m', '4m', '8m'):
+# di.bw_write_max = find_max_where(processed_results,
+# 'd', sz, 'write', False)
+# if di.bw_write_max is not None:
+# break
+#
+# for sz in ('16m', '64m'):
+# di.bw_read_max = find_max_where(processed_results,
+# 'd', sz, 'randread', False)
+# if di.bw_read_max is not None:
+# break
+#
+# if di.bw_read_max is None:
+# di.bw_read_max = find_max_where(processed_results,
+# 'd', '1m', 'read', False)
+#
+# rws4k_iops_lat_th = []
+# for res in processed_results:
+# if res.sync_mode in 'xs' and res.p.blocksize == '4k':
+# if res.p.rw != 'randwrite':
+# continue
+# rws4k_iops_lat_th.append((res.iops.average,
+# res.lat,
+# # res.lat.average,
+# res.concurence))
+#
+# rws4k_iops_lat_th.sort(key=lambda x: x[2])
+#
+# latv = [lat for _, lat, _ in rws4k_iops_lat_th]
+#
+# for tlat in [10, 30, 100]:
+# pos = bisect.bisect_left(latv, tlat)
+# if 0 == pos:
+# setattr(di, 'rws4k_{}ms'.format(tlat), 0)
+# elif pos == len(latv):
+# iops3, _, _ = rws4k_iops_lat_th[-1]
+# iops3 = int(round_3_digit(iops3))
+# setattr(di, 'rws4k_{}ms'.format(tlat), ">=" + str(iops3))
+# else:
+# lat1 = latv[pos - 1]
+# lat2 = latv[pos]
+#
+# iops1, _, th1 = rws4k_iops_lat_th[pos - 1]
+# iops2, _, th2 = rws4k_iops_lat_th[pos]
+#
+# th_lat_coef = (th2 - th1) / (lat2 - lat1)
+# th3 = th_lat_coef * (tlat - lat1) + th1
+#
+# th_iops_coef = (iops2 - iops1) / (th2 - th1)
+# iops3 = th_iops_coef * (th3 - th1) + iops1
+# iops3 = int(round_3_digit(iops3))
+# setattr(di, 'rws4k_{}ms'.format(tlat), iops3)
+#
+# hdi = DiskInfo()
+#
+# def pp(x):
+# med, conf = x.rounded_average_conf()
+# conf_perc = int(float(conf) / med * 100)
+# dev_perc = int(float(x.deviation) / med * 100)
+# return (round_3_digit(med), conf_perc, dev_perc)
+#
+# hdi.direct_iops_r_max = pp(di.direct_iops_r_max)
+#
+# if di.direct_iops_w_max is not None:
+# hdi.direct_iops_w_max = pp(di.direct_iops_w_max)
+# else:
+# hdi.direct_iops_w_max = None
+#
+# if di.direct_iops_w64_max is not None:
+# hdi.direct_iops_w64_max = pp(di.direct_iops_w64_max)
+# else:
+# hdi.direct_iops_w64_max = None
+#
+# hdi.bw_write_max = pp(di.bw_write_max)
+# hdi.bw_read_max = pp(di.bw_read_max)
+#
+# hdi.rws4k_10ms = di.rws4k_10ms if 0 != di.rws4k_10ms else None
+# hdi.rws4k_30ms = di.rws4k_30ms if 0 != di.rws4k_30ms else None
+# hdi.rws4k_100ms = di.rws4k_100ms if 0 != di.rws4k_100ms else None
+# return hdi
+#
+#
+# @report('hdd', 'hdd')
+# def make_hdd_report(processed_results, lab_info, comment):
+# plots = [
+# ('hdd_rrd4k', 'rand_read_4k', 'Random read 4k direct IOPS'),
+# ('hdd_rwx4k', 'rand_write_4k', 'Random write 4k sync IOPS')
+# ]
+# perf_infos = [res.disk_perf_info() for res in processed_results]
+# images = make_plots(perf_infos, plots)
+# di = get_disk_info(perf_infos)
+# return render_all_html(comment, di, lab_info, images, "report_hdd.html")
+#
+#
+# @report('cinder_iscsi', 'cinder_iscsi')
+# def make_cinder_iscsi_report(processed_results, lab_info, comment):
+# plots = [
+# ('cinder_iscsi_rrd4k', 'rand_read_4k', 'Random read 4k direct IOPS'),
+# ('cinder_iscsi_rwx4k', 'rand_write_4k', 'Random write 4k sync IOPS')
+# ]
+# perf_infos = [res.disk_perf_info() for res in processed_results]
+# try:
+# images = make_plots(perf_infos, plots)
+# except ValueError:
+# plots = [
+# ('cinder_iscsi_rrd4k', 'rand_read_4k', 'Random read 4k direct IOPS'),
+# ('cinder_iscsi_rws4k', 'rand_write_4k', 'Random write 4k sync IOPS')
+# ]
+# images = make_plots(perf_infos, plots)
+# di = get_disk_info(perf_infos)
+#
+# return render_all_html(comment, di, lab_info, images, "report_cinder_iscsi.html")
+#
+#
+# @report('ceph', 'ceph')
+# def make_ceph_report(processed_results, lab_info, comment):
+# plots = [
+# ('ceph_rrd4k', 'rand_read_4k', 'Random read 4k direct IOPS'),
+# ('ceph_rws4k', 'rand_write_4k', 'Random write 4k sync IOPS'),
+# ('ceph_rrd16m', 'rand_read_16m', 'Random read 16m direct MiBps'),
+# ('ceph_rwd16m', 'rand_write_16m',
+# 'Random write 16m direct MiBps'),
+# ]
+#
+# perf_infos = [res.disk_perf_info() for res in processed_results]
+# images = make_plots(perf_infos, plots)
+# di = get_disk_info(perf_infos)
+# return render_all_html(comment, di, lab_info, images, "report_ceph.html")
+#
+#
+# @report('mixed', 'mixed')
+# def make_mixed_report(processed_results, lab_info, comment):
+# #
+#     # IOPS(X% read) = 100 / ( X / IOPS_R + (100 - X) / IOPS_W )
+# #
+#
+# perf_infos = [res.disk_perf_info() for res in processed_results]
+# mixed = collections.defaultdict(lambda: [])
+#
+# is_ssd = False
+# for res in perf_infos:
+# if res.name.startswith('mixed'):
+# if res.name.startswith('mixed-ssd'):
+# is_ssd = True
+# mixed[res.concurence].append((res.p.rwmixread,
+# res.lat,
+# 0,
+# # res.lat.average / 1000.0,
+# # res.lat.deviation / 1000.0,
+# res.iops.average,
+# res.iops.deviation))
+#
+# if len(mixed) == 0:
+# raise ValueError("No mixed load found")
+#
+# fig, p1 = plt.subplots()
+# p2 = p1.twinx()
+#
+# colors = ['red', 'green', 'blue', 'orange', 'magenta', "teal"]
+# colors_it = iter(colors)
+# for conc, mix_lat_iops in sorted(mixed.items()):
+# mix_lat_iops = sorted(mix_lat_iops)
+# read_perc, lat, dev, iops, iops_dev = zip(*mix_lat_iops)
+# p1.errorbar(read_perc, iops, color=next(colors_it),
+# yerr=iops_dev, label=str(conc) + " th")
+#
+# p2.errorbar(read_perc, lat, color=next(colors_it),
+# ls='--', yerr=dev, label=str(conc) + " th lat")
+#
+# if is_ssd:
+# p1.set_yscale('log')
+# p2.set_yscale('log')
+#
+# p1.set_xlim(-5, 105)
+#
+# read_perc = set(read_perc)
+# read_perc.add(0)
+# read_perc.add(100)
+# read_perc = sorted(read_perc)
+#
+# plt.xticks(read_perc, map(str, read_perc))
+#
+# p1.grid(True)
+# p1.set_xlabel("% of reads")
+# p1.set_ylabel("Mixed IOPS")
+# p2.set_ylabel("Latency, ms")
+#
+# handles1, labels1 = p1.get_legend_handles_labels()
+# handles2, labels2 = p2.get_legend_handles_labels()
+# plt.subplots_adjust(top=0.85)
+# plt.legend(handles1 + handles2, labels1 + labels2,
+# bbox_to_anchor=(0.5, 1.15),
+# loc='upper center',
+# prop={'size': 12}, ncol=3)
+# plt.show()
+#
+#
+# def make_load_report(idx, results_dir, fname):
+# dpath = os.path.join(results_dir, "io_" + str(idx))
+# files = sorted(os.listdir(dpath))
+# gf = lambda x: "_".join(x.rsplit(".", 1)[0].split('_')[:3])
+#
+# for key, group in itertools.groupby(files, gf):
+# fname = os.path.join(dpath, key + ".fio")
+#
+# cfgs = list(parse_all_in_1(open(fname).read(), fname))
+#
+# fname = os.path.join(dpath, key + "_lat.log")
+#
+# curr = []
+# arrays = []
+#
+# with open(fname) as fd:
+# for offset, lat, _, _ in csv.reader(fd):
+# offset = int(offset)
+# lat = int(lat)
+# if len(curr) > 0 and curr[-1][0] > offset:
+# arrays.append(curr)
+# curr = []
+# curr.append((offset, lat))
+# arrays.append(curr)
+# conc = int(cfgs[0].vals.get('numjobs', 1))
+#
+# if conc != 5:
+# continue
+#
+# assert len(arrays) == len(cfgs) * conc
+#
+# garrays = [[(0, 0)] for _ in range(conc)]
+#
+# for offset in range(len(cfgs)):
+# for acc, new_arr in zip(garrays, arrays[offset * conc:(offset + 1) * conc]):
+# last = acc[-1][0]
+# for off, lat in new_arr:
+# acc.append((off / 1000. + last, lat / 1000.))
+#
+# for cfg, arr in zip(cfgs, garrays):
+# plt.plot(*zip(*arr[1:]))
+# plt.show()
+# exit(1)
+#
+#
+# def make_io_report(dinfo, comment, path, lab_info=None):
+# lab_info = {
+# "total_disk": "None",
+# "total_memory": "None",
+# "nodes_count": "None",
+# "processor_count": "None"
+# }
+#
+# try:
+# res_fields = sorted(v.name for v in dinfo)
+#
+# found = False
+# for fields, name, func in report_funcs:
+# for field in fields:
+# pos = bisect.bisect_left(res_fields, field)
+#
+# if pos == len(res_fields):
+# break
+#
+# if not res_fields[pos].startswith(field):
+# break
+# else:
+# found = True
+# hpath = path.format(name)
+#
+# try:
+# report = func(dinfo, lab_info, comment)
+# except:
+# logger.exception("Diring {0} report generation".format(name))
+# continue
+#
+# if report is not None:
+# try:
+# with open(hpath, "w") as fd:
+# fd.write(report)
+# except:
+# logger.exception("Diring saving {0} report".format(name))
+# continue
+# logger.info("Report {0} saved into {1}".format(name, hpath))
+# else:
+# logger.warning("No report produced by {0!r}".format(name))
+#
+# if not found:
+# logger.warning("No report generator found for this load")
+#
+# except Exception as exc:
+# import traceback
+# traceback.print_exc()
+# logger.error("Failed to generate html report:" + str(exc))
+#
+#
+# # @classmethod
+# # def prepare_data(cls, results) -> List[Dict[str, Any]]:
+# # """create a table with io performance report for console"""
+# #
+# # def key_func(data: FioRunResult) -> Tuple[str, str, str, str, int]:
+# # tpl = data.summary_tpl()
+# # return (data.name,
+# # tpl.oper,
+# # tpl.mode,
+# # ssize2b(tpl.bsize),
+# # int(tpl.th_count) * int(tpl.vm_count))
+# # res = []
+# #
+# # for item in sorted(results, key=key_func):
+# # test_dinfo = item.disk_perf_info()
+# # testnodes_count = len(item.config.nodes)
+# #
+# # iops, _ = test_dinfo.iops.rounded_average_conf()
+# #
+# # if test_dinfo.iops_sys is not None:
+# # iops_sys, iops_sys_conf = test_dinfo.iops_sys.rounded_average_conf()
+# # _, iops_sys_dev = test_dinfo.iops_sys.rounded_average_dev()
+# # iops_sys_per_vm = round_3_digit(iops_sys / testnodes_count)
+# # iops_sys = round_3_digit(iops_sys)
+# # else:
+# # iops_sys = None
+# # iops_sys_per_vm = None
+# # iops_sys_dev = None
+# # iops_sys_conf = None
+# #
+# # bw, bw_conf = test_dinfo.bw.rounded_average_conf()
+# # _, bw_dev = test_dinfo.bw.rounded_average_dev()
+# # conf_perc = int(round(bw_conf * 100 / bw))
+# # dev_perc = int(round(bw_dev * 100 / bw))
+# #
+# # lat_50 = round_3_digit(int(test_dinfo.lat_50))
+# # lat_95 = round_3_digit(int(test_dinfo.lat_95))
+# # lat_avg = round_3_digit(int(test_dinfo.lat_avg))
+# #
+# # iops_per_vm = round_3_digit(iops / testnodes_count)
+# # bw_per_vm = round_3_digit(bw / testnodes_count)
+# #
+# # iops = round_3_digit(iops)
+# # bw = round_3_digit(bw)
+# #
+# # summ = "{0.oper}{0.mode} {0.bsize:>4} {0.th_count:>3}th {0.vm_count:>2}vm".format(item.summary_tpl())
+# #
+# # res.append({"name": key_func(item)[0],
+# # "key": key_func(item)[:4],
+# # "summ": summ,
+# # "iops": int(iops),
+# # "bw": int(bw),
+# # "conf": str(conf_perc),
+# # "dev": str(dev_perc),
+# # "iops_per_vm": int(iops_per_vm),
+# # "bw_per_vm": int(bw_per_vm),
+# # "lat_50": lat_50,
+# # "lat_95": lat_95,
+# # "lat_avg": lat_avg,
+# #
+# # "iops_sys": iops_sys,
+# # "iops_sys_per_vm": iops_sys_per_vm,
+# # "sys_conf": iops_sys_conf,
+# # "sys_dev": iops_sys_dev})
+# #
+# # return res
+# #
+# # Field = collections.namedtuple("Field", ("header", "attr", "allign", "size"))
+# # fiels_and_header = [
+# # Field("Name", "name", "l", 7),
+# # Field("Description", "summ", "l", 19),
+# # Field("IOPS\ncum", "iops", "r", 3),
+# # # Field("IOPS_sys\ncum", "iops_sys", "r", 3),
+# # Field("KiBps\ncum", "bw", "r", 6),
+# # Field("Cnf %\n95%", "conf", "r", 3),
+# # Field("Dev%", "dev", "r", 3),
+# # Field("iops\n/vm", "iops_per_vm", "r", 3),
+# # Field("KiBps\n/vm", "bw_per_vm", "r", 6),
+# # Field("lat ms\nmedian", "lat_50", "r", 3),
+# # Field("lat ms\n95%", "lat_95", "r", 3),
+# # Field("lat\navg", "lat_avg", "r", 3),
+# # ]
+# #
+# # fiels_and_header_dct = dict((item.attr, item) for item in fiels_and_header)
+# #
+# # @classmethod
+# # def format_for_console(cls, results) -> str:
+# # """create a table with io performance report for console"""
+# #
+# # tab = texttable.Texttable(max_width=120)
+# # tab.set_deco(tab.HEADER | tab.VLINES | tab.BORDER)
+# # tab.set_cols_align([f.allign for f in cls.fiels_and_header])
+# # sep = ["-" * f.size for f in cls.fiels_and_header]
+# # tab.header([f.header for f in cls.fiels_and_header])
+# # prev_k = None
+# # for item in cls.prepare_data(results):
+# # if prev_k is not None:
+# # if prev_k != item["key"]:
+# # tab.add_row(sep)
+# #
+# # prev_k = item["key"]
+# # tab.add_row([item[f.attr] for f in cls.fiels_and_header])
+# #
+# # return tab.draw()
+# #
+# # @classmethod
+# # def format_diff_for_console(cls, list_of_results: List[Any]) -> str:
+# #         """create a table comparing several io performance reports (relative to the first one) for console"""
+# #
+# # tab = texttable.Texttable(max_width=200)
+# # tab.set_deco(tab.HEADER | tab.VLINES | tab.BORDER)
+# #
+# # header = [
+# # cls.fiels_and_header_dct["name"].header,
+# # cls.fiels_and_header_dct["summ"].header,
+# # ]
+# # allign = ["l", "l"]
+# #
+# # header.append("IOPS ~ Cnf% ~ Dev%")
+# # allign.extend(["r"] * len(list_of_results))
+# # header.extend(
+# # "IOPS_{0} %".format(i + 2) for i in range(len(list_of_results[1:]))
+# # )
+# #
+# # header.append("BW")
+# # allign.extend(["r"] * len(list_of_results))
+# # header.extend(
+# # "BW_{0} %".format(i + 2) for i in range(len(list_of_results[1:]))
+# # )
+# #
+# # header.append("LAT")
+# # allign.extend(["r"] * len(list_of_results))
+# # header.extend(
+# # "LAT_{0}".format(i + 2) for i in range(len(list_of_results[1:]))
+# # )
+# #
+# # tab.header(header)
+# # sep = ["-" * 3] * len(header)
+# # processed_results = map(cls.prepare_data, list_of_results)
+# #
+# # key2results = []
+# # for res in processed_results:
+# # key2results.append(dict(
+# # ((item["name"], item["summ"]), item) for item in res
+# # ))
+# #
+# # prev_k = None
+# # iops_frmt = "{0[iops]} ~ {0[conf]:>2} ~ {0[dev]:>2}"
+# # for item in processed_results[0]:
+# # if prev_k is not None:
+# # if prev_k != item["key"]:
+# # tab.add_row(sep)
+# #
+# # prev_k = item["key"]
+# #
+# # key = (item['name'], item['summ'])
+# # line = list(key)
+# # base = key2results[0][key]
+# #
+# # line.append(iops_frmt.format(base))
+# #
+# # for test_results in key2results[1:]:
+# # val = test_results.get(key)
+# # if val is None:
+# # line.append("-")
+# # elif base['iops'] == 0:
+# # line.append("Nan")
+# # else:
+# # prc_val = {'dev': val['dev'], 'conf': val['conf']}
+# # prc_val['iops'] = int(100 * val['iops'] / base['iops'])
+# # line.append(iops_frmt.format(prc_val))
+# #
+# # line.append(base['bw'])
+# #
+# # for test_results in key2results[1:]:
+# # val = test_results.get(key)
+# # if val is None:
+# # line.append("-")
+# # elif base['bw'] == 0:
+# # line.append("Nan")
+# # else:
+# # line.append(int(100 * val['bw'] / base['bw']))
+# #
+# # for test_results in key2results:
+# # val = test_results.get(key)
+# # if val is None:
+# # line.append("-")
+# # else:
+# # line.append("{0[lat_50]} - {0[lat_95]}".format(val))
+# #
+# # tab.add_row(line)
+# #
+# # tab.set_cols_align(allign)
+# # return tab.draw()
+#
+#
+# # READ_IOPS_DISCSTAT_POS = 3
+# # WRITE_IOPS_DISCSTAT_POS = 7
+# #
+# #
+# # def load_sys_log_file(ftype: str, fname: str) -> TimeSeriesValue:
+# # assert ftype == 'iops'
+# # pval = None
+# # with open(fname) as fd:
+# # iops = []
+# # for ln in fd:
+# # params = ln.split()
+# # cval = int(params[WRITE_IOPS_DISCSTAT_POS]) + \
+# # int(params[READ_IOPS_DISCSTAT_POS])
+# # if pval is not None:
+# # iops.append(cval - pval)
+# # pval = cval
+# #
+# # vals = [(idx * 1000, val) for idx, val in enumerate(iops)]
+# # return TimeSeriesValue(vals)
+# #
+# #
+# # def load_test_results(folder: str, run_num: int) -> 'FioRunResult':
+# # res = {}
+# # params = None
+# #
+# # fn = os.path.join(folder, str(run_num) + '_params.yaml')
+# # params = yaml.load(open(fn).read())
+# #
+# # conn_ids_set = set()
+# # rr = r"{}_(?P<conn_id>.*?)_(?P<type>[^_.]*)\.\d+\.log$".format(run_num)
+# # for fname in os.listdir(folder):
+# # rm = re.match(rr, fname)
+# # if rm is None:
+# # continue
+# #
+# # conn_id_s = rm.group('conn_id')
+# # conn_id = conn_id_s.replace('_', ':')
+# # ftype = rm.group('type')
+# #
+# # if ftype not in ('iops', 'bw', 'lat'):
+# # continue
+# #
+# # ts = load_fio_log_file(os.path.join(folder, fname))
+# # res.setdefault(ftype, {}).setdefault(conn_id, []).append(ts)
+# #
+# # conn_ids_set.add(conn_id)
+# #
+# # rr = r"{}_(?P<conn_id>.*?)_(?P<type>[^_.]*)\.sys\.log$".format(run_num)
+# # for fname in os.listdir(folder):
+# # rm = re.match(rr, fname)
+# # if rm is None:
+# # continue
+# #
+# # conn_id_s = rm.group('conn_id')
+# # conn_id = conn_id_s.replace('_', ':')
+# # ftype = rm.group('type')
+# #
+# # if ftype not in ('iops', 'bw', 'lat'):
+# # continue
+# #
+# # ts = load_sys_log_file(ftype, os.path.join(folder, fname))
+# # res.setdefault(ftype + ":sys", {}).setdefault(conn_id, []).append(ts)
+# #
+# # conn_ids_set.add(conn_id)
+# #
+# # mm_res = {}
+# #
+# # if len(res) == 0:
+# # raise ValueError("No data was found")
+# #
+# # for key, data in res.items():
+# # conn_ids = sorted(conn_ids_set)
+# # awail_ids = [conn_id for conn_id in conn_ids if conn_id in data]
+# # matr = [data[conn_id] for conn_id in awail_ids]
+# # mm_res[key] = MeasurementMatrix(matr, awail_ids)
+# #
+# # raw_res = {}
+# # for conn_id in conn_ids:
+# # fn = os.path.join(folder, "{0}_{1}_rawres.json".format(run_num, conn_id_s))
+# #
+# # # remove message hack
+# # fc = "{" + open(fn).read().split('{', 1)[1]
+# # raw_res[conn_id] = json.loads(fc)
+# #
+# # fio_task = FioJobSection(params['name'])
+# # fio_task.vals.update(params['vals'])
+# #
+# # config = TestConfig('io', params, None, params['nodes'], folder, None)
+# # return FioRunResult(config, fio_task, mm_res, raw_res, params['intervals'], run_num)
+# #
+#
+# # class DiskPerfInfo:
+# # def __init__(self, name: str, summary: str, params: Dict[str, Any], testnodes_count: int) -> None:
+# # self.name = name
+# # self.bw = None
+# # self.iops = None
+# # self.lat = None
+# # self.lat_50 = None
+# # self.lat_95 = None
+# # self.lat_avg = None
+# #
+# # self.raw_bw = []
+# # self.raw_iops = []
+# # self.raw_lat = []
+# #
+# # self.params = params
+# # self.testnodes_count = testnodes_count
+# # self.summary = summary
+# #
+# # self.sync_mode = get_test_sync_mode(self.params['vals'])
+# # self.concurence = self.params['vals'].get('numjobs', 1)
+# #
+# #
+# # class IOTestResults:
+# # def __init__(self, suite_name: str, fio_results: 'FioRunResult', log_directory: str):
+# # self.suite_name = suite_name
+# # self.fio_results = fio_results
+# # self.log_directory = log_directory
+# #
+# # def __iter__(self):
+# # return iter(self.fio_results)
+# #
+# # def __len__(self):
+# # return len(self.fio_results)
+# #
+# # def get_yamable(self) -> Dict[str, List[str]]:
+# # items = [(fio_res.summary(), fio_res.idx) for fio_res in self]
+# # return {self.suite_name: [self.log_directory] + items}
+#
+#
+# # class FioRunResult(TestResults):
+# # """
+# # Fio run results
+# # config: TestConfig
+# # fio_task: FioJobSection
+# # ts_results: {str: MeasurementMatrix[TimeSeriesValue]}
+# # raw_result: ????
+# # run_interval:(float, float) - test tun time, used for sensors
+# # """
+# # def __init__(self, config, fio_task, ts_results, raw_result, run_interval, idx):
+# #
+# # self.name = fio_task.name.rsplit("_", 1)[0]
+# # self.fio_task = fio_task
+# # self.idx = idx
+# #
+# # self.bw = ts_results['bw']
+# # self.lat = ts_results['lat']
+# # self.iops = ts_results['iops']
+# #
+# # if 'iops:sys' in ts_results:
+# # self.iops_sys = ts_results['iops:sys']
+# # else:
+# # self.iops_sys = None
+# #
+# # res = {"bw": self.bw,
+# # "lat": self.lat,
+# # "iops": self.iops,
+# # "iops:sys": self.iops_sys}
+# #
+# # self.sensors_data = None
+# # self._pinfo = None
+# # TestResults.__init__(self, config, res, raw_result, run_interval)
+# #
+# # def get_params_from_fio_report(self):
+# # nodes = self.bw.connections_ids
+# #
+# # iops = [self.raw_result[node]['jobs'][0]['mixed']['iops'] for node in nodes]
+# # total_ios = [self.raw_result[node]['jobs'][0]['mixed']['total_ios'] for node in nodes]
+# # runtime = [self.raw_result[node]['jobs'][0]['mixed']['runtime'] / 1000 for node in nodes]
+# # flt_iops = [float(ios) / rtime for ios, rtime in zip(total_ios, runtime)]
+# #
+# # bw = [self.raw_result[node]['jobs'][0]['mixed']['bw'] for node in nodes]
+# # total_bytes = [self.raw_result[node]['jobs'][0]['mixed']['io_bytes'] for node in nodes]
+# # flt_bw = [float(tbytes) / rtime for tbytes, rtime in zip(total_bytes, runtime)]
+# #
+# # return {'iops': iops,
+# # 'flt_iops': flt_iops,
+# # 'bw': bw,
+# # 'flt_bw': flt_bw}
+# #
+# # def summary(self):
+# # return get_test_summary(self.fio_task, len(self.config.nodes))
+# #
+# # def summary_tpl(self):
+# # return get_test_summary_tuple(self.fio_task, len(self.config.nodes))
+# #
+# # def get_lat_perc_50_95_multy(self):
+# # lat_mks = collections.defaultdict(lambda: 0)
+# # num_res = 0
+# #
+# # for result in self.raw_result.values():
+# # num_res += len(result['jobs'])
+# # for job_info in result['jobs']:
+# # for k, v in job_info['latency_ms'].items():
+# # if isinstance(k, basestring) and k.startswith('>='):
+# # lat_mks[int(k[2:]) * 1000] += v
+# # else:
+# # lat_mks[int(k) * 1000] += v
+# #
+# # for k, v in job_info['latency_us'].items():
+# # lat_mks[int(k)] += v
+# #
+# # for k, v in lat_mks.items():
+# # lat_mks[k] = float(v) / num_res
+# # return get_lat_perc_50_95(lat_mks)
+# #
+# # def disk_perf_info(self, avg_interval=2.0):
+# #
+# # if self._pinfo is not None:
+# # return self._pinfo
+# #
+# # testnodes_count = len(self.config.nodes)
+# #
+# # pinfo = DiskPerfInfo(self.name,
+# # self.summary(),
+# # self.params,
+# # testnodes_count)
+# #
+# # def prepare(data, drop=1):
+# # if data is None:
+# # return data
+# #
+# # res = []
+# # for ts_data in data:
+# # if ts_data.average_interval() < avg_interval:
+# # ts_data = ts_data.derived(avg_interval)
+# #
+# # # drop last value on bounds
+# # # as they may contains ranges without activities
+# # assert len(ts_data.values) >= drop + 1, str(drop) + " " + str(ts_data.values)
+# #
+# # if drop > 0:
+# # res.append(ts_data.values[:-drop])
+# # else:
+# # res.append(ts_data.values)
+# #
+# # return res
+# #
+# # def agg_data(matr):
+# # arr = sum(matr, [])
+# # min_len = min(map(len, arr))
+# # res = []
+# # for idx in range(min_len):
+# # res.append(sum(dt[idx] for dt in arr))
+# # return res
+# #
+# # pinfo.raw_lat = map(prepare, self.lat.per_vm())
+# # num_th = sum(map(len, pinfo.raw_lat))
+# # lat_avg = [val / num_th for val in agg_data(pinfo.raw_lat)]
+# # pinfo.lat_avg = data_property(lat_avg).average / 1000 # us to ms
+# #
+# # pinfo.lat_50, pinfo.lat_95 = self.get_lat_perc_50_95_multy()
+# # pinfo.lat = pinfo.lat_50
+# #
+# # pinfo.raw_bw = map(prepare, self.bw.per_vm())
+# # pinfo.raw_iops = map(prepare, self.iops.per_vm())
+# #
+# # if self.iops_sys is not None:
+# # pinfo.raw_iops_sys = map(prepare, self.iops_sys.per_vm())
+# # pinfo.iops_sys = data_property(agg_data(pinfo.raw_iops_sys))
+# # else:
+# # pinfo.raw_iops_sys = None
+# # pinfo.iops_sys = None
+# #
+# # fparams = self.get_params_from_fio_report()
+# # fio_report_bw = sum(fparams['flt_bw'])
+# # fio_report_iops = sum(fparams['flt_iops'])
+# #
+# # agg_bw = agg_data(pinfo.raw_bw)
+# # agg_iops = agg_data(pinfo.raw_iops)
+# #
+# # log_bw_avg = average(agg_bw)
+# # log_iops_avg = average(agg_iops)
+# #
+# # # update values to match average from fio report
+# # coef_iops = fio_report_iops / float(log_iops_avg)
+# # coef_bw = fio_report_bw / float(log_bw_avg)
+# #
+# # bw_log = data_property([val * coef_bw for val in agg_bw])
+# # iops_log = data_property([val * coef_iops for val in agg_iops])
+# #
+# # bw_report = data_property([fio_report_bw])
+# # iops_report = data_property([fio_report_iops])
+# #
+# # # When IOPS/BW per thread is too low
+# # # data from logs is rounded to match
+# # iops_per_th = sum(sum(pinfo.raw_iops, []), [])
+# # if average(iops_per_th) > 10:
+# # pinfo.iops = iops_log
+# # pinfo.iops2 = iops_report
+# # else:
+# # pinfo.iops = iops_report
+# # pinfo.iops2 = iops_log
+# #
+# # bw_per_th = sum(sum(pinfo.raw_bw, []), [])
+# # if average(bw_per_th) > 10:
+# # pinfo.bw = bw_log
+# # pinfo.bw2 = bw_report
+# # else:
+# # pinfo.bw = bw_report
+# # pinfo.bw2 = bw_log
+# #
+# # self._pinfo = pinfo
+# #
+# # return pinfo
+#
+# # class TestResult:
+# # """Hold all information for a given test - test info,
+# # sensors data and performance results for test period from all nodes"""
+# # run_id = None # type: int
+# # test_info = None # type: Any
+# # begin_time = None # type: int
+# # end_time = None # type: int
+# # sensors = None # Dict[Tuple[str, str, str], TimeSeries]
+# # performance = None # Dict[Tuple[str, str], TimeSeries]
+# #
+# # class TestResults:
+# # """
+# # this class describe test results
+# #
+# # config:TestConfig - test config object
+# # params:dict - parameters from yaml file for this test
+# # results:{str:MeasurementMesh} - test results object
+# # raw_result:Any - opaque object to store raw results
+# # run_interval:(float, float) - test tun time, used for sensors
+# # """
+# #
+# # def __init__(self,
+# # config: TestConfig,
+# # results: Dict[str, Any],
+# # raw_result: Any,
+# # run_interval: Tuple[float, float]) -> None:
+# # self.config = config
+# # self.params = config.params
+# # self.results = results
+# # self.raw_result = raw_result
+# # self.run_interval = run_interval
+# #
+# # def __str__(self) -> str:
+# # res = "{0}({1}):\n results:\n".format(
+# # self.__class__.__name__,
+# # self.summary())
+# #
+# # for name, val in self.results.items():
+# # res += " {0}={1}\n".format(name, val)
+# #
+# # res += " params:\n"
+# #
+# # for name, val in self.params.items():
+# # res += " {0}={1}\n".format(name, val)
+# #
+# # return res
+# #
+# # def summary(self) -> str:
+# # raise NotImplementedError()
+# # return ""
+# #
+# # def get_yamable(self) -> Any:
+# # raise NotImplementedError()
+# # return None
+#
+#
+#
+# # class MeasurementMatrix:
+# # """
+# # data:[[MeasurementResult]] - VM_COUNT x TH_COUNT matrix of MeasurementResult
+# # """
+# # def __init__(self, data, connections_ids):
+# # self.data = data
+# # self.connections_ids = connections_ids
+# #
+# # def per_vm(self):
+# # return self.data
+# #
+# # def per_th(self):
+# # return sum(self.data, [])
+#
+#
+# # class MeasurementResults:
+# # data = None # type: List[Any]
+# #
+# # def stat(self) -> StatProps:
+# # return data_property(self.data)
+# #
+# # def __str__(self) -> str:
+# # return 'TS([' + ", ".join(map(str, self.data)) + '])'
+# #
+# #
+# # class SimpleVals(MeasurementResults):
+# # """
+# # data:[float] - list of values
+# # """
+# # def __init__(self, data: List[float]) -> None:
+# # self.data = data
+# #
+# #
+# # class TimeSeriesValue(MeasurementResults):
+# # """
+# # data:[(float, float, float)] - list of (start_time, lenght, average_value_for_interval)
+# # odata: original values
+# # """
+# # def __init__(self, data: List[Tuple[float, float]]) -> None:
+# # assert len(data) > 0
+# # self.odata = data[:]
+# # self.data = [] # type: List[Tuple[float, float, float]]
+# #
+# # cstart = 0.0
+# # for nstart, nval in data:
+# # self.data.append((cstart, nstart - cstart, nval))
+# # cstart = nstart
+# #
+# # @property
+# # def values(self) -> List[float]:
+# # return [val[2] for val in self.data]
+# #
+# # def average_interval(self) -> float:
+# # return float(sum([val[1] for val in self.data])) / len(self.data)
+# #
+# # def skip(self, seconds) -> 'TimeSeriesValue':
+# # nres = []
+# # for start, ln, val in self.data:
+# # nstart = start + ln - seconds
+# # if nstart > 0:
+# # nres.append([nstart, val])
+# # return self.__class__(nres)
+# #
+# # def derived(self, tdelta) -> 'TimeSeriesValue':
+# # end = self.data[-1][0] + self.data[-1][1]
+# # tdelta = float(tdelta)
+# #
+# # ln = end / tdelta
+# #
+# # if ln - int(ln) > 0:
+# # ln += 1
+# #
+# # res = [[tdelta * i, 0.0] for i in range(int(ln))]
+# #
+# # for start, lenght, val in self.data:
+# # start_idx = int(start / tdelta)
+# # end_idx = int((start + lenght) / tdelta)
+# #
+# # for idx in range(start_idx, end_idx + 1):
+# # rstart = tdelta * idx
+# # rend = tdelta * (idx + 1)
+# #
+# # intersection_ln = min(rend, start + lenght) - max(start, rstart)
+# # if intersection_ln > 0:
+# # try:
+# # res[idx][1] += val * intersection_ln / tdelta
+# # except IndexError:
+# # raise
+# #
+# # return self.__class__(res)
+#
+#
+# def console_report_stage(ctx: TestRun) -> None:
+# # TODO(koder): load data from storage
+# raise NotImplementedError("...")
+# # first_report = True
+# # text_rep_fname = ctx.config.text_report_file
+# #
+# # with open(text_rep_fname, "w") as fd:
+# # for tp, data in ctx.results.items():
+# # if 'io' == tp and data is not None:
+# # rep_lst = []
+# # for result in data:
+# # rep_lst.append(
+# # IOPerfTest.format_for_console(list(result)))
+# # rep = "\n\n".join(rep_lst)
+# # elif tp in ['mysql', 'pgbench'] and data is not None:
+# # rep = MysqlTest.format_for_console(data)
+# # elif tp == 'omg':
+# # rep = OmgTest.format_for_console(data)
+# # else:
+# # logger.warning("Can't generate text report for " + tp)
+# # continue
+# #
+# # fd.write(rep)
+# # fd.write("\n")
+# #
+# # if first_report:
+# # logger.info("Text report were stored in " + text_rep_fname)
+# # first_report = False
+# #
+# # print("\n" + rep + "\n")
+#
+#
+# # def test_load_report_stage(cfg: Config, ctx: TestRun) -> None:
+# # load_rep_fname = cfg.load_report_file
+# # found = False
+# # for idx, (tp, data) in enumerate(ctx.results.items()):
+# # if 'io' == tp and data is not None:
+# # if found:
+# # logger.error("Making reports for more than one " +
+# # "io block isn't supported! All " +
+# # "report, except first are skipped")
+# # continue
+# # found = True
+# # report.make_load_report(idx, cfg['results'], load_rep_fname)
+# #
+# #
+#
+# # def html_report_stage(ctx: TestRun) -> None:
+# # TODO(koder): load data from storage
+# # raise NotImplementedError("...")
+# # html_rep_fname = cfg.html_report_file
+# # found = False
+# # for tp, data in ctx.results.items():
+# # if 'io' == tp and data is not None:
+# # if found or len(data) > 1:
+# # logger.error("Making reports for more than one " +
+# # "io block isn't supported! All " +
+# # "report, except first are skipped")
+# # continue
+# # found = True
+# # report.make_io_report(list(data[0]),
+# # cfg.get('comment', ''),
+# # html_rep_fname,
+# # lab_info=ctx.nodes)
+#
+# #
+# # def load_data_from_path(test_res_dir: str) -> Mapping[str, List[Any]]:
+# # files = get_test_files(test_res_dir)
+# # raw_res = yaml_load(open(files['raw_results']).read())
+# # res = collections.defaultdict(list)
+# #
+# # for tp, test_lists in raw_res:
+# # for tests in test_lists:
+# # for suite_name, suite_data in tests.items():
+# # result_folder = suite_data[0]
+# # res[tp].append(TOOL_TYPE_MAPPER[tp].load(suite_name, result_folder))
+# #
+# # return res
+# #
+# #
+# # def load_data_from_path_stage(var_dir: str, _, ctx: TestRun) -> None:
+# # for tp, vals in load_data_from_path(var_dir).items():
+# # ctx.results.setdefault(tp, []).extend(vals)
+# #
+# #
+# # def load_data_from(var_dir: str) -> Callable[[TestRun], None]:
+# # return functools.partial(load_data_from_path_stage, var_dir)
diff --git a/wally/result_classes.py b/wally/result_classes.py
new file mode 100644
index 0000000..9b488b7
--- /dev/null
+++ b/wally/result_classes.py
@@ -0,0 +1,54 @@
+from typing import Union, Dict, List, Any, Tuple
+
+# Test result for an integral metric: one that can be expressed
+# as a single value for a given time period, like IO, BW, etc.
+TimeSeriesIntegral = List[float]
+
+
+# Test result for a metric that needs a whole distribution
+# to be stored for each time period, like latency.
+TimeSeriesHistogram = List[List[float]]
+
+
+# Either kind of series.
+TimeSeries = Union[TimeSeriesIntegral, TimeSeriesHistogram]
+# Series keyed by a string. NOTE(review): the meaning of the key is not shown here -- confirm.
+RawTestResults = Dict[str, TimeSeries]
+
+
+class SensorInfo:
+ """Holds information from a single sensor from a single node"""
+ # Identity of the sensor; these three are assigned in __init__.
+ node_id = None # type: str
+ source_id = None # type: str
+ sensor_name = None # type: str
+ # Not touched by __init__: they stay None until assigned elsewhere.
+ begin_time = None # type: int
+ end_time = None # type: int
+ data = None # type: TimeSeries
+
+ def __init__(self, node_id: str, source_id: str, sensor_name: str) -> None:
+ self.node_id = node_id
+ self.source_id = source_id
+ self.sensor_name = sensor_name
+
+
+class TestInfo:
+ """Holds information about a completed test"""
+ # All fields default to None; the class defines no __init__.
+ name = None # type: str
+ iteration_name = None # type: str
+ nodes = None # type: List[str]
+ start_time = None # type: int
+ stop_time = None # type: int
+ params = None # type: Dict[str, Any]
+ config = None # type: str
+ # NOTE(review): `nodes` and `node_ids` are both List[str]; confirm how they differ.
+ node_ids = None # type: List[str]
+
+
+class FullTestResult:
+ """Full result of a test: test info plus performance and sensors data"""
+ test_info = None # type: TestInfo
+
+ # TODO(koder): array.array or numpy.array?
+ # {(node_id, perf_metrics_name): values}
+ performance_data = None # type: Dict[Tuple[str, str], TimeSeries]
+
+ # {(node_id, source_id, sensor_name): sensor_info}
+ # NOTE(review): key order is presumed from SensorInfo's fields -- confirm
+ sensors_data = None # type: Dict[Tuple[str, str, str], SensorInfo]
diff --git a/wally/run_test.py b/wally/run_test.py
index d8ef685..9ae2c9e 100755
--- a/wally/run_test.py
+++ b/wally/run_test.py
@@ -12,7 +12,7 @@
from .config import ConfigBlock, Config
from .suits.mysql import MysqlTest
-from .suits.itest import TestConfig
+from .suits.itest import TestInputConfig
from .suits.io.fio import IOPerfTest
from .suits.postgres import PgBenchTest
from .suits.omgbench import OmgTest
@@ -162,12 +162,12 @@
with suspend_ctx:
test_cls = TOOL_TYPE_MAPPER[name]
remote_dir = ctx.config.default_test_local_folder.format(name=name, uuid=ctx.config.run_uuid)
- test_cfg = TestConfig(test_cls.__name__,
- params=params,
- run_uuid=ctx.config.run_uuid,
- nodes=test_nodes,
- storage=ctx.storage,
- remote_dir=remote_dir)
+ test_cfg = TestInputConfig(test_cls.__name__,
+ params=params,
+ run_uuid=ctx.config.run_uuid,
+ nodes=test_nodes,
+ storage=ctx.storage,
+ remote_dir=remote_dir)
test_cls(test_cfg).run()
@@ -192,7 +192,8 @@
else:
discover_objs = [i.strip() for i in discover_info.strip().split(",")]
- ctx.fuel_openstack_creds, nodes = discover.discover(discover_objs,
+ ctx.fuel_openstack_creds, nodes = discover.discover(ctx,
+ discover_objs,
ctx.config.clouds,
not ctx.config.dont_discover_nodes)
@@ -407,6 +408,28 @@
node.disconnect()
+def clouds_connect_stage(ctx: TestRun) -> None:
+ """Placeholder stage for connecting to clouds; currently does nothing (see the TODO below)."""
+ # TODO(koder): need to use this to connect to openstack in upper code
+ # conn = ctx.config['clouds/openstack']
+ # user, passwd, tenant = parse_creds(conn['creds'])
+ # auth_data = dict(auth_url=conn['auth_url'],
+ # username=user,
+ # api_key=passwd,
+ # project_id=tenant) # type: Dict[str, str]
+ # logger.debug("Discovering openstack nodes with connection details: %r", conn)
+ # connect to openstack, fuel
+
+ # NOTE(review): the snippet above unpacks parse_creds() as (user, passwd, tenant),
+ # the one below as (username, tenant_name, password) -- confirm the order before enabling either.
+ # # parse FUEL REST credentials
+ # username, tenant_name, password = parse_creds(fuel_data['creds'])
+ # creds = {"username": username,
+ # "tenant_name": tenant_name,
+ # "password": password}
+ #
+ # # connect to FUEL
+ # conn = fuel_rest_api.KeystoneAuth(fuel_data['url'], creds, headers=None)
+ pass
+
+
def shut_down_vms_stage(ctx: TestRun, nodes_ids: List[int]) -> None:
if nodes_ids:
logger.info("Removing nodes")
@@ -430,87 +453,7 @@
def console_report_stage(ctx: TestRun) -> None:
# TODO(koder): load data from storage
raise NotImplementedError("...")
- # first_report = True
- # text_rep_fname = ctx.config.text_report_file
- #
- # with open(text_rep_fname, "w") as fd:
- # for tp, data in ctx.results.items():
- # if 'io' == tp and data is not None:
- # rep_lst = []
- # for result in data:
- # rep_lst.append(
- # IOPerfTest.format_for_console(list(result)))
- # rep = "\n\n".join(rep_lst)
- # elif tp in ['mysql', 'pgbench'] and data is not None:
- # rep = MysqlTest.format_for_console(data)
- # elif tp == 'omg':
- # rep = OmgTest.format_for_console(data)
- # else:
- # logger.warning("Can't generate text report for " + tp)
- # continue
- #
- # fd.write(rep)
- # fd.write("\n")
- #
- # if first_report:
- # logger.info("Text report were stored in " + text_rep_fname)
- # first_report = False
- #
- # print("\n" + rep + "\n")
-
-
-# def test_load_report_stage(cfg: Config, ctx: TestRun) -> None:
-# load_rep_fname = cfg.load_report_file
-# found = False
-# for idx, (tp, data) in enumerate(ctx.results.items()):
-# if 'io' == tp and data is not None:
-# if found:
-# logger.error("Making reports for more than one " +
-# "io block isn't supported! All " +
-# "report, except first are skipped")
-# continue
-# found = True
-# report.make_load_report(idx, cfg['results'], load_rep_fname)
-#
-#
def html_report_stage(ctx: TestRun) -> None:
# TODO(koder): load data from storage
raise NotImplementedError("...")
- # html_rep_fname = cfg.html_report_file
- # found = False
- # for tp, data in ctx.results.items():
- # if 'io' == tp and data is not None:
- # if found or len(data) > 1:
- # logger.error("Making reports for more than one " +
- # "io block isn't supported! All " +
- # "report, except first are skipped")
- # continue
- # found = True
- # report.make_io_report(list(data[0]),
- # cfg.get('comment', ''),
- # html_rep_fname,
- # lab_info=ctx.nodes)
-
-#
-# def load_data_from_path(test_res_dir: str) -> Mapping[str, List[Any]]:
-# files = get_test_files(test_res_dir)
-# raw_res = yaml_load(open(files['raw_results']).read())
-# res = collections.defaultdict(list)
-#
-# for tp, test_lists in raw_res:
-# for tests in test_lists:
-# for suite_name, suite_data in tests.items():
-# result_folder = suite_data[0]
-# res[tp].append(TOOL_TYPE_MAPPER[tp].load(suite_name, result_folder))
-#
-# return res
-#
-#
-# def load_data_from_path_stage(var_dir: str, _, ctx: TestRun) -> None:
-# for tp, vals in load_data_from_path(var_dir).items():
-# ctx.results.setdefault(tp, []).extend(vals)
-#
-#
-# def load_data_from(var_dir: str) -> Callable[[TestRun], None]:
-# return functools.partial(load_data_from_path_stage, var_dir)
diff --git a/wally/sensors.py b/wally/sensors.py
new file mode 100644
index 0000000..b579f3f
--- /dev/null
+++ b/wally/sensors.py
@@ -0,0 +1,90 @@
+from typing import List, Dict, Tuple
+from .test_run_class import TestRun
+from . import sensors_rpc_plugin
+
+
+# Swap whatever extension the plugin module's __file__ has for ".py".
+plugin_fname = sensors_rpc_plugin.__file__.rsplit(".", 1)[0] + ".py"
+# Read the plugin source once at import time; the context manager closes
+# the file right away instead of leaving the handle to the garbage collector.
+with open(plugin_fname) as _plugin_fd:
+    SENSORS_PLUGIN_CODE = _plugin_fd.read()
+
+
+# TODO(koder): if a node has more than one role, sensor settings might be incorrect
+def start_sensors_stage(ctx: TestRun) -> None:
+    """Upload the sensors plugin to, and start sensors on, every node with a configured role.
+
+    Does nothing when the config has no 'sensors' section. The id of every
+    node where sensors were started is added to ctx.sensors_run_on.
+    """
+    if 'sensors' not in ctx.config:
+        return
+
+    # Normalize every entry to a {name: ".*"} mapping: "a, b" strings and
+    # [a, b] lists are expanded, any other value is kept as is.
+    # .items() is required: iterating the mapping itself yields keys only.
+    # NOTE(review): assumes .copy() returns a dict-like object -- confirm for ConfigBlock.
+    per_role_config = {}
+    for name, val in ctx.config['sensors'].copy().items():
+        if isinstance(val, str):
+            val = {vl.strip(): ".*" for vl in val.split(",")}
+        elif isinstance(val, list):
+            val = {vl: ".*" for vl in val}
+        per_role_config[name] = val
+
+    if 'all' in per_role_config:
+        # 'all' holds defaults for every known role (configured roles plus
+        # the roles of all nodes); role-specific entries override them.
+        all_vl = per_role_config.pop('all')
+        all_roles = set(per_role_config)
+
+        for node in ctx.nodes:
+            all_roles.update(node.info.roles)
+
+        for name in all_roles:
+            new_vals = all_vl.copy()
+            new_vals.update(per_role_config.get(name, {}))
+            per_role_config[name] = new_vals
+
+    for node in ctx.nodes:
+        # Merge the settings of all roles this node has.
+        node_cfg = {}
+        for role in node.info.roles:
+            node_cfg.update(per_role_config.get(role, {}))
+
+        if node_cfg:
+            # NOTE(review): node_cfg only gates the start; it is never passed
+            # to sensors.start() -- confirm against the RPC plugin API.
+            node.conn.upload_plugin(SENSORS_PLUGIN_CODE)
+            ctx.sensors_run_on.add(node.info.node_id())
+            node.conn.sensors.start()
+
+
+def collect_sensors_stage(ctx: TestRun, stop: bool = True) -> None:
+ """Append sensor data from every node listed in ctx.sensors_run_on to ctx.storage.
+
+ With stop=True the data comes from sensors.stop(), otherwise from sensors.get_updates().
+ """
+ for node in ctx.nodes:
+ node_id = node.info.node_id()
+ if node_id in ctx.sensors_run_on:
+
+ if stop:
+ data, collected_at = node.conn.sensors.stop() # type: Dict[Tuple[str, str], List[int]], List[float]
+ else:
+ data, collected_at = node.conn.sensors.get_updates()
+
+ # one storage path per (source, sensor) pair of this node
+ for (source_name, sensor_name), values in data.items():
+ path = "metric/{}/{}/{}".format(node_id, source_name, sensor_name)
+ ctx.storage.append(path, values)
+ # NOTE(review): nesting is not visible in this view; presumably this runs
+ # once per node, after the per-sensor loop -- confirm
+ ctx.storage.append("metric/{}/collected_at".format(node_id), collected_at)
+
+
+# def delta(func, only_upd=True):
+# prev = {}
+# while True:
+# for dev_name, vals in func():
+# if dev_name not in prev:
+# prev[dev_name] = {}
+# for name, (val, _) in vals.items():
+# prev[dev_name][name] = val
+# else:
+# dev_prev = prev[dev_name]
+# res = {}
+# for stat_name, (val, accum_val) in vals.items():
+# if accum_val:
+# if stat_name in dev_prev:
+# delta = int(val) - int(dev_prev[stat_name])
+# if not only_upd or 0 != delta:
+# res[stat_name] = str(delta)
+# dev_prev[stat_name] = val
+# elif not only_upd or '0' != val:
+# res[stat_name] = val
+#
+# if only_upd and len(res) == 0:
+# continue
+# yield dev_name, res
+# yield None, None
+#
+#
+
+
diff --git a/wally/sensors_rpc_plugin.py b/wally/sensors_rpc_plugin.py
index a2758c3..c218bff 100644
--- a/wally/sensors_rpc_plugin.py
+++ b/wally/sensors_rpc_plugin.py
@@ -1,12 +1,19 @@
import os
-from collections import namedtuple
-
-SensorInfo = namedtuple("SensorInfo", ['value', 'is_accumulated'])
-# SensorInfo = NamedTuple("SensorInfo", [('value', int), ('is_accumulated', bool)])
+import time
+import array
+import threading
-def provides(name: str):
+mod_name = "sensor"
+__version__ = (0, 1)
+
+
+SensorsMap = {}
+
+
+def provides(name):
def closure(func):
+ SensorsMap[name] = func
return func
return closure
@@ -28,16 +35,16 @@
def get_pid_list(disallowed_prefixes, allowed_prefixes):
"""Return pid list from list of pids and names"""
# exceptions
- but = disallowed_prefixes if disallowed_prefixes is not None else []
+ disallowed = disallowed_prefixes if disallowed_prefixes is not None else []
if allowed_prefixes is None:
# if nothing setted - all ps will be returned except setted
result = [pid
for pid in os.listdir('/proc')
- if pid.isdigit() and pid not in but]
+ if pid.isdigit() and pid not in disallowed]
else:
result = []
for pid in os.listdir('/proc'):
- if pid.isdigit() and pid not in but:
+ if pid.isdigit() and pid not in disallowed:
name = get_pid_name(pid)
if pid in allowed_prefixes or \
any(name.startswith(val) for val in allowed_prefixes):
@@ -61,33 +68,6 @@
return "no_such_process"
-def delta(func, only_upd=True):
- prev = {}
- while True:
- for dev_name, vals in func():
- if dev_name not in prev:
- prev[dev_name] = {}
- for name, (val, _) in vals.items():
- prev[dev_name][name] = val
- else:
- dev_prev = prev[dev_name]
- res = {}
- for stat_name, (val, accum_val) in vals.items():
- if accum_val:
- if stat_name in dev_prev:
- delta = int(val) - int(dev_prev[stat_name])
- if not only_upd or 0 != delta:
- res[stat_name] = str(delta)
- dev_prev[stat_name] = val
- elif not only_upd or '0' != val:
- res[stat_name] = val
-
- if only_upd and len(res) == 0:
- continue
- yield dev_name, res
- yield None, None
-
-
# 1 - major number
# 2 - minor mumber
# 3 - device name
@@ -121,43 +101,14 @@
for line in open('/proc/diskstats'):
vals = line.split()
dev_name = vals[2]
-
dev_ok = is_dev_accepted(dev_name,
disallowed_prefixes,
allowed_prefixes)
- if dev_name[-1].isdigit():
- dev_ok = False
+ if not dev_ok or dev_name[-1].isdigit():
+ continue
- if dev_ok:
- for pos, name, accum_val in io_values_pos:
- sensor_name = "{0}.{1}".format(dev_name, name)
- results[sensor_name] = SensorInfo(int(vals[pos]), accum_val)
- return results
-
-
-def get_latency(stat1, stat2):
- disks = set(i.split('.', 1)[0] for i in stat1)
- results = {}
-
- for disk in disks:
- rdc = disk + '.reads_completed'
- wrc = disk + '.writes_completed'
- rdt = disk + '.rtime'
- wrt = disk + '.wtime'
- lat = 0.0
-
- io_ops1 = stat1[rdc].value + stat1[wrc].value
- io_ops2 = stat2[rdc].value + stat2[wrc].value
-
- diops = io_ops2 - io_ops1
-
- if diops != 0:
- io1 = stat1[rdt].value + stat1[wrt].value
- io2 = stat2[rdt].value + stat2[wrt].value
- lat = abs(float(io1 - io2)) / diops
-
- results[disk + '.latence'] = SensorInfo(lat, False)
-
+ for pos, name, _ in io_values_pos:
+ results["{0}.{1}".format(dev_name, name)] = int(vals[pos])
return results
@@ -201,28 +152,8 @@
dev_ok = False
if dev_ok:
- for pos, name, accum_val in net_values_pos:
- sensor_name = "{0}.{1}".format(dev_name, name)
- results[sensor_name] = SensorInfo(int(vals[pos]), accum_val)
- return results
-
-
-@provides("perprocess-cpu")
-def pscpu_stat(disallowed_prefixes=None, allowed_prefixes=None):
- results = {}
- pid_list = get_pid_list(disallowed_prefixes, allowed_prefixes)
-
- for pid in pid_list:
- try:
- dev_name = get_pid_name(pid)
-
- pid_stat1 = pid_stat(pid)
-
- sensor_name = "{0}.{1}".format(dev_name, pid)
- results[sensor_name] = SensorInfo(pid_stat1, True)
- except IOError:
- # may be, proc has already terminated, skip it
- continue
+ for pos, name, _ in net_values_pos:
+ results["{0}.{1}".format(dev_name, name)] = int(vals[pos])
return results
@@ -239,14 +170,19 @@
return float(int(utime) + int(stime))
-# Based on ps_mem.py:
-# Licence: LGPLv2
-# Author: P@draigBrady.com
-# Source: http://www.pixelbeat.org/scripts/ps_mem.py
-# http://github.com/pixelb/scripts/commits/master/scripts/ps_mem.py
+@provides("perprocess-cpu")
+def pscpu_stat(disallowed_prefixes=None, allowed_prefixes=None):
+    """Map "<process name>.<pid>" to pid_stat(pid) for every pid from get_pid_list().
+
+    Pids that raise IOError while being read are left out of the result.
+    """
+    # TODO(koder): a fixed list of PIDs must be given
+    cpu_by_process = {}
+    for pid in get_pid_list(disallowed_prefixes, allowed_prefixes):
+        try:
+            # keep this order: the stat is read before the name is looked up
+            cpu_time = pid_stat(pid)
+            sensor_name = "{0}.{1}".format(get_pid_name(pid), pid)
+        except IOError:
+            # the process may have already terminated; skip it
+            continue
+        cpu_by_process[sensor_name] = cpu_time
+    return cpu_by_process
-# Note shared is always a subset of rss (trs is not always)
def get_mem_stats(pid):
"""Return memory data of pid in format (private, shared)"""
@@ -257,7 +193,7 @@
private = 0
pss = 0
- # add 0.5KiB as this avg error due to trunctation
+ # add 0.5 KiB, as this is the average error due to truncation
pss_adjust = 0.5
for line in lines:
@@ -279,39 +215,38 @@
return (private, shared)
+def get_ram_size():
+ """Return RAM size in Kb"""
+ with open("/proc/meminfo") as proc:
+ mem_total = proc.readline().split()
+ return int(mem_total[1])
+
+
@provides("perprocess-ram")
def psram_stat(disallowed_prefixes=None, allowed_prefixes=None):
results = {}
- pid_list = get_pid_list(disallowed_prefixes, allowed_prefixes)
- for pid in pid_list:
+ # TODO(koder): a fixed list of PIDs must be given
+ for pid in get_pid_list(disallowed_prefixes, allowed_prefixes):
try:
dev_name = get_pid_name(pid)
private, shared = get_mem_stats(pid)
total = private + shared
sys_total = get_ram_size()
- usage = float(total) / float(sys_total)
+ usage = float(total) / sys_total
sensor_name = "{0}({1})".format(dev_name, pid)
- results[sensor_name + ".private_mem"] = SensorInfo(private, False)
- results[sensor_name + ".shared_mem"] = SensorInfo(shared, False)
- results[sensor_name + ".used_mem"] = SensorInfo(total, False)
- name = sensor_name + ".mem_usage_percent"
- results[name] = SensorInfo(usage * 100, False)
+ results.update([
+ (sensor_name + ".private_mem", private),
+ (sensor_name + ".shared_mem", shared),
+ (sensor_name + ".used_mem", total),
+ (sensor_name + ".mem_usage_percent", int(usage * 100))])
except IOError:
# permission denied or proc die
continue
return results
-
-def get_ram_size():
- """Return RAM size in Kb"""
- with open("/proc/meminfo") as proc:
- mem_total = proc.readline().split()
- return mem_total[1]
-
-
# 0 - cpu name
# 1 - user: normal processes executing in user mode
# 2 - nice: niced processes executing in user mode
@@ -341,13 +276,12 @@
dev_name = vals[0]
if dev_name == 'cpu':
- for pos, name, accum_val in cpu_values_pos:
+ for pos, name, _ in cpu_values_pos:
sensor_name = "{0}.{1}".format(dev_name, name)
- results[sensor_name] = SensorInfo(int(vals[pos]),
- accum_val)
+ results[sensor_name] = int(vals[pos])
elif dev_name == 'procs_blocked':
val = int(vals[1])
- results["cpu.procs_blocked"] = SensorInfo(val, False)
+ results["cpu.procs_blocked"] = val
elif dev_name.startswith('cpu'):
core_count += 1
@@ -355,9 +289,10 @@
TASKSPOS = 3
vals = open('/proc/loadavg').read().split()
ready_procs = vals[TASKSPOS].partition('/')[0]
+
# dec on current proc
procs_queue = (float(ready_procs) - 1) / core_count
- results["cpu.procs_queue"] = SensorInfo(procs_queue, False)
+ results["cpu.procs_queue"] = procs_queue
return results
@@ -380,7 +315,9 @@
def sysram_stat(disallowed_prefixes=None, allowed_prefixes=None):
if allowed_prefixes is None:
allowed_prefixes = ram_fields
+
results = {}
+
for line in open('/proc/meminfo'):
vals = line.split()
dev_name = vals[0].rstrip(":")
@@ -392,10 +329,112 @@
title = "ram.{0}".format(dev_name)
if dev_ok:
- results[title] = SensorInfo(int(vals[1]), False)
+ results[title] = int(vals[1])
if 'ram.MemFree' in results and 'ram.MemTotal' in results:
used = results['ram.MemTotal'].value - results['ram.MemFree'].value
- usage = float(used) / results['ram.MemTotal'].value
- results["ram.usage_percent"] = SensorInfo(usage, False)
+ results["ram.usage_percent"] = int(float(used) / results['ram.MemTotal'].value)
+
return results
+
+
+class SensorsData(object):
+ def __init__(self):
+ self.cond = threading.Condition()
+ self.collected_at = array.array("f")
+ self.stop = False
+ self.data = {} # maps (source_name, sensor_name) to an array of collected values
+ self.data_fd = None
+
+
+# TODO(koder): a lot of code here can be optimized and cached, but nobody cares (c)
+def sensors_bg_thread(sensors_config, sdata):
+ next_collect_at = time.time()
+
+ while not sdata.stop:
+ dtime = next_collect_at - time.time()
+ if dtime > 0:
+ sdata.cond.wait(dtime)
+
+ if sdata.stop:
+ break
+
+ ctm = time.time()
+ curr = {}
+ for name, config in sensors_config.items():
+ params = {}
+
+ if "allow" in config:
+ params["allowed_prefixes"] = config["allow"]
+
+ if "disallow" in config:
+ params["disallowed_prefixes"] = config["disallow"]
+
+ curr[name] = SensorsMap[name](**params)
+
+ etm = time.time()
+
+ if etm - ctm > 0.1:
+ # TODO(koder): need to signal that something is not really OK with sensor collection
+ pass
+
+ with sdata.cond:
+ sdata.collected_at.append(ctm)
+ for source_name, vals in curr.items():
+ for sensor_name, val in vals.items():
+ key = (source_name, sensor_name)
+ if key not in sdata.data:
+ sdata.data[key] = array.array("I", [val])
+ else:
+ sdata.data[key].append(val)
+
+
+sensors_thread = None
+sdata = None # type: SensorsData
+
+
+def rpc_start(sensors_config):
+ global sensors_thread
+ global sdata
+
+ if sensors_thread is not None:
+ raise ValueError("Thread already running")
+
+ sdata = SensorsData()
+ sensors_thread = threading.Thread(target=sensors_bg_thread, args=(sensors_config, sdata))
+ sensors_thread.daemon = True
+ sensors_thread.start()
+
+
+def rpc_get_updates():
+ if sdata is None:
+ raise ValueError("No sensor thread running")
+
+ with sdata.cond:
+ res = sdata.data
+ collected_at = sdata.collected_at
+ sdata.collected_at = array.array("f")
+ sdata.data = {name: array.array("I") for name in sdata.data}
+
+ return res, collected_at
+
+
+def rpc_stop():
+ global sensors_thread
+ global sdata
+
+ if sensors_thread is None:
+ raise ValueError("No sensor thread running")
+
+ sdata.stop = True
+ with sdata.cond:
+ sdata.cond.notify_all()
+
+ sensors_thread.join()
+ res = sdata.data
+ collected_at = sdata.collected_at
+
+ sensors_thread = None
+ sdata = None
+
+ return res, collected_at
diff --git a/wally/sensors_rpc_plugin.pyi b/wally/sensors_rpc_plugin.pyi
index c4b387b..21fe1d5 100644
--- a/wally/sensors_rpc_plugin.pyi
+++ b/wally/sensors_rpc_plugin.pyi
@@ -1,24 +1,22 @@
-import os
from typing import NamedTuple, TypeVar, Callable, Any, Optional, List, Iterable, Dict, Tuple
-SensorInfo = NamedTuple("SensorInfo", [('value', int), ('is_accumulated', bool)])
Pid = TypeVar('Pid', str)
AnyFunc = TypeVar('AnyFunc', Callable[..., Any])
PrefixList = Optional[List[str]]
-SensorDict = Dict[str, SensorInfo]
+SensorDict = Dict[str, int]
def provides(name: str) -> Callable[[AnyFunc], AnyFunc]: ...
-def is_dev_accepted(name, disallowed_prefixes: PrefixList, allowed_prefixes: PrefixList) -> bool: ...
+def is_dev_accepted(name: str, disallowed_prefixes: PrefixList, allowed_prefixes: PrefixList) -> bool: ...
def get_pid_name(pid: Pid) -> str: ...
-def delta(func, only_upd: bool = True) -> Iterable[Optional[str], Optional[Dict[str, str]]]: ...
-def get_latency(stat1: SensorDict, stat2: SensorDict) -> SensorDict: ...
def pid_stat(pid: Pid) -> float: ...
def get_mem_stats(pid : Pid) -> Tuple[int, int]: ...
-def get_ram_size() -> str: ...
+def get_ram_size() -> int: ...
-def io_stat(disallowed_prefixes: PrefixList, allowed_prefixes: PrefixList) -> SensorDict: ...
-def net_stat(disallowed_prefixes: PrefixList, allowed_prefixes: PrefixList) -> SensorDict: ...
-def pscpu_stat(disallowed_prefixes: PrefixList, allowed_prefixes: PrefixList) -> SensorDict: ...
-def psram_stat(disallowed_prefixes: PrefixList, allowed_prefixes: PrefixList) -> SensorDict: ...
-def syscpu_stat(disallowed_prefixes: PrefixList, allowed_prefixes: PrefixList) -> SensorDict: ...
-def sysram_stat(disallowed_prefixes: PrefixList, allowed_prefixes: PrefixList) -> SensorDict: ...
+def io_stat(disallowed_prefixes: PrefixList, allowed_prefixes: PrefixList) -> int: ...
+def net_stat(disallowed_prefixes: PrefixList, allowed_prefixes: PrefixList) -> int: ...
+def pscpu_stat(disallowed_prefixes: PrefixList, allowed_prefixes: PrefixList) -> int: ...
+def psram_stat(disallowed_prefixes: PrefixList, allowed_prefixes: PrefixList) -> int: ...
+def syscpu_stat(disallowed_prefixes: PrefixList, allowed_prefixes: PrefixList) -> int: ...
+def sysram_stat(disallowed_prefixes: PrefixList, allowed_prefixes: PrefixList) -> int: ...
+
+
diff --git a/wally/stage.py b/wally/stage.py
index e06e31d..0451d18 100644
--- a/wally/stage.py
+++ b/wally/stage.py
@@ -1,6 +1,6 @@
import logging
import contextlib
-from typing import Callable
+from typing import Callable, Iterator
from .utils import StopTestError
from .test_run_class import TestRun
@@ -10,7 +10,7 @@
@contextlib.contextmanager
-def log_stage(stage) -> None:
+def log_stage(stage) -> Iterator[None]:
msg_templ = "Exception during {0}: {1!s}"
msg_templ_no_exc = "During {0}"
diff --git a/wally/storage.py b/wally/storage.py
index 349995f..05e4259 100644
--- a/wally/storage.py
+++ b/wally/storage.py
@@ -141,8 +141,8 @@
def __contains__(self, path: str) -> bool:
return path in self.storage
- def list(self, path: str) -> Iterable[Tuple[bool, str]]:
- return self.storage.list(path)
+ def list(self, *path: str) -> Iterable[Tuple[bool, str]]:
+ return self.storage.list("/".join(path))
def construct(self, path: str, raw_val: IStorable, obj_class: Type[ObjClass]) -> ObjClass:
if obj_class in (int, str, dict, list, None):
@@ -162,16 +162,18 @@
obj.__dict__.update(raw_val)
return obj
- def load_list(self, path: str, obj_class: Type[ObjClass]) -> List[ObjClass]:
- raw_val = self[path]
+ def load_list(self, obj_class: Type[ObjClass], *path: str) -> List[ObjClass]:
+ path_s = "/".join(path)
+ raw_val = self[path_s]
assert isinstance(raw_val, list)
- return [self.construct(path, val, obj_class) for val in cast(list, raw_val)]
+ return [self.construct(path_s, val, obj_class) for val in cast(list, raw_val)]
- def load(self, path: str, obj_class: Type[ObjClass]) -> ObjClass:
- return self.construct(path, self[path], obj_class)
+ def load(self, obj_class: Type[ObjClass], *path: str) -> ObjClass:
+ path_s = "/".join(path)
+ return self.construct(path_s, self[path_s], obj_class)
- def get_stream(self, path: str) -> IO:
- return self.storage.get_stream(path)
+ def get_stream(self, *path: str) -> IO:
+ return self.storage.get_stream("/".join(path))
def get(self, path: str, default: Any = None) -> Any:
try:
@@ -179,6 +181,9 @@
except KeyError:
return default
+ def append(self, path: str, data: List):
+ raise NotImplemented()
+
def make_storage(url: str, existing: bool = False) -> Storage:
return Storage(FSStorage(url, existing), YAMLSerializer())
diff --git a/wally/storage_structure.txt b/wally/storage_structure.txt
index 8ba89c1..583a4a0 100644
--- a/wally/storage_structure.txt
+++ b/wally/storage_structure.txt
@@ -5,7 +5,17 @@
discovered_nodes: List[NodeInfo] - list of discovered nodes
reused_nodes: List[NodeInfo] - list of reused nodes from cluster
spawned_vm_ids: List[int] - list of openstack VM id's, spawned for test
-__types__ = type of data in keys
info/comment : str - run comment
info/run_uuid : str - run uuid
-info/run_time : float - run unix time
\ No newline at end of file
+info/run_time : float - run unix time
+
+# test results
+result/{id}/info : TestInfo - test information: name, cluster config, test parameters, etc.
+result/{id}/measurement/{node}/{name} : List[float] - measurement data. E.g.:
+ result/{id}/measurement/node-12/iops - for BW, use iops * block_sz
+ result/{id}/measurement/node-12/lat_histo
+
+metric/{node_name}/{dev}/{metric_name} : List[float] - node metrics data. E.g.:
+ metric/node-22/cpu/load
+ metric/node-22/sda/read_io
+ metric/node-22/eth0/data_recv
\ No newline at end of file
diff --git a/wally/suits/io/fio.py b/wally/suits/io/fio.py
index 2360a55..0f4ebde 100644
--- a/wally/suits/io/fio.py
+++ b/wally/suits/io/fio.py
@@ -1,970 +1,128 @@
-import re
-import time
-import json
-import stat
-import random
-import hashlib
import os.path
import logging
-import datetime
-import functools
-import collections
-from typing import Dict, List, Callable, Any, Tuple, Optional
-
-import yaml
-import texttable
-from paramiko.ssh_exception import SSHException
-from concurrent.futures import ThreadPoolExecutor, wait
+from typing import Dict, List, Union, cast
import wally
-from ...pretty_yaml import dumps
-from ...statistic import round_3_digit, data_property, average
-from ...utils import ssize2b, sec_to_str, StopTestError, Barrier, get_os
-from ...inode import INode
-
-from ..itest import (TimeSeriesValue, PerfTest, TestResults, TestConfig)
-
-from .fio_task_parser import (execution_time, fio_cfg_compile,
- get_test_summary, get_test_summary_tuple,
- get_test_sync_mode, FioJobSection)
-from .rpc_plugin import parse_fio_result
+from ...utils import ssize2b, StopTestError, get_os
+from ...node_interfaces import IRPCNode
+from ..itest import ThreadedTest, IterationConfig, RunTestRes
+from .fio_task_parser import execution_time, fio_cfg_compile, FioJobSection, FioParams
logger = logging.getLogger("wally")
-class NoData:
- pass
-
-
-def cached_prop(func: Callable[..., Any]) -> Callable[..., Any]:
- @property
- @functools.wraps(func)
- def closure(self) -> Any:
- val = getattr(self, "_" + func.__name__)
- if val is NoData:
- val = func(self)
- setattr(self, "_" + func.__name__, val)
- return val
- return closure
-
-
-def load_fio_log_file(fname: str) -> TimeSeriesValue:
- with open(fname) as fd:
- it = [ln.split(',')[:2] for ln in fd]
-
- vals = [(float(off) / 1000, # convert us to ms
- float(val.strip()) + 0.5) # add 0.5 to compemsate average value
- # as fio trimm all values in log to integer
- for off, val in it]
-
- return TimeSeriesValue(vals)
-
-
-READ_IOPS_DISCSTAT_POS = 3
-WRITE_IOPS_DISCSTAT_POS = 7
-
-
-def load_sys_log_file(ftype: str, fname: str) -> TimeSeriesValue:
- assert ftype == 'iops'
- pval = None
- with open(fname) as fd:
- iops = []
- for ln in fd:
- params = ln.split()
- cval = int(params[WRITE_IOPS_DISCSTAT_POS]) + \
- int(params[READ_IOPS_DISCSTAT_POS])
- if pval is not None:
- iops.append(cval - pval)
- pval = cval
-
- vals = [(idx * 1000, val) for idx, val in enumerate(iops)]
- return TimeSeriesValue(vals)
-
-
-def load_test_results(folder: str, run_num: int) -> 'FioRunResult':
- res = {}
- params = None
-
- fn = os.path.join(folder, str(run_num) + '_params.yaml')
- params = yaml.load(open(fn).read())
-
- conn_ids_set = set()
- rr = r"{}_(?P<conn_id>.*?)_(?P<type>[^_.]*)\.\d+\.log$".format(run_num)
- for fname in os.listdir(folder):
- rm = re.match(rr, fname)
- if rm is None:
- continue
-
- conn_id_s = rm.group('conn_id')
- conn_id = conn_id_s.replace('_', ':')
- ftype = rm.group('type')
-
- if ftype not in ('iops', 'bw', 'lat'):
- continue
-
- ts = load_fio_log_file(os.path.join(folder, fname))
- res.setdefault(ftype, {}).setdefault(conn_id, []).append(ts)
-
- conn_ids_set.add(conn_id)
-
- rr = r"{}_(?P<conn_id>.*?)_(?P<type>[^_.]*)\.sys\.log$".format(run_num)
- for fname in os.listdir(folder):
- rm = re.match(rr, fname)
- if rm is None:
- continue
-
- conn_id_s = rm.group('conn_id')
- conn_id = conn_id_s.replace('_', ':')
- ftype = rm.group('type')
-
- if ftype not in ('iops', 'bw', 'lat'):
- continue
-
- ts = load_sys_log_file(ftype, os.path.join(folder, fname))
- res.setdefault(ftype + ":sys", {}).setdefault(conn_id, []).append(ts)
-
- conn_ids_set.add(conn_id)
-
- mm_res = {}
-
- if len(res) == 0:
- raise ValueError("No data was found")
-
- for key, data in res.items():
- conn_ids = sorted(conn_ids_set)
- awail_ids = [conn_id for conn_id in conn_ids if conn_id in data]
- matr = [data[conn_id] for conn_id in awail_ids]
- mm_res[key] = MeasurementMatrix(matr, awail_ids)
-
- raw_res = {}
- for conn_id in conn_ids:
- fn = os.path.join(folder, "{0}_{1}_rawres.json".format(run_num, conn_id_s))
-
- # remove message hack
- fc = "{" + open(fn).read().split('{', 1)[1]
- raw_res[conn_id] = json.loads(fc)
-
- fio_task = FioJobSection(params['name'])
- fio_task.vals.update(params['vals'])
-
- config = TestConfig('io', params, None, params['nodes'], folder, None)
- return FioRunResult(config, fio_task, mm_res, raw_res, params['intervals'], run_num)
-
-
-class Attrmapper:
- def __init__(self, dct: Dict[str, Any]):
- self.__dct = dct
-
- def __getattr__(self, name):
- try:
- return self.__dct[name]
- except KeyError:
- raise AttributeError(name)
-
-
-class DiskPerfInfo:
- def __init__(self, name: str, summary: str, params: Dict[str, Any], testnodes_count: int):
- self.name = name
- self.bw = None
- self.iops = None
- self.lat = None
- self.lat_50 = None
- self.lat_95 = None
- self.lat_avg = None
-
- self.raw_bw = []
- self.raw_iops = []
- self.raw_lat = []
-
- self.params = params
- self.testnodes_count = testnodes_count
- self.summary = summary
- self.p = Attrmapper(self.params['vals'])
-
- self.sync_mode = get_test_sync_mode(self.params['vals'])
- self.concurence = self.params['vals'].get('numjobs', 1)
-
-
-def get_lat_perc_50_95(lat_mks: List[float]) -> Tuple[float, float]:
- curr_perc = 0
- perc_50 = None
- perc_95 = None
- pkey = None
- for key, val in sorted(lat_mks.items()):
- if curr_perc + val >= 50 and perc_50 is None:
- if pkey is None or val < 1.:
- perc_50 = key
- else:
- perc_50 = (50. - curr_perc) / val * (key - pkey) + pkey
-
- if curr_perc + val >= 95:
- if pkey is None or val < 1.:
- perc_95 = key
- else:
- perc_95 = (95. - curr_perc) / val * (key - pkey) + pkey
- break
-
- pkey = key
- curr_perc += val
-
- # for k, v in sorted(lat_mks.items()):
- # if k / 1000 > 0:
- # print "{0:>4}".format(k / 1000), v
-
- # print perc_50 / 1000., perc_95 / 1000.
- # exit(1)
- return perc_50 / 1000., perc_95 / 1000.
-
-
-class IOTestResults:
- def __init__(self, suite_name: str, fio_results: 'FioRunResult', log_directory: str):
- self.suite_name = suite_name
- self.fio_results = fio_results
- self.log_directory = log_directory
-
- def __iter__(self):
- return iter(self.fio_results)
-
- def __len__(self):
- return len(self.fio_results)
-
- def get_yamable(self) -> Dict[str, List[str]]:
- items = [(fio_res.summary(), fio_res.idx) for fio_res in self]
- return {self.suite_name: [self.log_directory] + items}
-
-
-class FioRunResult(TestResults):
- """
- Fio run results
- config: TestConfig
- fio_task: FioJobSection
- ts_results: {str: MeasurementMatrix[TimeSeriesValue]}
- raw_result: ????
- run_interval:(float, float) - test tun time, used for sensors
- """
- def __init__(self, config, fio_task, ts_results, raw_result, run_interval, idx):
-
- self.name = fio_task.name.rsplit("_", 1)[0]
- self.fio_task = fio_task
- self.idx = idx
-
- self.bw = ts_results['bw']
- self.lat = ts_results['lat']
- self.iops = ts_results['iops']
-
- if 'iops:sys' in ts_results:
- self.iops_sys = ts_results['iops:sys']
- else:
- self.iops_sys = None
-
- res = {"bw": self.bw,
- "lat": self.lat,
- "iops": self.iops,
- "iops:sys": self.iops_sys}
-
- self.sensors_data = None
- self._pinfo = None
- TestResults.__init__(self, config, res, raw_result, run_interval)
-
- def get_params_from_fio_report(self):
- nodes = self.bw.connections_ids
-
- iops = [self.raw_result[node]['jobs'][0]['mixed']['iops'] for node in nodes]
- total_ios = [self.raw_result[node]['jobs'][0]['mixed']['total_ios'] for node in nodes]
- runtime = [self.raw_result[node]['jobs'][0]['mixed']['runtime'] / 1000 for node in nodes]
- flt_iops = [float(ios) / rtime for ios, rtime in zip(total_ios, runtime)]
-
- bw = [self.raw_result[node]['jobs'][0]['mixed']['bw'] for node in nodes]
- total_bytes = [self.raw_result[node]['jobs'][0]['mixed']['io_bytes'] for node in nodes]
- flt_bw = [float(tbytes) / rtime for tbytes, rtime in zip(total_bytes, runtime)]
-
- return {'iops': iops,
- 'flt_iops': flt_iops,
- 'bw': bw,
- 'flt_bw': flt_bw}
-
- def summary(self):
- return get_test_summary(self.fio_task, len(self.config.nodes))
-
- def summary_tpl(self):
- return get_test_summary_tuple(self.fio_task, len(self.config.nodes))
-
- def get_lat_perc_50_95_multy(self):
- lat_mks = collections.defaultdict(lambda: 0)
- num_res = 0
-
- for result in self.raw_result.values():
- num_res += len(result['jobs'])
- for job_info in result['jobs']:
- for k, v in job_info['latency_ms'].items():
- if isinstance(k, basestring) and k.startswith('>='):
- lat_mks[int(k[2:]) * 1000] += v
- else:
- lat_mks[int(k) * 1000] += v
-
- for k, v in job_info['latency_us'].items():
- lat_mks[int(k)] += v
-
- for k, v in lat_mks.items():
- lat_mks[k] = float(v) / num_res
- return get_lat_perc_50_95(lat_mks)
-
- def disk_perf_info(self, avg_interval=2.0):
-
- if self._pinfo is not None:
- return self._pinfo
-
- testnodes_count = len(self.config.nodes)
-
- pinfo = DiskPerfInfo(self.name,
- self.summary(),
- self.params,
- testnodes_count)
-
- def prepare(data, drop=1):
- if data is None:
- return data
-
- res = []
- for ts_data in data:
- if ts_data.average_interval() < avg_interval:
- ts_data = ts_data.derived(avg_interval)
-
- # drop last value on bounds
- # as they may contains ranges without activities
- assert len(ts_data.values) >= drop + 1, str(drop) + " " + str(ts_data.values)
-
- if drop > 0:
- res.append(ts_data.values[:-drop])
- else:
- res.append(ts_data.values)
-
- return res
-
- def agg_data(matr):
- arr = sum(matr, [])
- min_len = min(map(len, arr))
- res = []
- for idx in range(min_len):
- res.append(sum(dt[idx] for dt in arr))
- return res
-
- pinfo.raw_lat = map(prepare, self.lat.per_vm())
- num_th = sum(map(len, pinfo.raw_lat))
- lat_avg = [val / num_th for val in agg_data(pinfo.raw_lat)]
- pinfo.lat_avg = data_property(lat_avg).average / 1000 # us to ms
-
- pinfo.lat_50, pinfo.lat_95 = self.get_lat_perc_50_95_multy()
- pinfo.lat = pinfo.lat_50
-
- pinfo.raw_bw = map(prepare, self.bw.per_vm())
- pinfo.raw_iops = map(prepare, self.iops.per_vm())
-
- if self.iops_sys is not None:
- pinfo.raw_iops_sys = map(prepare, self.iops_sys.per_vm())
- pinfo.iops_sys = data_property(agg_data(pinfo.raw_iops_sys))
- else:
- pinfo.raw_iops_sys = None
- pinfo.iops_sys = None
-
- fparams = self.get_params_from_fio_report()
- fio_report_bw = sum(fparams['flt_bw'])
- fio_report_iops = sum(fparams['flt_iops'])
-
- agg_bw = agg_data(pinfo.raw_bw)
- agg_iops = agg_data(pinfo.raw_iops)
-
- log_bw_avg = average(agg_bw)
- log_iops_avg = average(agg_iops)
-
- # update values to match average from fio report
- coef_iops = fio_report_iops / float(log_iops_avg)
- coef_bw = fio_report_bw / float(log_bw_avg)
-
- bw_log = data_property([val * coef_bw for val in agg_bw])
- iops_log = data_property([val * coef_iops for val in agg_iops])
-
- bw_report = data_property([fio_report_bw])
- iops_report = data_property([fio_report_iops])
-
- # When IOPS/BW per thread is too low
- # data from logs is rounded to match
- iops_per_th = sum(sum(pinfo.raw_iops, []), [])
- if average(iops_per_th) > 10:
- pinfo.iops = iops_log
- pinfo.iops2 = iops_report
- else:
- pinfo.iops = iops_report
- pinfo.iops2 = iops_log
-
- bw_per_th = sum(sum(pinfo.raw_bw, []), [])
- if average(bw_per_th) > 10:
- pinfo.bw = bw_log
- pinfo.bw2 = bw_report
- else:
- pinfo.bw = bw_report
- pinfo.bw2 = bw_log
-
- self._pinfo = pinfo
-
- return pinfo
-
-
-class IOPerfTest(PerfTest):
- tcp_conn_timeout = 30
- max_pig_timeout = 5
+class IOPerfTest(ThreadedTest):
soft_runcycle = 5 * 60
retry_time = 30
+ configs_dir = os.path.dirname(__file__) # type: str
- zero_md5_hash = hashlib.md5()
- zero_md5_hash.update(b"\x00" * 1024)
- zero_md5 = zero_md5_hash.hexdigest()
-
- def __init__(self, config):
- PerfTest.__init__(self, config)
+ def __init__(self, *args, **kwargs) -> None:
+ super().__init__(*args, **kwargs)
get = self.config.params.get
- do_get = self.config.params.__getitem__
- self.config_fname = do_get('cfg')
+ self.load_profile_name = self.config.params['load'] # type: str
+ self.name = "io." + self.load_profile_name
- if '/' not in self.config_fname and '.' not in self.config_fname:
- cfgs_dir = os.path.dirname(__file__)
- self.config_fname = os.path.join(cfgs_dir,
- self.config_fname + '.cfg')
-
- self.alive_check_interval = get('alive_check_interval')
- self.use_system_fio = get('use_system_fio', False)
-
- if get('prefill_files') is not None:
- logger.warning("prefill_files option is depricated. Use force_prefill instead")
-
- self.force_prefill = get('force_prefill', False)
- self.config_params = get('params', {}).copy()
-
- self.io_py_remote = self.join_remote("agent.py")
- self.results_file = self.join_remote("results.json")
- self.pid_file = self.join_remote("pid")
- self.task_file = self.join_remote("task.cfg")
- self.sh_file = self.join_remote("cmd.sh")
- self.err_out_file = self.join_remote("fio_err_out")
- self.io_log_file = self.join_remote("io_log.txt")
- self.exit_code_file = self.join_remote("exit_code")
-
- self.max_latency = get("max_lat", None)
- self.min_bw_per_thread = get("min_bw", None)
-
- self.use_sudo = get("use_sudo", True)
-
- self.raw_cfg = open(self.config_fname).read()
- self.fio_configs = None
-
- @classmethod
- def load(cls, suite_name: str, folder: str) -> IOTestResults:
- res = []
- for fname in os.listdir(folder):
- if re.match("\d+_params.yaml$", fname):
- num = int(fname.split('_')[0])
- res.append(load_test_results(folder, num))
- return IOTestResults(suite_name, res, folder)
-
- def cleanup(self):
- # delete_file(conn, self.io_py_remote)
- # Need to remove tempo files, used for testing
- pass
-
- # size is megabytes
- def check_prefill_required(self, node: INode, fname: str, size: int, num_blocks: Optional[int]=16) -> bool:
- try:
- fstats = node.stat_file(fname)
- if stat.S_ISREG(fstats.st_mode) and fstats.st_size < size * 1024 ** 2:
- return True
- except EnvironmentError:
- return True
-
- cmd = 'python -c "' + \
- "import sys;" + \
- "fd = open('{0}', 'rb');" + \
- "fd.seek({1});" + \
- "data = fd.read(1024); " + \
- "sys.stdout.write(data + ' ' * ( 1024 - len(data)))\" | md5sum"
-
- if self.use_sudo:
- cmd = "sudo " + cmd
-
- bsize = size * (1024 ** 2)
- offsets = [random.randrange(bsize - 1024) for _ in range(num_blocks)]
- offsets.append(bsize - 1024)
- offsets.append(0)
-
- for offset in offsets:
- data = node.run(cmd.format(fname, offset), nolog=True)
-
- md = ""
- for line in data.split("\n"):
- if "unable to resolve" not in line:
- md = line.split()[0].strip()
- break
-
- if len(md) != 32:
- logger.error("File data check is failed - " + data)
- return True
-
- if self.zero_md5 == md:
- return True
-
- return False
-
- def prefill_test_files(self, node: INode, files: List[str], force:bool=False) -> None:
- if self.use_system_fio:
- cmd_templ = "fio "
+ if os.path.isfile(self.load_profile_name):
+ self.load_profile_path = os.path.join(self.configs_dir, self.load_profile_name+ '.cfg') # type: str
else:
- cmd_templ = "{0}/fio ".format(self.config.remote_dir)
+ self.load_profile_path = self.load_profile_name
- if self.use_sudo:
- cmd_templ = "sudo " + cmd_templ
+ self.load_profile = open(self.load_profile_path, 'rt').read() # type: str
- cmd_templ += "--name=xxx --filename={0} --direct=1" + \
- " --bs=4m --size={1}m --rw=write"
-
- ssize = 0
-
- if force:
- logger.info("File prefilling is forced")
-
- ddtime = 0
- for fname, curr_sz in files.items():
- if not force:
- if not self.check_prefill_required(node, fname, curr_sz):
- logger.debug("prefill is skipped")
- continue
-
- logger.info("Prefilling file {0}".format(fname))
- cmd = cmd_templ.format(fname, curr_sz)
- ssize += curr_sz
-
- stime = time.time()
- node.run(cmd, timeout=curr_sz)
- ddtime += time.time() - stime
-
- if ddtime > 1.0:
- fill_bw = int(ssize / ddtime)
- mess = "Initiall fio fill bw is {0} MiBps for this vm"
- logger.info(mess.format(fill_bw))
-
- def install_utils(self, node: INode) -> None:
- need_install = []
- packs = [('screen', 'screen')]
- os_info = get_os(node)
+ self.use_system_fio = get('use_system_fio', False) # type: bool
if self.use_system_fio:
- packs.append(('fio', 'fio'))
+ self.fio_path = "fio" # type: str
else:
- packs.append(('bzip2', 'bzip2'))
+ self.fio_path = os.path.join(self.config.remote_dir, "fio")
- for bin_name, package in packs:
- if bin_name is None:
- need_install.append(package)
- continue
+ self.force_prefill = get('force_prefill', False) # type: bool
- try:
- node.run('which ' + bin_name, nolog=True)
- except OSError:
- need_install.append(package)
-
- if len(need_install) != 0:
- if 'redhat' == os_info.distro:
- cmd = "sudo yum -y install " + " ".join(need_install)
- else:
- cmd = "sudo apt-get -y install " + " ".join(need_install)
-
- try:
- node.run(cmd)
- except OSError as err:
- raise OSError("Can't install - {}".format(" ".join(need_install))) from err
-
- if not self.use_system_fio:
- fio_dir = os.path.dirname(os.path.dirname(wally.__file__))
- fio_dir = os.path.join(os.getcwd(), fio_dir)
- fio_dir = os.path.join(fio_dir, 'fio_binaries')
- fname = 'fio_{0.release}_{0.arch}.bz2'.format(os_info)
- fio_path = os.path.join(fio_dir, fname)
-
- if not os.path.exists(fio_path):
- raise RuntimeError("No prebuild fio binary available for {0}".format(os_info))
-
- bz_dest = self.join_remote('fio.bz2')
- node.copy_file(fio_path, bz_dest)
- node.run("bzip2 --decompress {}" + bz_dest, nolog=True)
- node.run("chmod a+x " + self.join_remote("fio"), nolog=True)
-
- def pre_run(self) -> None:
- if 'FILESIZE' not in self.config_params:
+ if 'FILESIZE' not in self.config.params:
raise NotImplementedError("File size detection is not implemented")
- self.fio_configs = fio_cfg_compile(self.raw_cfg,
- self.config_fname,
- self.config_params)
- self.fio_configs = list(self.fio_configs)
+ # self.max_latency = get("max_lat") # type: Optional[int]
+ # self.min_bw_per_thread = get("min_bw") # type: Optional[int]
- files = {}
+ self.use_sudo = get("use_sudo", True) # type: bool
+
+ self.fio_configs = list(fio_cfg_compile(self.load_profile,
+ self.load_profile_path,
+ cast(FioParams, self.config.params)))
+
+ if len(self.fio_configs) == 0:
+ logger.exception("Empty fio config provided")
+ raise StopTestError("Empty fio config provided")
+
+ self.iterations_configs = self.fio_configs # type: ignore
+ self.files_sizes = self.get_file_sizes()
+
+ self.exec_folder = self.config.remote_dir
+ self.fio_path = "" if self.use_system_fio else self.exec_folder
+
+ def get_file_sizes(self) -> Dict[str, int]:
+ files_sizes = {} # type: Dict[str, int]
+
for section in self.fio_configs:
sz = ssize2b(section.vals['size'])
- msz = sz / (1024 ** 2)
-
- if sz % (1024 ** 2) != 0:
- msz += 1
-
- fname = section.vals['filename']
+ msz = sz // (1024 ** 2) + (1 if sz % (1024 ** 2) != 0 else 0)
+ fname = section.vals['filename'] # type: str
# if already has other test with the same file name
# take largest size
- files[fname] = max(files.get(fname, 0), msz)
+ files_sizes[fname] = max(files_sizes.get(fname, 0), msz)
- with ThreadPoolExecutor(len(self.config.nodes)) as pool:
- fc = functools.partial(self.pre_run_th,
- files=files,
- force=self.force_prefill)
- list(pool.map(fc, self.config.nodes))
+ return files_sizes
- def pre_run_th(self, node: INode, files: List[str], force_prefil: Optional[bool]=False) -> None:
+ def config_node(self, node: IRPCNode) -> None:
try:
- cmd = 'mkdir -p "{0}"'.format(self.config.remote_dir)
- if self.use_sudo:
- cmd = "sudo " + cmd
- cmd += " ; sudo chown {0} {1}".format(node.get_user(),
- self.config.remote_dir)
- node.run(cmd, nolog=True)
-
- assert self.config.remote_dir != "" and self.config.remote_dir != "/"
- node.run("rm -rf {}/*".format(self.config.remote_dir), nolog=True)
-
+ node.conn.rmdir(self.config.remote_dir, recursive=True, ignore_missing=True)
+ node.conn.mkdir(self.config.remote_dir)
except Exception as exc:
- msg = "Failed to create folder {} on remote {}."
- msg = msg.format(self.config.remote_dir, node, exc)
+ msg = "Failed to create folder {} on remote {}.".format(self.config.remote_dir, node, exc)
logger.exception(msg)
raise StopTestError(msg) from exc
self.install_utils(node)
- self.prefill_test_files(node, files, force_prefil)
+ logger.info("Prefilling test files with random data")
+ fill_bw = node.conn.prefill_test_files(self.files_sizes, force=self.force_prefill, fio_path=self.fio_path)
+ if fill_bw is not None:
+ logger.info("Initial fio fill bw is {} MiBps for {}".format(fill_bw, node.info.node_id()))
- def show_expected_execution_time(self) -> None:
- if len(self.fio_configs) > 1:
- # +10% - is a rough estimation for additional operations
- # like sftp, etc
- exec_time = int(sum(map(execution_time, self.fio_configs)) * 1.1)
- exec_time_s = sec_to_str(exec_time)
- now_dt = datetime.datetime.now()
- end_dt = now_dt + datetime.timedelta(0, exec_time)
- msg = "Entire test should takes aroud: {0} and finished at {1}"
- logger.info(msg.format(exec_time_s,
- end_dt.strftime("%H:%M:%S")))
-
- def run(self) -> IOTestResults:
- logger.debug("Run preparation")
- self.pre_run()
- self.show_expected_execution_time()
- num_nodes = len(self.config.nodes)
-
- tname = os.path.basename(self.config_fname)
- if tname.endswith('.cfg'):
- tname = tname[:-4]
-
- barrier = Barrier(num_nodes)
- results = []
-
- # set of Operation_Mode_BlockSize str's
- # which should not be tested anymore, as
- # they already too slow with previous thread count
- lat_bw_limit_reached = set()
-
- with ThreadPoolExecutor(num_nodes) as pool:
- for pos, fio_cfg in enumerate(self.fio_configs):
- test_descr = get_test_summary(fio_cfg.vals, noqd=True)
- if test_descr in lat_bw_limit_reached:
- continue
-
- logger.info("Will run {} test".format(fio_cfg.name))
- templ = "Test should takes about {}. Should finish at {}, will wait at most till {}"
- exec_time = execution_time(fio_cfg)
- exec_time_str = sec_to_str(exec_time)
- timeout = int(exec_time + max(300, exec_time))
-
- now_dt = datetime.datetime.now()
- end_dt = now_dt + datetime.timedelta(0, exec_time)
- wait_till = now_dt + datetime.timedelta(0, timeout)
-
- logger.info(templ.format(exec_time_str,
- end_dt.strftime("%H:%M:%S"),
- wait_till.strftime("%H:%M:%S")))
-
- run_test_func = functools.partial(self.do_run,
- barrier=barrier,
- fio_cfg=fio_cfg,
- pos=pos)
-
- max_retr = 3
- for idx in range(max_retr):
- if 0 != idx:
- logger.info("Sleeping %ss and retrying", self.retry_time)
- time.sleep(self.retry_time)
-
- try:
- intervals = list(pool.map(run_test_func, self.config.nodes))
- if None not in intervals:
- break
- except (EnvironmentError, SSHException) as exc:
- if max_retr - 1 == idx:
- raise StopTestError("Fio failed") from exc
- logger.exception("During fio run")
-
- fname = "{}_task.fio".format(pos)
- with open(os.path.join(self.config.log_directory, fname), "w") as fd:
- fd.write(str(fio_cfg))
-
- params = {'vm_count': num_nodes}
- params['name'] = fio_cfg.name
- params['vals'] = dict(fio_cfg.vals.items())
- params['intervals'] = intervals
- params['nodes'] = [node.get_conn_id() for node in self.config.nodes]
-
- fname = "{}_params.yaml".format(pos)
- with open(os.path.join(self.config.log_directory, fname), "w") as fd:
- fd.write(dumps(params))
-
- res = load_test_results(self.config.log_directory, pos)
- results.append(res)
-
- if self.max_latency is not None:
- lat_50, _ = res.get_lat_perc_50_95_multy()
-
- # conver us to ms
- if self.max_latency < lat_50:
- logger.info(("Will skip all subsequent tests of {} " +
- "due to lat/bw limits").format(fio_cfg.name))
- lat_bw_limit_reached.add(test_descr)
-
- test_res = res.get_params_from_fio_report()
- if self.min_bw_per_thread is not None:
- if self.min_bw_per_thread > average(test_res['bw']):
- lat_bw_limit_reached.add(test_descr)
-
- return IOTestResults(self.config.params['cfg'],
- results, self.config.log_directory)
-
- def do_run(self, node: INode, barrier: Barrier, fio_cfg, pos: int, nolog: bool=False):
- exec_folder = self.config.remote_dir
-
+ def install_utils(self, node: IRPCNode) -> None:
if self.use_system_fio:
- fio_path = ""
- else:
- if not exec_folder.endswith("/"):
- fio_path = exec_folder + "/"
- else:
- fio_path = exec_folder
+ node.conn.install('fio', binary='fio')
- exec_time = execution_time(fio_cfg)
- barrier.wait()
- run_data = node.rpc.fio.run_fio(self.use_sudo,
- fio_path,
- exec_folder,
- str(fio_cfg),
+ if not self.use_system_fio:
+ os_info = get_os(node)
+ fio_dir = os.path.dirname(os.path.dirname(wally.__file__)) # type: str
+ fio_dir = os.path.join(os.getcwd(), fio_dir)
+ fio_dir = os.path.join(fio_dir, 'fio_binaries')
+ fname = 'fio_{0.release}_{0.arch}.bz2'.format(os_info)
+ fio_path = os.path.join(fio_dir, fname) # type: str
+
+ if not os.path.exists(fio_path):
+ raise RuntimeError("No prebuild fio binary available for {0}".format(os_info))
+
+ bz_dest = self.join_remote('fio.bz2') # type: str
+ node.copy_file(fio_path, bz_dest)
+ node.run("bzip2 --decompress {}" + bz_dest)
+ node.run("chmod a+x " + self.join_remote("fio"))
+
+ def get_expected_runtime(self, iteration_info: IterationConfig) -> int:
+ return execution_time(cast(FioJobSection, iteration_info))
+
+ def do_test(self, node: IRPCNode, iter_config: IterationConfig) -> RunTestRes:
+ exec_time = execution_time(cast(FioJobSection, iter_config))
+ raw_res = node.conn.fio.run_fio(self.fio_path,
+ self.exec_folder,
+ str(cast(FioJobSection, iter_config)),
exec_time + max(300, exec_time))
- return parse_fio_result(run_data)
+ # TODO(koder): fix next error
+ raise NotImplementedError("Need to extract time from test result")
+ return raw_res, (0, 0)
- @classmethod
- def prepare_data(cls, results) -> List[Dict[str, Any]]:
- """create a table with io performance report for console"""
-
- def key_func(data: FioRunResult) -> Tuple[str, str, str, str, int]:
- tpl = data.summary_tpl()
- return (data.name,
- tpl.oper,
- tpl.mode,
- ssize2b(tpl.bsize),
- int(tpl.th_count) * int(tpl.vm_count))
- res = []
-
- for item in sorted(results, key=key_func):
- test_dinfo = item.disk_perf_info()
- testnodes_count = len(item.config.nodes)
-
- iops, _ = test_dinfo.iops.rounded_average_conf()
-
- if test_dinfo.iops_sys is not None:
- iops_sys, iops_sys_conf = test_dinfo.iops_sys.rounded_average_conf()
- _, iops_sys_dev = test_dinfo.iops_sys.rounded_average_dev()
- iops_sys_per_vm = round_3_digit(iops_sys / testnodes_count)
- iops_sys = round_3_digit(iops_sys)
- else:
- iops_sys = None
- iops_sys_per_vm = None
- iops_sys_dev = None
- iops_sys_conf = None
-
- bw, bw_conf = test_dinfo.bw.rounded_average_conf()
- _, bw_dev = test_dinfo.bw.rounded_average_dev()
- conf_perc = int(round(bw_conf * 100 / bw))
- dev_perc = int(round(bw_dev * 100 / bw))
-
- lat_50 = round_3_digit(int(test_dinfo.lat_50))
- lat_95 = round_3_digit(int(test_dinfo.lat_95))
- lat_avg = round_3_digit(int(test_dinfo.lat_avg))
-
- iops_per_vm = round_3_digit(iops / testnodes_count)
- bw_per_vm = round_3_digit(bw / testnodes_count)
-
- iops = round_3_digit(iops)
- bw = round_3_digit(bw)
-
- summ = "{0.oper}{0.mode} {0.bsize:>4} {0.th_count:>3}th {0.vm_count:>2}vm".format(item.summary_tpl())
-
- res.append({"name": key_func(item)[0],
- "key": key_func(item)[:4],
- "summ": summ,
- "iops": int(iops),
- "bw": int(bw),
- "conf": str(conf_perc),
- "dev": str(dev_perc),
- "iops_per_vm": int(iops_per_vm),
- "bw_per_vm": int(bw_per_vm),
- "lat_50": lat_50,
- "lat_95": lat_95,
- "lat_avg": lat_avg,
-
- "iops_sys": iops_sys,
- "iops_sys_per_vm": iops_sys_per_vm,
- "sys_conf": iops_sys_conf,
- "sys_dev": iops_sys_dev})
-
- return res
-
- Field = collections.namedtuple("Field", ("header", "attr", "allign", "size"))
- fiels_and_header = [
- Field("Name", "name", "l", 7),
- Field("Description", "summ", "l", 19),
- Field("IOPS\ncum", "iops", "r", 3),
- # Field("IOPS_sys\ncum", "iops_sys", "r", 3),
- Field("KiBps\ncum", "bw", "r", 6),
- Field("Cnf %\n95%", "conf", "r", 3),
- Field("Dev%", "dev", "r", 3),
- Field("iops\n/vm", "iops_per_vm", "r", 3),
- Field("KiBps\n/vm", "bw_per_vm", "r", 6),
- Field("lat ms\nmedian", "lat_50", "r", 3),
- Field("lat ms\n95%", "lat_95", "r", 3),
- Field("lat\navg", "lat_avg", "r", 3),
- ]
-
- fiels_and_header_dct = dict((item.attr, item) for item in fiels_and_header)
-
- @classmethod
- def format_for_console(cls, results) -> str:
- """create a table with io performance report for console"""
-
- tab = texttable.Texttable(max_width=120)
- tab.set_deco(tab.HEADER | tab.VLINES | tab.BORDER)
- tab.set_cols_align([f.allign for f in cls.fiels_and_header])
- sep = ["-" * f.size for f in cls.fiels_and_header]
- tab.header([f.header for f in cls.fiels_and_header])
- prev_k = None
- for item in cls.prepare_data(results):
- if prev_k is not None:
- if prev_k != item["key"]:
- tab.add_row(sep)
-
- prev_k = item["key"]
- tab.add_row([item[f.attr] for f in cls.fiels_and_header])
-
- return tab.draw()
-
- @classmethod
- def format_diff_for_console(cls, list_of_results: List[Any]) -> str:
- """create a table with io performance report for console"""
-
- tab = texttable.Texttable(max_width=200)
- tab.set_deco(tab.HEADER | tab.VLINES | tab.BORDER)
-
- header = [
- cls.fiels_and_header_dct["name"].header,
- cls.fiels_and_header_dct["summ"].header,
- ]
- allign = ["l", "l"]
-
- header.append("IOPS ~ Cnf% ~ Dev%")
- allign.extend(["r"] * len(list_of_results))
- header.extend(
- "IOPS_{0} %".format(i + 2) for i in range(len(list_of_results[1:]))
- )
-
- header.append("BW")
- allign.extend(["r"] * len(list_of_results))
- header.extend(
- "BW_{0} %".format(i + 2) for i in range(len(list_of_results[1:]))
- )
-
- header.append("LAT")
- allign.extend(["r"] * len(list_of_results))
- header.extend(
- "LAT_{0}".format(i + 2) for i in range(len(list_of_results[1:]))
- )
-
- tab.header(header)
- sep = ["-" * 3] * len(header)
- processed_results = map(cls.prepare_data, list_of_results)
-
- key2results = []
- for res in processed_results:
- key2results.append(dict(
- ((item["name"], item["summ"]), item) for item in res
- ))
-
- prev_k = None
- iops_frmt = "{0[iops]} ~ {0[conf]:>2} ~ {0[dev]:>2}"
- for item in processed_results[0]:
- if prev_k is not None:
- if prev_k != item["key"]:
- tab.add_row(sep)
-
- prev_k = item["key"]
-
- key = (item['name'], item['summ'])
- line = list(key)
- base = key2results[0][key]
-
- line.append(iops_frmt.format(base))
-
- for test_results in key2results[1:]:
- val = test_results.get(key)
- if val is None:
- line.append("-")
- elif base['iops'] == 0:
- line.append("Nan")
- else:
- prc_val = {'dev': val['dev'], 'conf': val['conf']}
- prc_val['iops'] = int(100 * val['iops'] / base['iops'])
- line.append(iops_frmt.format(prc_val))
-
- line.append(base['bw'])
-
- for test_results in key2results[1:]:
- val = test_results.get(key)
- if val is None:
- line.append("-")
- elif base['bw'] == 0:
- line.append("Nan")
- else:
- line.append(int(100 * val['bw'] / base['bw']))
-
- for test_results in key2results:
- val = test_results.get(key)
- if val is None:
- line.append("-")
- else:
- line.append("{0[lat_50]} - {0[lat_95]}".format(val))
-
- tab.add_row(line)
-
- tab.set_cols_align(allign)
- return tab.draw()
diff --git a/wally/suits/io/fio_task_parser.py b/wally/suits/io/fio_task_parser.py
index 1b6ba21..1bdbb15 100644
--- a/wally/suits/io/fio_task_parser.py
+++ b/wally/suits/io/fio_task_parser.py
@@ -7,10 +7,11 @@
import os.path
import argparse
import itertools
-from typing import Optional, Iterator, Union, Dict, Iterable, List, TypeVar, Callable, Tuple
-from collections import OrderedDict, namedtuple
+from typing import Optional, Iterator, Union, Dict, Iterable, List, TypeVar, Callable, Tuple, NamedTuple, Any
+from collections import OrderedDict
+from ..itest import IterationConfig
from ...utils import sec_to_str, ssize2b
@@ -19,15 +20,27 @@
INCLUDE = 2
-Var = namedtuple('Var', ('name',))
-CfgLine = namedtuple('CfgLine', ('fname', 'lineno', 'oline',
- 'tp', 'name', 'val'))
+Var = NamedTuple('Var', [('name', str)])
+CfgLine = NamedTuple('CfgLine',
+ [('fname', str),
+ ('lineno', int),
+ ('oline', str),
+ ('tp', int),
+ ('name', str),
+ ('val', Any)])
+
+TestSumm = NamedTuple("TestSumm",
+ [("oper", str),
+ ("mode", str),
+ ("bsize", int),
+ ("iodepth", int),
+ ("vm_count", int)])
-class FioJobSection:
- def __init__(self, name: str):
+class FioJobSection(IterationConfig):
+ def __init__(self, name: str) -> None:
self.name = name
- self.vals = OrderedDict()
+ self.vals = OrderedDict() # type: Dict[str, Any]
def copy(self) -> 'FioJobSection':
return copy.deepcopy(self)
@@ -40,7 +53,7 @@
def is_free(self) -> bool:
return len(list(self.required_vars())) == 0
- def __str__(self):
+ def __str__(self) -> str:
res = "[{0}]\n".format(self.name)
for name, val in self.vals.items():
@@ -55,13 +68,13 @@
class ParseError(ValueError):
- def __init__(self, msg: str, fname: str, lineno: int, line_cont:Optional[str] =""):
+ def __init__(self, msg: str, fname: str, lineno: int, line_cont:Optional[str] = "") -> None:
ValueError.__init__(self, msg)
self.file_name = fname
self.lineno = lineno
self.line_cont = line_cont
- def __str__(self):
+ def __str__(self) -> str:
msg = "In {0}:{1} ({2}) : {3}"
return msg.format(self.file_name,
self.lineno,
@@ -70,10 +83,10 @@
def is_name(name: str) -> bool:
- return re.match("[a-zA-Z_][a-zA-Z_0-9]*", name)
+ return re.match("[a-zA-Z_][a-zA-Z_0-9]*", name) is not None
-def parse_value(val: str) -> Union[int, str, Dict, Var]:
+def parse_value(val: str) -> Union[int, str, float, List, Var]:
try:
return int(val)
except ValueError:
@@ -88,12 +101,13 @@
assert val.endswith("%}")
content = val[2:-2]
vals = list(i.strip() for i in content.split(','))
- return map(parse_value, vals)
+ return list(map(parse_value, vals))
if val.startswith('{'):
assert val.endswith("}")
assert is_name(val[1:-1])
return Var(val[1:-1])
+
return val
@@ -133,15 +147,15 @@
def fio_config_parse(lexer_iter: Iterable[CfgLine]) -> Iterator[FioJobSection]:
in_globals = False
curr_section = None
- glob_vals = OrderedDict()
+ glob_vals = OrderedDict() # type: Dict[str, Any]
sections_count = 0
- lexed_lines = list(lexer_iter)
+ lexed_lines = list(lexer_iter) # type: List[CfgLine]
one_more = True
includes = {}
while one_more:
- new_lines = []
+ new_lines = [] # type: List[CfgLine]
one_more = False
for line in lexed_lines:
fname, lineno, oline, tp, name, val = line
@@ -205,7 +219,7 @@
def process_cycles(sec: FioJobSection) -> Iterator[FioJobSection]:
- cycles = OrderedDict()
+ cycles = OrderedDict() # type: Dict[str, Any]
for name, val in sec.vals.items():
if isinstance(val, list) and name.upper() != name:
@@ -214,11 +228,11 @@
if len(cycles) == 0:
yield sec
else:
- # qd should changes faster
- numjobs = cycles.pop('qd', None)
- items = cycles.items()
+ # iodepth should change faster than the other cycled parameters
+ numjobs = cycles.pop('iodepth', None)
+ items = list(cycles.items())
- if len(items) > 0:
+ if items:
keys, vals = zip(*items)
keys = list(keys)
vals = list(vals)
@@ -228,7 +242,7 @@
if numjobs is not None:
vals.append(numjobs)
- keys.append('qd')
+ keys.append('iodepth')
for combination in itertools.product(*vals):
new_sec = sec.copy()
@@ -236,12 +250,12 @@
yield new_sec
-FIO_PARAM_VAL = Union[str, Var]
-FIO_PARAMS = Dict[str, FIO_PARAM_VAL]
+FioParamsVal = Union[str, Var]
+FioParams = Dict[str, FioParamsVal]
-def apply_params(sec: FioJobSection, params: FIO_PARAMS) -> FioJobSection:
- processed_vals = OrderedDict()
+def apply_params(sec: FioJobSection, params: FioParams) -> FioJobSection:
+ processed_vals = OrderedDict() # type: Dict[str, Any]
processed_vals.update(params)
for name, val in sec.vals.items():
if name in params:
@@ -329,9 +343,6 @@
return 'a'
-TestSumm = namedtuple("TestSumm", ("oper", "mode", "bsize", "iodepth", "vm_count"))
-
-
def get_test_summary_tuple(sec: FioJobSection, vm_count: int = None) -> TestSumm:
if isinstance(sec, dict):
vals = sec
@@ -355,12 +366,12 @@
vm_count)
-def get_test_summary(sec: FioJobSection, vm_count: int = None, noqd: bool = False) -> str:
+def get_test_summary(sec: FioJobSection, vm_count: int = None, noiodepth: bool = False) -> str:
tpl = get_test_summary_tuple(sec, vm_count)
res = "{0.oper}{0.mode}{0.bsize}".format(tpl)
- if not noqd:
- res += "qd{}".format(tpl.qd)
+ if not noiodepth:
+ res += "qd{}".format(tpl.iodepth)
if tpl.vm_count is not None:
res += "vm{}".format(tpl.vm_count)
@@ -387,7 +398,7 @@
yield res
-def fio_cfg_compile(source: str, fname: str, test_params: FIO_PARAMS) -> Iterator[FioJobSection]:
+def fio_cfg_compile(source: str, fname: str, test_params: FioParams) -> Iterator[FioJobSection]:
it = parse_all_in_1(source, fname)
it = (apply_params(sec, test_params) for sec in it)
it = flatmap(process_cycles, it)
diff --git a/wally/suits/io/rpc_plugin.py b/wally/suits/io/rpc_plugin.py
index ca3f0f3..8e2e09f 100644
--- a/wally/suits/io/rpc_plugin.py
+++ b/wally/suits/io/rpc_plugin.py
@@ -1,7 +1,20 @@
+import os
+import time
+import stat
+import random
+import subprocess
+
+
def rpc_run_fio(cfg):
fio_cmd_templ = "cd {exec_folder}; {fio_path}fio --output-format=json " + \
"--output={out_file} --alloc-size=262144 {job_file}"
+ result = {
+ "name": [float],
+ "lat_name": [[float]]
+ }
+
+ return result
# fnames_before = node.run("ls -1 " + exec_folder, nolog=True)
#
# timeout = int(exec_time + max(300, exec_time))
@@ -11,5 +24,66 @@
# fnames_after = node.run("ls -1 " + exec_folder, nolog=True)
#
-def parse_fio_result(data):
- pass
+def rpc_check_file_prefilled(path, used_size_mb):
+ used_size = used_size_mb * 1024 ** 2
+ blocks_to_check = 16
+
+ try:
+ fstats = os.stat(path)
+ if stat.S_ISREG(fstats.st_mode) and fstats.st_size < used_size:
+ return True
+ except EnvironmentError:
+ return True
+
+ offsets = [random.randrange(used_size - 1024) for _ in range(blocks_to_check)]
+ offsets.append(used_size - 1024)
+ offsets.append(0)
+
+ with open(path, 'rb') as fd:
+ for offset in offsets:
+ fd.seek(offset)
+ if b"\x00" * 1024 == fd.read(1024):
+ return True
+
+ return False
+
+
+def rpc_prefill_test_files(files, force=False, fio_path='fio'):
+ cmd_templ = "{0} --name=xxx --filename={1} --direct=1" + \
+ " --bs=4m --size={2}m --rw=write"
+
+ ssize = 0
+ ddtime = 0.0
+
+ for fname, curr_sz in files.items():
+ if not force:
+ if not rpc_check_file_prefilled(fname, curr_sz):
+ continue
+
+ cmd = cmd_templ.format(fio_path, fname, curr_sz)
+ ssize += curr_sz
+
+ stime = time.time()
+ subprocess.check_call(cmd)
+ ddtime += time.time() - stime
+
+ if ddtime > 1.0:
+ return int(ssize / ddtime)
+
+ return None
+
+
+def load_fio_log_file(fname):
+ with open(fname) as fd:
+ it = [ln.split(',')[:2] for ln in fd]
+
+ return [(float(off) / 1000, # convert us to ms
+ float(val.strip()) + 0.5) # add 0.5 to compensate the average value,
+ # as fio trims all values in the log to integers
+ for off, val in it]
+
+
+
+
+
+
diff --git a/wally/suits/io/rpc_plugin.pyi b/wally/suits/io/rpc_plugin.pyi
new file mode 100644
index 0000000..1155007
--- /dev/null
+++ b/wally/suits/io/rpc_plugin.pyi
@@ -0,0 +1,8 @@
+from typing import Any, Optional, Dict, List
+
+def rpc_run_fio(cfg: Dict[str, str]) -> Any: ...
+def rpc_check_file_prefilled(path: str, used_size_mb: int) -> bool: ...
+def rpc_prefill_test_files(files: Dict[str, int], force: bool = False, fio_path: str = 'fio') -> Optional[int]: ...
+
+
+def load_fio_log_file(fname: str) -> List[float]: ...
diff --git a/wally/suits/itest.py b/wally/suits/itest.py
index 7004b8e..6d1eeee 100644
--- a/wally/suits/itest.py
+++ b/wally/suits/itest.py
@@ -2,38 +2,43 @@
import time
import logging
import os.path
-import functools
-from typing import Dict, Any, List, Tuple
+import datetime
+from typing import Dict, Any, List, Optional, Tuple, cast
from concurrent.futures import ThreadPoolExecutor
-from ..utils import Barrier, StopTestError
-from ..statistic import data_property
-from ..inode import INode
+from ..utils import Barrier, StopTestError, sec_to_str
+from ..node_interfaces import IRPCNode
from ..storage import Storage
+from ..result_classes import RawTestResults
+
+import agent
logger = logging.getLogger("wally")
-class TestConfig:
+__doc__ = "Contains base classes for performance tests"
+
+
+class TestInputConfig:
"""
this class describe test input configuration
- test_type:str - test type name
- params:{str:Any} - parameters from yaml file for this test
- test_uuid:str - UUID to be used to create filenames and Co
- log_directory:str - local directory to store results
- nodes:[Node] - node to run tests on
- remote_dir:str - directory on nodes to be used for local files
+ test_type - test type name
+ params - parameters from yaml file for this test
+ test_uuid - UUID to be used to create file names & Co
+ log_directory - local directory to store results
+ nodes - nodes to run tests on
+ remote_dir - directory on nodes to be used for local files
"""
def __init__(self,
test_type: str,
params: Dict[str, Any],
run_uuid: str,
- nodes: List[INode],
+ nodes: List[IRPCNode],
storage: Storage,
- remote_dir: str):
+ remote_dir: str) -> None:
self.test_type = test_type
self.params = params
self.run_uuid = run_uuid
@@ -42,150 +47,21 @@
self.remote_dir = remote_dir
-class TestResults:
- """
- this class describe test results
-
- config:TestConfig - test config object
- params:dict - parameters from yaml file for this test
- results:{str:MeasurementMesh} - test results object
- raw_result:Any - opaque object to store raw results
- run_interval:(float, float) - test tun time, used for sensors
- """
- def __init__(self,
- config: TestConfig,
- results: Dict[str, Any],
- raw_result: Any,
- run_interval: Tuple[float, float]):
- self.config = config
- self.params = config.params
- self.results = results
- self.raw_result = raw_result
- self.run_interval = run_interval
-
- def __str__(self):
- res = "{0}({1}):\n results:\n".format(
- self.__class__.__name__,
- self.summary())
-
- for name, val in self.results.items():
- res += " {0}={1}\n".format(name, val)
-
- res += " params:\n"
-
- for name, val in self.params.items():
- res += " {0}={1}\n".format(name, val)
-
- return res
-
- @abc.abstractmethod
- def summary(self):
- pass
-
- @abc.abstractmethod
- def get_yamable(self):
- pass
-
-
-# class MeasurementMatrix:
-# """
-# data:[[MeasurementResult]] - VM_COUNT x TH_COUNT matrix of MeasurementResult
-# """
-# def __init__(self, data, connections_ids):
-# self.data = data
-# self.connections_ids = connections_ids
-#
-# def per_vm(self):
-# return self.data
-#
-# def per_th(self):
-# return sum(self.data, [])
-
-
-class MeasurementResults:
- def stat(self):
- return data_property(self.data)
-
- def __str__(self):
- return 'TS([' + ", ".join(map(str, self.data)) + '])'
-
-
-class SimpleVals(MeasurementResults):
- """
- data:[float] - list of values
- """
- def __init__(self, data):
- self.data = data
-
-
-class TimeSeriesValue(MeasurementResults):
- """
- data:[(float, float, float)] - list of (start_time, lenght, average_value_for_interval)
- odata: original values
- """
- def __init__(self, data: List[Tuple[float, float, float]]):
- assert len(data) > 0
- self.odata = data[:]
- self.data = []
-
- cstart = 0
- for nstart, nval in data:
- self.data.append((cstart, nstart - cstart, nval))
- cstart = nstart
-
- @property
- def values(self) -> List[float]:
- return [val[2] for val in self.data]
-
- def average_interval(self) -> float:
- return float(sum([val[1] for val in self.data])) / len(self.data)
-
- def skip(self, seconds) -> 'TimeSeriesValue':
- nres = []
- for start, ln, val in self.data:
- nstart = start + ln - seconds
- if nstart > 0:
- nres.append([nstart, val])
- return self.__class__(nres)
-
- def derived(self, tdelta) -> 'TimeSeriesValue':
- end = self.data[-1][0] + self.data[-1][1]
- tdelta = float(tdelta)
-
- ln = end / tdelta
-
- if ln - int(ln) > 0:
- ln += 1
-
- res = [[tdelta * i, 0.0] for i in range(int(ln))]
-
- for start, lenght, val in self.data:
- start_idx = int(start / tdelta)
- end_idx = int((start + lenght) / tdelta)
-
- for idx in range(start_idx, end_idx + 1):
- rstart = tdelta * idx
- rend = tdelta * (idx + 1)
-
- intersection_ln = min(rend, start + lenght) - max(start, rstart)
- if intersection_ln > 0:
- try:
- res[idx][1] += val * intersection_ln / tdelta
- except IndexError:
- raise
-
- return self.__class__(res)
+class IterationConfig:
+ name = None # type: str
class PerfTest:
- """
- Very base class for tests
- config:TestConfig - test configuration
- stop_requested:bool - stop for test requested
- """
- def __init__(self, config):
+ """Base class for all tests"""
+ name = None # type: str
+ max_retry = 3
+ retry_time = 30
+
+ def __init__(self, config: TestInputConfig) -> None:
self.config = config
self.stop_requested = False
+ self.nodes = self.config.nodes # type: List[IRPCNode]
+ self.sorted_nodes_ids = sorted(node.info.node_id() for node in self.nodes)
def request_stop(self) -> None:
self.stop_requested = True
@@ -193,13 +69,8 @@
def join_remote(self, path: str) -> str:
return os.path.join(self.config.remote_dir, path)
- @classmethod
@abc.abstractmethod
- def load(cls, path: str):
- pass
-
- @abc.abstractmethod
- def run(self):
+ def run(self, storage: Storage) -> None:
pass
@abc.abstractmethod
@@ -207,69 +78,182 @@
pass
-class ThreadedTest(PerfTest):
- """
- Base class for tests, which spawn separated thread for each node
- """
+RunTestRes = Tuple[RawTestResults, Tuple[int, int]]
- def run(self) -> List[TestResults]:
- barrier = Barrier(len(self.config.nodes))
- th_test_func = functools.partial(self.th_test_func, barrier)
- with ThreadPoolExecutor(len(self.config.nodes)) as pool:
- return list(pool.map(th_test_func, self.config.nodes))
+class ThreadedTest(PerfTest, metaclass=abc.ABCMeta):
+ """Base class for tests, which spawn separated thread for each node"""
+
+ # max allowed time difference between starts and stops of run of the same test on different test nodes
+ # used_max_diff = max((min_run_time * max_rel_time_diff), max_time_diff)
+ max_time_diff = 5
+ max_rel_time_diff = 0.05
+
+ def __init__(self, config: TestInputConfig) -> None:
+ PerfTest.__init__(self, config)
+ self.iterations_configs = [None] # type: List[Optional[IterationConfig]]
@abc.abstractmethod
- def do_test(self, node: INode) -> TestResults:
+ def get_expected_runtime(self, iter_cfg: IterationConfig) -> Optional[int]:
pass
- def th_test_func(self, barrier: Barrier, node: INode) -> TestResults:
- test_name = self.__class__.__name__
- logger.debug("Starting {} test on {}".format(test_name , node))
- logger.debug("Run test preparation on {}".format(node))
- self.pre_run(node)
+ def get_not_done_stages(self, storage: Storage) -> Dict[int, IterationConfig]:
+ start_run_id = max(int(name) for _, name in storage.list('result')) + 1
+ not_in_storage = {} # type: Dict[int, IterationConfig]
+ for run_id, iteration_config in enumerate(self.iterations_configs, start_run_id):
+ info_path = "result/{}/info".format(run_id)
+ if info_path in storage:
+ info = cast(Dict[str, Any], storage[info_path]) # type: Dict[str, Any]
- # wait till all thread became ready
- barrier.wait()
+ assert isinstance(info, dict), \
+ "Broken storage at path {}. Expect test info dict, obtain {!r}".format(info_path, info)
- logger.debug("Run test on {}".format(node))
- try:
- return self.do_test(node)
- except Exception as exc:
- msg = "In test {} for {}".format(test_name, node)
- logger.exception(msg)
- raise StopTestError(msg) from exc
+ info = info.copy()
+ del info['begin_time']
+ del info['end_time']
- def pre_run(self, node: INode) -> None:
+ iter_name = "Unnamed" if iteration_config is None else iteration_config.name
+ expected_config = {
+ 'name': self.name,
+ 'iteration_name': iter_name,
+ 'iteration_config': iteration_config,
+ 'params': self.config.params,
+ 'nodes': self.sorted_nodes_ids
+ }
+
+ assert info == expected_config, \
+ "Test info at path {} is not equal to expected config." + \
+ "Maybe configuration was changed before test was restarted. " + \
+ "Current cfg is {!r}, expected cfg is {!r}".format(info_path, info, expected_config)
+
+ logger.info("Test iteration {} found in storage and will be skipped".format(iter_name))
+ else:
+ not_in_storage[run_id] = iteration_config
+ return not_in_storage
+
+ def run(self, storage: Storage) -> None:
+ not_in_storage = self.get_not_done_stages(storage)
+
+ if not not_in_storage:
+ logger.info("All test iteration in storage already. Skip test")
+ return
+
+ logger.debug("Run test {} on nodes {}.".format(self.name, ",".join(self.sorted_nodes_ids)))
+
+ barrier = Barrier(len(self.nodes))
+
+ logger.debug("Run preparation")
+
+ with ThreadPoolExecutor(len(self.nodes)) as pool:
+ list(pool.map(self.config_node, self.nodes))
+
+ # +5% is a rough estimate of the time taken by additional operations
+ run_times = [self.get_expected_runtime(iteration_config) for iteration_config in not_in_storage.values()]
+ if None not in run_times:
+ expected_run_time = int(sum(run_times) * 1.05)
+ exec_time_s = sec_to_str(expected_run_time)
+ now_dt = datetime.datetime.now()
+ end_dt = now_dt + datetime.timedelta(0, expected_run_time)
+ logger.info("Entire test should takes aroud: {} and finished at {:%H:%M:%S}"
+ .format(exec_time_s, end_dt))
+
+ for run_id, iteration_config in sorted(not_in_storage.items()):
+ iter_name = "Unnamed" if iteration_config is None else iteration_config.name
+ logger.info("Run test iteration {} ".format(iter_name))
+
+ results = [] # type: List[RunTestRes]
+ for idx in range(self.max_retry):
+ barrier.wait()
+ try:
+ futures = [pool.submit(self.do_test, node, iteration_config) for node in self.nodes]
+ results = [fut.result() for fut in futures]
+ except (EnvironmentError, agent.RPCError) as exc:
+ if self.max_retry - 1 == idx:
+ raise StopTestError("Fio failed") from exc
+ logger.exception("During fio run")
+ else:
+ if all(results):
+ break
+
+ logger.info("Sleeping %ss and retrying", self.retry_time)
+ time.sleep(self.retry_time)
+
+ start_times = [] # type: List[int]
+ stop_times = [] # type: List[int]
+
+ for (result, (t_start, t_stop)), node in zip(results, self.config.nodes):
+ for metrics_name, data in result.items():
+ path = "result/{}/measurement/{}/{}".format(run_id, node.info.node_id(), metrics_name)
+ storage[path] = data # type: ignore
+ start_times.append(t_start)
+ stop_times.append(t_stop)
+
+ min_start_time = min(start_times)
+ max_start_time = max(start_times)
+ min_stop_time = min(stop_times)
+ max_stop_time = max(stop_times)
+
+ max_allowed_time_diff = int((min_stop_time - max_start_time) * self.max_rel_time_diff)
+ max_allowed_time_diff = max(max_allowed_time_diff, self.max_time_diff)
+
+ if min_start_time + self.max_time_diff < max_allowed_time_diff:
+ logger.warning("Too large difference in {}:{} start time - {}. Max recommended difference is {}"
+ .format(self.name, iter_name, max_start_time - min_start_time, self.max_time_diff))
+
+ if min_stop_time + self.max_time_diff < max_allowed_time_diff:
+ logger.warning("Too large difference in {}:{} stop time - {}. Max recommended difference is {}"
+ .format(self.name, iter_name, max_start_time - min_start_time, self.max_time_diff))
+
+ test_config = {
+ 'name': self.name,
+ 'iteration_name': iter_name,
+ 'iteration_config': iteration_config,
+ 'params': self.config.params,
+ 'nodes': self.sorted_nodes_ids,
+ 'begin_time': min_start_time,
+ 'end_time': max_stop_time
+ }
+
+ storage["result/{}/info".format(run_id)] = test_config # type: ignore
+
+ @abc.abstractmethod
+ def config_node(self, node: IRPCNode) -> None:
+ pass
+
+ @abc.abstractmethod
+ def do_test(self, node: IRPCNode, iter_config: IterationConfig) -> RunTestRes:
pass
-class TwoScriptTest(ThreadedTest):
- def __init__(self, *dt, **mp):
+class TwoScriptTest(ThreadedTest, metaclass=abc.ABCMeta):
+ def __init__(self, *dt, **mp) -> None:
ThreadedTest.__init__(self, *dt, **mp)
- self.remote_dir = '/tmp'
self.prerun_script = self.config.params['prerun_script']
self.run_script = self.config.params['run_script']
-
self.prerun_tout = self.config.params.get('prerun_tout', 3600)
self.run_tout = self.config.params.get('run_tout', 3600)
+ self.iterations_configs = [None]
- def get_remote_for_script(self, script: str) -> str:
- return os.path.join(self.remote_dir, os.path.basename(script))
+ def get_expected_runtime(self, iter_cfg: IterationConfig) -> Optional[int]:
+ return None
- def pre_run(self, node: INode) -> None:
- copy_paths(node.connection,
- {self.run_script: self.get_remote_for_script(self.run_script),
- self.prerun_script: self.get_remote_for_script(self.prerun_script)})
+ def config_node(self, node: IRPCNode) -> None:
+ node.copy_file(self.run_script, self.join_remote(self.run_script))
+ node.copy_file(self.prerun_script, self.join_remote(self.prerun_script))
- cmd = self.get_remote_for_script(self.prerun_script)
+ cmd = self.join_remote(self.prerun_script)
cmd += ' ' + self.config.params.get('prerun_opts', '')
node.run(cmd, timeout=self.prerun_tout)
- def do_test(self, node: INode) -> TestResults:
- cmd = self.get_remote_for_script(self.run_script)
+ def do_test(self, node: IRPCNode, iter_config: IterationConfig) -> RunTestRes:
+ cmd = self.join_remote(self.run_script)
cmd += ' ' + self.config.params.get('run_opts', '')
t1 = time.time()
- res = node.run(cmd, timeout=self.run_tout)
+ res = self.parse_results(node.run(cmd, timeout=self.run_tout))
t2 = time.time()
- return TestResults(self.config, None, res, (t1, t2))
+ return res, (int(t1), int(t2))
+
+ @abc.abstractmethod
+ def parse_results(self, data: str) -> RawTestResults:
+ pass
+
diff --git a/wally/test_run_class.py b/wally/test_run_class.py
index ecdfa4f..db41a46 100644
--- a/wally/test_run_class.py
+++ b/wally/test_run_class.py
@@ -1,4 +1,4 @@
-from typing import List, Callable, Any, Dict, Optional
+from typing import List, Callable, Any, Dict, Optional, Set
from concurrent.futures import ThreadPoolExecutor
@@ -7,11 +7,12 @@
from .start_vms import OSCreds, OSConnection
from .storage import Storage
from .config import Config
+from .fuel_rest_api import Connection
class TestRun:
"""Test run information"""
- def __init__(self, config: Config, storage: Storage):
+ def __init__(self, config: Config, storage: Storage) -> None:
# NodesInfo list
self.nodes_info = [] # type: List[NodeInfo]
@@ -20,16 +21,18 @@
self.build_meta = {} # type: Dict[str,Any]
self.clear_calls_stack = [] # type: List[Callable[['TestRun'], None]]
- self.sensors_mon_q = None
# openstack credentials
self.fuel_openstack_creds = None # type: Optional[OSCreds]
self.os_creds = None # type: Optional[OSCreds]
self.os_connection = None # type: Optional[OSConnection]
+ self.fuel_conn = None # type: Optional[Connection]
+ self.rpc_code = None # type: bytes
self.storage = storage
self.config = config
self.sensors_data = SensorDatastore()
+ self.sensors_run_on = set() # type: Set[str]
def get_pool(self):
return ThreadPoolExecutor(self.config.get('worker_pool_sz', 32))