Remove Fuel support, fix many bugs, add sudo support for some commands, add a default SSH user
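
The sudo flag and the default SSH user/key are read from the ssh_opts config
section (see the setup_rpc()/ConnCreds calls in wally/ceph.py below). A minimal
sketch of how that section might look; the key names come from the .get() calls
in this change, but the layout and values shown here are assumptions, not a
documented schema:

    # hypothetical ssh_opts section in the run config
    ssh_opts:
      user: root            # default ssh user for discovered ceph-osd / ceph-mon nodes
      key: ~/.ssh/id_rsa    # key file passed to ConnCreds for those nodes
      sudo: false           # passed to setup_rpc(); presumably runs remote commands via sudo
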
diff --git a/Docs/v2_plans.md b/Docs/v2_plans.md
index 3a49f6d..b53e881 100644
--- a/Docs/v2_plans.md
+++ b/Docs/v2_plans.md
@@ -1,5 +1,12 @@
TODO today:
-----------
+
+* For Ceph, show load per pool and per PG
+
+
+
+
+
* QDIOTimeHeatmap: getting the device by role (aka ceph-storage) is broken
* ceph-osd nodes should also be verified as storage nodes (in device filtering)
* a default key in the config for all new nodes
diff --git a/README.md b/README.md
index 2166323..f4cbc00 100644
--- a/README.md
+++ b/README.md
@@ -3,3 +3,11 @@
Look into configs-examples for example config files.
Copy an example into the same folder and replace ${VAR} with the appropriate values
+
+
+
+| Device   | Size, TiB | $/TiB | IOPS WR | IOPS RD | BW WR, MB/s | BW RD, MB/s | Lat, ms |
+|:--------:|----------:|:-----:|:-------:|:-------:|:-------:|:-------:|:------:|
+| SATA HDD | 10 | 25-50 | 50-150 | 50-150 | 100-200 | 100-200 | 3-7 |
+| SSD | 2 | 200 | 100-5k | 1k-20k | 50-400 | 200-500 | 0.1-1 |
+| NVME | 2 | 400 | 400-20k | 2k-50k | 200-1.5k| 500-2k | 0.01-1 |
\ No newline at end of file
diff --git a/configs-examples/default.yaml b/configs-examples/default.yaml
index cc78e31..5d0f25c 100644
--- a/configs-examples/default.yaml
+++ b/configs-examples/default.yaml
@@ -1,10 +1,4 @@
# ------------------------------------ CONFIGS -------------------------------------------------------------------
-#fuel:
-# url: http://172.16.44.13:8000/
-# creds: admin:admin@admin
-# ssh_creds: root:r00tme
-# openstack_env: test
-#
#openstack:
# skip_preparation: false
# openrc: /home/koder/workspace/scale_openrc
@@ -84,14 +78,14 @@
system-cpu: "*"
block-io: "*"
net-io: "*"
-# ceph:
-# sources:
-# - historic
-# osds: all
+ ceph:
+ sources: [historic]
+ osds: all
compute:
system-cpu: "*"
block-io: "sd*"
net-io: "*"
+ cluster: ceph-pools-io, ceph-pgs-io
#---------------------------------- TEST PROFILES --------------------------------------------------------------------
profiles:
diff --git a/fio_binaries/fio_artful_x86_64.bz2 b/fio_binaries/fio_artful_x86_64.bz2
new file mode 100644
index 0000000..07222f6
--- /dev/null
+++ b/fio_binaries/fio_artful_x86_64.bz2
Binary files differ
diff --git a/fio_binaries/fio_bionic_x86_64.bz2 b/fio_binaries/fio_bionic_x86_64.bz2
index b2ae653..c7f30c0 100644
--- a/fio_binaries/fio_bionic_x86_64.bz2
+++ b/fio_binaries/fio_bionic_x86_64.bz2
Binary files differ
diff --git a/fio_binaries/fio_trusty_x86_64.bz2 b/fio_binaries/fio_trusty_x86_64.bz2
index a90b483..83466af 100644
--- a/fio_binaries/fio_trusty_x86_64.bz2
+++ b/fio_binaries/fio_trusty_x86_64.bz2
Binary files differ
diff --git a/fio_binaries/fio_xenial_x86_64.bz2 b/fio_binaries/fio_xenial_x86_64.bz2
index 6f0f947..a132307 100644
--- a/fio_binaries/fio_xenial_x86_64.bz2
+++ b/fio_binaries/fio_xenial_x86_64.bz2
Binary files differ
diff --git a/pylint.rc b/pylint.rc
index 16028d5..d36e8fe 100644
--- a/pylint.rc
+++ b/pylint.rc
@@ -65,7 +65,7 @@
# --enable=similarities". If you want to run only the classes checker, but have
# no Warning level messages displayed, use "--disable=all --enable=classes
# --disable=W"
-disable=input-builtin,buffer-builtin,map-builtin-not-iterating,no-absolute-import,coerce-builtin,old-raise-syntax,delslice-method,useless-suppression,old-ne-operator,long-builtin,old-division,unicode-builtin,raw_input-builtin,unichr-builtin,oct-method,execfile-builtin,standarderror-builtin,long-suffix,reload-builtin,coerce-method,backtick,old-octal-literal,next-method-called,xrange-builtin,getslice-method,reduce-builtin,dict-iter-method,zip-builtin-not-iterating,suppressed-message,cmp-method,setslice-method,parameter-unpacking,file-builtin,filter-builtin-not-iterating,apply-builtin,dict-view-method,range-builtin-not-iterating,print-statement,metaclass-assignment,nonzero-method,intern-builtin,basestring-builtin,round-builtin,import-star-module-level,raising-string,indexing-exception,unpacking-in-except,cmp-builtin,hex-method,using-cmp-argument
+disable=input-builtin,buffer-builtin,map-builtin-not-iterating,no-absolute-import,coerce-builtin,old-raise-syntax,delslice-method,useless-suppression,old-ne-operator,long-builtin,old-division,unicode-builtin,raw_input-builtin,unichr-builtin,oct-method,execfile-builtin,standarderror-builtin,long-suffix,reload-builtin,coerce-method,backtick,old-octal-literal,next-method-called,xrange-builtin,getslice-method,reduce-builtin,dict-iter-method,zip-builtin-not-iterating,suppressed-message,cmp-method,setslice-method,parameter-unpacking,file-builtin,filter-builtin-not-iterating,apply-builtin,dict-view-method,range-builtin-not-iterating,print-statement,metaclass-assignment,nonzero-method,intern-builtin,basestring-builtin,round-builtin,import-star-module-level,raising-string,indexing-exception,unpacking-in-except,cmp-builtin,hex-method,using-cmp-argument,missing-docstring
[REPORTS]
@@ -193,7 +193,7 @@
[FORMAT]
# Maximum number of characters on a single line.
-max-line-length=100
+max-line-length=120
# Regexp for a line that is allowed to be longer than the limit.
ignore-long-lines=^\s*(# )?<?https?://\S+>?$
diff --git a/requirements.txt b/requirements.txt
index d6adebf..46385fd 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -24,4 +24,5 @@
scipy
seaborn
simplejson
-statsmodels
\ No newline at end of file
+statsmodels
+texttable
\ No newline at end of file
diff --git a/requirements_dev.txt b/requirements_dev.txt
new file mode 100644
index 0000000..3a68abe
--- /dev/null
+++ b/requirements_dev.txt
@@ -0,0 +1,2 @@
+mypy
+pylint
diff --git a/scripts/build_fio_ubuntu.sh b/scripts/build_fio_ubuntu.sh
index 1ce35d5..c534cfa 100644
--- a/scripts/build_fio_ubuntu.sh
+++ b/scripts/build_fio_ubuntu.sh
@@ -2,14 +2,13 @@
set -xe
apt update
-apt -y install g++ git zlib1g-dev libaio-dev librbd-dev make bzip2
+apt -y install g++ git zlib1g-dev libaio-dev make bzip2
cd /tmp
git clone https://github.com/axboe/fio.git
cd fio
./configure
make -j 4
. /etc/lsb-release
-# VERSION=$(cat /etc/lsb-release | grep DISTRIB_CODENAME | awk -F= '{print $2}')
chmod a-x fio
bzip2 -z -9 fio
mv fio.bz2 "fio_${DISTRIB_CODENAME}_x86_64.bz2"
diff --git a/scripts/connector.py b/scripts/connector.py
deleted file mode 100644
index 6f0f744..0000000
--- a/scripts/connector.py
+++ /dev/null
@@ -1,143 +0,0 @@
-import os
-import sys
-import logging
-import argparse
-import tempfile
-import paramiko
-
-import fuel_rest_api
-from nodes.node import Node
-from utils import parse_creds
-from urlparse import urlparse
-
-
-tmp_file = tempfile.NamedTemporaryFile().name
-openrc_path = tempfile.NamedTemporaryFile().name
-logger = logging.getLogger("io-perf-tool")
-
-
-def discover_fuel_nodes(fuel_url, creds, cluster_name):
- username, tenant_name, password = parse_creds(creds)
- creds = {"username": username,
- "tenant_name": tenant_name,
- "password": password}
-
- conn = fuel_rest_api.KeystoneAuth(fuel_url, creds, headers=None)
- cluster_id = fuel_rest_api.get_cluster_id(conn, cluster_name)
- cluster = fuel_rest_api.reflect_cluster(conn, cluster_id)
-
- nodes = list(cluster.get_nodes())
- ips = [node.get_ip('admin') for node in nodes]
- roles = [node["roles"] for node in nodes]
-
- host = urlparse(fuel_url).hostname
-
- nodes, to_clean = run_agent(ips, roles, host, tmp_file)
- nodes = [Node(node[0], node[1]) for node in nodes]
-
- openrc_dict = cluster.get_openrc()
-
- logger.debug("Found %s fuel nodes for env %r" % (len(nodes), cluster_name))
- return nodes, to_clean, openrc_dict
-
-
-def discover_fuel_nodes_clean(fuel_url, ssh_creds, nodes, base_port=12345):
- admin_ip = urlparse(fuel_url).hostname
- ssh = paramiko.SSHClient()
- ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
- ssh.connect(hostname=admin_ip, port=ssh_creds["port"],
- password=ssh_creds["password"], username=ssh_creds["username"])
-
- command = "python /tmp/agent.py --clean=True --ext_ip=" + \
- admin_ip + " --base_port=" \
- + str(base_port) + " --ports"
-
- for node in nodes:
- ip = urlparse(node[0]).hostname
- command += " " + ip
-
- (stdin, stdout, stderr) = ssh.exec_command(command)
- for line in stdout.readlines():
- print line
-
-
-def run_agent(ip_addresses, roles, host, tmp_name, password="test37", port=22,
- base_port=12345):
- ssh = paramiko.SSHClient()
- ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
- ssh.connect(hostname=host, port=port, password=password, username="root")
- sftp = ssh.open_sftp()
- sftp.put(os.path.join(os.path.dirname(__file__), 'agent.py'),
- "/tmp/agent.py")
- fuel_id_rsa_path = tmp_name
- sftp.get('/root/.ssh/id_rsa', fuel_id_rsa_path)
- os.chmod(fuel_id_rsa_path, 0o700)
- command = "python /tmp/agent.py --base_port=" + \
- str(base_port) + " --ext_ip=" \
- + host + " --ports"
-
- for address in ip_addresses:
- command += " " + address
-
- (stdin, stdout, stderr) = ssh.exec_command(command)
- node_port_mapping = {}
-
- for line in stdout.readlines():
- results = line.split(' ')
-
- if len(results) != 2:
- continue
-
- node, port = results
- node_port_mapping[node] = port
-
- nodes = []
- nodes_to_clean = []
-
- for i in range(len(ip_addresses)):
- ip = ip_addresses[i]
- role = roles[i]
- port = node_port_mapping[ip]
-
- nodes_to_clean.append(("ssh://root@" + ip + ":" +
- port.rstrip('\n')
- + ":" + fuel_id_rsa_path, role))
-
- nodes.append(("ssh://root@" + host + ":" + port.rstrip('\n')
- + ":" + fuel_id_rsa_path, role))
-
- ssh.close()
- logger.info('Files has been transferred successfully to Fuel node, ' +
- 'agent has been launched')
-
- return nodes, nodes_to_clean
-
-
-def parse_command_line(argv):
- parser = argparse.ArgumentParser(
- description="Connect to fuel master and setup ssh agent")
- parser.add_argument(
- "--fuel_url", required=True)
- parser.add_argument(
- "--cluster_name", required=True)
- parser.add_argument(
- "--iface", default="eth1")
- parser.add_argument(
- "--creds", default="admin:admin@admin")
-
- return parser.parse_args(argv)
-
-
-def main(argv):
- args = parse_command_line(argv)
-
- nodes, to_clean, _ = discover_fuel_nodes(args.fuel_url,
- args.creds,
- args.cluster_name)
- discover_fuel_nodes_clean(args.fuel_url, {"username": "root",
- "password": "test37",
- "port": 22}, to_clean)
-
-
-if __name__ == "__main__":
- main(sys.argv[1:])
diff --git a/scripts/storage/__init__.py b/scripts/storage/__init__.py
deleted file mode 100644
index e69de29..0000000
--- a/scripts/storage/__init__.py
+++ /dev/null
diff --git a/scripts/storage/data_processing.py b/scripts/storage/data_processing.py
deleted file mode 100644
index b15c579..0000000
--- a/scripts/storage/data_processing.py
+++ /dev/null
@@ -1,65 +0,0 @@
-# class displays measurement. Moved from storage_api_v_1
-# to avoid circular imports.
-import math
-
-# fix and update all this. Take statistic code from scripts/data2.py
-
-
-class Measurement(object):
- def __init__(self):
- self.build = ""
- self.build_type = 0 # GA/Master/Other
- self.md5 = ""
- self.name = ""
- self.date = None
- self.results = {
- "": (float, float)
- }
-
- def __str__(self):
- return self.build + " " + self.build_type + " " + \
- self.md5 + " " + str(self.results)
-
-
-def mean(l):
- return sum(l) / len(l)
-
-
-def stdev(l):
- m = mean(l)
- return math.sqrt(sum(map(lambda x: (x - m) ** 2, l)))
-
-
-def process_build_data(build):
- """ Function computes mean of all the data from particular build"""
- for item in build.items():
- if type(item[1]) is list:
- m = mean(item[1])
- s = stdev(item[1])
- build[item[0]] = [m, s]
-
-
-def create_measurement(data):
- """ Function creates measurement from data was extracted from database."""
-
- build_data = data[0]
-
- m = Measurement()
- m.build = build_data.build_id
- m.build_type = build_data.type
- m.name = build_data.name
- m.results = {}
-
- for i in range(1, len(data), 2):
- result = data[i]
- param_combination = data[i + 1]
-
- if not str(param_combination) in m.results:
- m.results[str(param_combination)] = [result.bandwith]
- else:
- m.results[str(param_combination)] += [result.bandwith]
-
- for k in m.results.keys():
- m.results[k] = [mean(m.results[k]), stdev(m.results[k])]
-
- return m
diff --git a/scripts/storage/db_manage.py b/scripts/storage/db_manage.py
deleted file mode 100644
index b9435d6..0000000
--- a/scripts/storage/db_manage.py
+++ /dev/null
@@ -1,100 +0,0 @@
-import argparse
-import imp
-import os.path
-import shutil
-import sqlite3
-import sys
-
-from os import remove
-from web_app.app import db
-from config import DATABASE_URI, SQLALCHEMY_MIGRATE_REPO, basedir
-from migrate.versioning import api
-
-
-ACTIONS = {}
-
-
-def action(act):
- def wrap(f):
- ACTIONS[act] = f
-
- def inner(*args, **kwargs):
- return f(*args, **kwargs)
- return inner
- return wrap
-
-
-def parse_args(argv):
- parser = argparse.ArgumentParser(
- description="Manage DB")
- parser.add_argument("action",
- choices=["dropdb", "createdb", "migrate", "downgrade"])
- return parser.parse_args(argv)
-
-
-@action("createdb")
-def createdb():
- sqlite3.connect(os.path.join(basedir, 'app.db'))
-
- db.create_all()
- if not os.path.exists(SQLALCHEMY_MIGRATE_REPO):
- api.create(SQLALCHEMY_MIGRATE_REPO, 'database repository')
- api.version_control(DATABASE_URI, SQLALCHEMY_MIGRATE_REPO)
- else:
- api.version_control(DATABASE_URI, SQLALCHEMY_MIGRATE_REPO,
- api.version(SQLALCHEMY_MIGRATE_REPO))
-
-
-@action("dropdb")
-def dropdb():
- db.create_all()
- if os.path.exists(SQLALCHEMY_MIGRATE_REPO):
- shutil.rmtree(SQLALCHEMY_MIGRATE_REPO)
-
- db.drop_all()
- if os.path.exists(os.path.join(basedir, 'app.db')):
- remove(os.path.join(basedir, 'app.db'))
-
-
-@action("migrate")
-def migrate():
- v = api.db_version(DATABASE_URI, SQLALCHEMY_MIGRATE_REPO)
- migration = SQLALCHEMY_MIGRATE_REPO + ('/versions/%03d_migration.py' %
- (v+1))
- tmp_module = imp.new_module('old_model')
- old_model = api.create_model(DATABASE_URI, SQLALCHEMY_MIGRATE_REPO)
-
- exec old_model in tmp_module.__dict__
- script = api.make_update_script_for_model(DATABASE_URI,
- SQLALCHEMY_MIGRATE_REPO,
- tmp_module.meta, db.metadata)
- open(migration, "wt").write(script)
- api.upgrade(DATABASE_URI, SQLALCHEMY_MIGRATE_REPO)
- v = api.db_version(DATABASE_URI, SQLALCHEMY_MIGRATE_REPO)
- print('New migration saved as ' + migration)
- print('Current database version: ' + str(v))
-
-
-@action("upgrade")
-def upgrade():
- api.upgrade(DATABASE_URI, SQLALCHEMY_MIGRATE_REPO)
- v = api.db_version(DATABASE_URI, SQLALCHEMY_MIGRATE_REPO)
- print('Current database version: ' + str(v))
-
-
-@action("downgrade")
-def downgrade():
- v = api.db_version(DATABASE_URI, SQLALCHEMY_MIGRATE_REPO)
- api.downgrade(DATABASE_URI, SQLALCHEMY_MIGRATE_REPO, v - 1)
- v = api.db_version(DATABASE_URI, SQLALCHEMY_MIGRATE_REPO)
- print('Current database version: ' + str(v))
-
-
-def main(argv):
- opts = parse_args(argv)
- func = ACTIONS.get(opts.action)
- func()
-
-
-if __name__ == '__main__':
- exit(main(sys.argv[1:]))
diff --git a/scripts/storage/models.py b/scripts/storage/models.py
deleted file mode 100644
index eb632e4..0000000
--- a/scripts/storage/models.py
+++ /dev/null
@@ -1,72 +0,0 @@
-from sqlalchemy import ForeignKey
-from web_app.app import db
-
-
-class Build(db.Model):
- id = db.Column(db.Integer, primary_key=True)
- build_id = db.Column(db.String(64))
- name = db.Column(db.String(64))
- md5 = db.Column(db.String(64))
- type = db.Column(db.Integer)
-
- def __repr__(self):
- return self.build_id + " " + self.name + " " + self.type
-
-
-class Param(db.Model):
- id = db.Column(db.Integer, primary_key=True)
- name = db.Column(db.String(64))
- type = db.Column(db.String(64))
- descr = db.Column(db.String(4096))
-
-
-class ParamCombination(db.Model):
- id = db.Column(db.Integer, primary_key=True)
- param_1 = db.Column(db.Text())
- param_2 = db.Column(db.Text())
- param_3 = db.Column(db.Text())
- param_4 = db.Column(db.Text())
- param_5 = db.Column(db.Text())
- param_6 = db.Column(db.Text())
- param_7 = db.Column(db.Text())
- param_8 = db.Column(db.Text())
- param_9 = db.Column(db.Text())
- param_10 = db.Column(db.Text())
- param_11 = db.Column(db.Text())
- param_12 = db.Column(db.Text())
- param_13 = db.Column(db.Text())
- param_14 = db.Column(db.Text())
- param_15 = db.Column(db.Text())
- param_16 = db.Column(db.Text())
- param_17 = db.Column(db.Text())
- param_18 = db.Column(db.Text())
- param_19 = db.Column(db.Text())
- param_20 = db.Column(db.Text())
-
- def __repr__(self):
- return self.param_1 + " " + self.param_2 + " " + self.param_3
-
-
-class Lab(db.Model):
- id = db.Column(db.Integer, primary_key=True)
- name = db.Column(db.String(64), unique=True)
- url = db.Column(db.String(256))
- type = db.Column(db.String(4096))
- fuel_version = db.Column(db.String(64))
- ceph_version = db.Column(db.String(64))
- lab_general_info = db.Column(db.Text)
- lab_meta = db.Column(db.Text)
-
-
-class Result(db.Model):
- id = db.Column(db.Integer, primary_key=True)
- build_id = db.Column(db.Integer, ForeignKey('build.id'))
- lab_id = db.Column(db.Integer, ForeignKey('lab.id'))
- date = db.Column(db.DateTime)
- param_combination_id = db.Column(db.Integer,
- ForeignKey('param_combination.id'))
- bandwith = db.Column(db.Float)
- meta = db.Column(db.String(4096))
-
- def __repr__(self):
- return str(self.bandwith) + " " + str(self.date)
diff --git a/scripts/storage/storage_api.py b/scripts/storage/storage_api.py
deleted file mode 100644
index c7b4e3e..0000000
--- a/scripts/storage/storage_api.py
+++ /dev/null
@@ -1,454 +0,0 @@
-import datetime
-from data_processing import Measurement, create_measurement, process_build_data
-from flask import json
-from meta_info import collect_lab_data, total_lab_info
-from sqlalchemy import sql
-from persistance.models import *
-
-
-def get_build_info(build_name):
- session = db.session()
- result = session.query(Result, Build).join(Build).\
- filter(Build.name == build_name).first()
- lab = session.query(Lab).filter(Lab.id == result[0].lab_id).first()
- return eval(lab.lab_general_info)
-
-
-def get_build_detailed_info(build_name):
- data = get_build_info(build_name)
- return total_lab_info(data)
-
-
-def add_io_params(session):
- """Filling Param table with initial parameters. """
-
- param1 = Param(name="operation", type='{"write", "randwrite", '
- '"read", "randread"}',
- descr="type of write operation")
- param2 = Param(name="sync", type='{"a", "s"}',
- descr="Write mode synchronous/asynchronous")
- param3 = Param(name="block size",
- type='{"1k", "2k", "4k", "8k", "16k", '
- '"32k", "64k", "128k", "256k"}')
-
- session.add(param1)
- session.add(param2)
- session.add(param3)
-
- session.commit()
-
-
-def add_build(session, build_id, build_name, build_type, md5):
- """Function which adds particular build to database."""
-
- build = Build(type=build_type, build_id=build_id,
- name=build_name, md5=md5)
- session.add(build)
- session.commit()
-
- return build.id
-
-
-def insert_results(session, build_id, lab_id, params_combination_id,
- time=None, bandwith=0.0, meta=""):
- """Function insert particular result. """
-
- result = Result(build_id=build_id, lab_id=lab_id,
- params_combination_id=params_combination_id, time=time,
- bandwith=bandwith, meta=meta)
- session.add(result)
- session.commit()
-
-
-def add_param_comb(session, *params):
- """function responsible for adding particular params
- combination to database"""
-
- params_names = sorted([s for s in dir(ParamCombination)
- if s.startswith('param_')])
- d = zip(params_names, params)
- where = ""
-
- for item in d:
- where = sql.and_(where, getattr(ParamCombination, item[0]) == item[1])
-
- query = session.query(ParamCombination).filter(where)
- rs = session.execute(query).fetchall()
-
- if len(rs) == 0:
- param_comb = ParamCombination()
-
- for p in params_names:
- i = int(p.split('_')[1])
-
- if i - 1 < len(params):
- param_comb.__setattr__('param_' + str(i), params[i - 1])
- param = session.query(Param).filter(Param.id == i).one()
- values = eval(param.type)
-
- if params[i - 1] not in values:
- values.add(params[i - 1])
- param.type = str(values)
-
- session.add(param_comb)
- session.commit()
- return param_comb.id
- else:
- return rs[0][0]
-
-
-def add_lab(session, lab_url, lab_name, ceph_version,
- fuel_version, data, info):
- """ Function add data about particular lab"""
- result = session.query(Lab).filter(Lab.name == lab_name).all()
-
- if len(result) != 0:
- return result[0].id
- else:
- lab = Lab(name=lab_name, url=lab_url, ceph_version=ceph_version,
- fuel_version=fuel_version, lab_general_info=str(data),
- lab_meta=str(info))
- session.add(lab)
- session.commit()
- return lab.id
-
-
-def add_data(data):
- """Function store list of builds in database"""
-
- data = json.loads(data)
- session = db.session()
- add_io_params(session)
-
- for build_data in data:
- build_id = add_build(session,
- build_data.pop("build_id"),
- build_data.pop("name"),
- build_data.pop("type"),
- build_data.pop("iso_md5"),
- )
-
- creds = {"username": build_data.pop("username"),
- "password": build_data.pop("password"),
- "tenant_name": build_data.pop("tenant_name")}
-
- lab_url = build_data.pop("lab_url")
- lab_name = build_data.pop("lab_name")
- ceph_version = build_data.pop("ceph_version")
- data = collect_lab_data(lab_url, creds)
- data['name'] = lab_name
- info = total_lab_info(data)
- lab_id = add_lab(session, lab_url=lab_name, lab_name=lab_url,
- ceph_version=ceph_version,
- fuel_version=data['fuel_version'],
- data=data, info=info)
-
- date = build_data.pop("date")
- date = datetime.datetime.strptime(date, "%a %b %d %H:%M:%S %Y")
-
- for params, [bw, dev] in build_data.items():
- param_comb_id = add_param_comb(session, *params.split(" "))
- result = Result(param_combination_id=param_comb_id,
- build_id=build_id, bandwith=bw,
- date=date, lab_id=lab_id)
- session.add(result)
- session.commit()
-
-
-def load_data(lab_id=None, build_id=None, *params):
- """ Function loads data by parameters described in *params tuple."""
-
- session = db.session()
- params_names = sorted([s for s in dir(ParamCombination)
- if s.startswith('param_')])
- d = zip(params_names, params)
- where = ""
-
- for item in d:
- where = sql.and_(where, getattr(ParamCombination, item[0]) == item[1])
-
- query = session.query(ParamCombination).filter(where)
- rs = session.execute(query).fetchall()
-
- ids = [r[0] for r in rs]
-
- rs = session.query(Result).\
- filter(Result.param_combination_id.in_(ids)).all()
-
- if lab_id is not None:
- rs = [r for r in rs if r.lab_id == lab_id]
-
- if build_id is not None:
- rs = [r for r in rs if r.build_id == build_id]
-
- return rs
-
-
-def load_all():
- """Load all builds from database"""
-
- session = db.session()
- results = session.query(Result, Build, ParamCombination).\
- join(Build).join(ParamCombination).all()
-
- return results
-
-
-def collect_builds_from_db(*names):
- """ Function collecting all builds from database and filter it by names"""
-
- results = load_all()
- d = {}
-
- for item in results:
- result_data = item[0]
- build_data = item[1]
- param_combination_data = item[2]
-
- if build_data.name not in d:
- d[build_data.name] = \
- [build_data, result_data, param_combination_data]
- else:
- d[build_data.name].append(result_data)
- d[build_data.name].append(param_combination_data)
-
- if len(names) == 0:
- return {k: v for k, v in d.items()}
-
- return {k: v for k, v in d.items() if k in names}
-
-
-def prepare_build_data(build_name):
- """
- #function preparing data for display plots.
- #Format {build_name : Measurement}
- """
-
- session = db.session()
- build = session.query(Build).filter(Build.name == build_name).first()
- names = []
-
- if build.type == 'GA':
- names = [build_name]
- else:
- res = session.query(Build).\
- filter(Build.type.in_(['GA', 'master', build.type])).all()
- for r in res:
- names.append(r.name)
-
- d = collect_builds_from_db()
- d = {k: v for k, v in d.items() if k in names}
- results = {}
-
- for data in d.keys():
- m = create_measurement(d[data])
- results[m.build_type] = m
-
- return results
-
-
-def builds_list():
- """
- Function getting list of all builds available to index page
- returns list of dicts which contains data to display on index page.
- """
-
- res = []
- builds = set()
- data = load_all()
-
- for item in data:
- build = item[1]
- result = item[0]
-
- if not build.name in builds:
- builds.add(build.name)
- d = {}
- d["type"] = build.type
- d["url"] = build.name
- d["date"] = result.date
- d["name"] = build.name
- res.append(d)
-
- return res
-
-
-def get_builds_data(names=None):
- """
- Processing data from database.
- List of dicts, where each dict contains build meta
- info and kev-value measurements.
- key - param combination.
- value - [mean, deviation]
- """
-
- d = collect_builds_from_db()
-
- if not names is None:
- d = {k: v for k, v in d.items() if k in names}
- else:
- d = {k: v for k, v in d.items()}
- output = []
-
- for key, value in d.items():
- result = {}
- build = value[0]
- result["build_id"] = build.build_id
- result["iso_md5"] = build.md5
- result["type"] = build.type
- result["date"] = "Date must be here"
-
- for i in range(1, len(value), 2):
- r = value[i]
- param_combination = value[i + 1]
-
- if not str(param_combination) in result:
- result[str(param_combination)] = [r.bandwith]
- else:
- result[str(param_combination)].append(r.bandwith)
-
- output.append(result)
-
- for build in output:
- process_build_data(build)
-
- return output
-
-
-def get_data_for_table(build_name=""):
- """ Function for getting result to display table """
-
- session = db.session()
- build = session.query(Build).filter(Build.name == build_name).one()
- names = []
-
- # Get names of build that we need.
- if build.type == 'GA':
- names = [build_name]
- else:
- res = session.query(Build).filter(
- Build.type.in_(['GA', 'master', build.type])).all()
- for r in res:
- names.append(r.name)
- # get data for particular builds.
- return get_builds_data(names)
-
-
-if __name__ == '__main__':
- # add_build("Some build", "GA", "bla bla")
- cred = {"username": "admin", "password": "admin", "tenant_name": "admin"}
- json_data = '[{\
- "username": "admin",\
- "password": "admin", \
- "tenant_name": "admin",\
- "lab_url": "http://172.16.52.112:8000",\
- "lab_name": "Perf-1-Env",\
- "ceph_version": "v0.80 Firefly",\
- "randwrite a 256k": [16885, 1869],\
- "randwrite s 4k": [79, 2],\
- "read a 64k": [74398, 11618],\
- "write s 1024k": [7490, 193],\
- "randwrite a 64k": [14167, 4665],\
- "build_id": "1",\
- "randread a 1024k": [68683, 8604],\
- "randwrite s 256k": [3277, 146],\
- "write a 1024k": [24069, 660],\
- "type": "GA",\
- "write a 64k": [24555, 1006],\
- "write s 64k": [1285, 57],\
- "write a 256k": [24928, 503],\
- "write s 256k": [4029, 192],\
- "randwrite a 1024k": [23980, 1897],\
- "randread a 64k": [27257, 17268],\
- "randwrite s 1024k": [8504, 238],\
- "randread a 256k": [60868, 2637],\
- "randread a 4k": [3612, 1355],\
- "read a 1024k": [71122, 9217],\
- "date": "Thu Feb 12 19:11:56 2015",\
- "write s 4k": [87, 3],\
- "read a 4k": [88367, 6471],\
- "read a 256k": [80904, 8930],\
- "name": "GA - 6.0 GA",\
- "randwrite s 1k": [20, 0],\
- "randwrite s 64k": [1029, 34],\
- "write s 1k": [21, 0],\
- "iso_md5": "bla bla"\
- },\
- {\
- "username": "admin",\
- "password": "admin", \
- "tenant_name": "admin",\
- "lab_url": "http://172.16.52.112:8000",\
- "ceph_version": "v0.80 Firefly",\
- "lab_name": "Perf-1-Env",\
- "randwrite a 256k": [20212, 5690],\
- "randwrite s 4k": [83, 6],\
- "read a 64k": [89394, 3912],\
- "write s 1024k": [8054, 280],\
- "randwrite a 64k": [14595, 3245],\
- "build_id": "2",\
- "randread a 1024k": [83277, 9310],\
- "randwrite s 256k": [3628, 433],\
- "write a 1024k": [29226, 8624],\
- "type": "master",\
- "write a 64k": [25089, 790],\
- "write s 64k": [1236, 30],\
- "write a 256k": [30327, 9799],\
- "write s 256k": [4049, 172],\
- "randwrite a 1024k": [29000, 9302],\
- "randread a 64k": [26775, 16319],\
- "randwrite s 1024k": [8665, 1457],\
- "randread a 256k": [63608, 16126],\
- "randread a 4k": [3212, 1620],\
- "read a 1024k": [89676, 4401],\
- "date": "Thu Feb 12 19:11:56 2015",\
- "write s 4k": [88, 3],\
- "read a 4k": [92263, 5186],\
- "read a 256k": [94505, 6868],\
- "name": "6.1 Dev",\
- "randwrite s 1k": [22, 3],\
- "randwrite s 64k": [1105, 46],\
- "write s 1k": [22, 0],\
- "iso_md5": "bla bla"\
- },\
- {\
- "username": "admin",\
- "password": "admin", \
- "tenant_name": "admin",\
- "lab_url": "http://172.16.52.112:8000",\
- "ceph_version": "v0.80 Firefly",\
- "lab_name": "Perf-1-Env",\
- "randwrite a 256k": [16885, 1869],\
- "randwrite s 4k": [79, 2],\
- "read a 64k": [74398, 11618],\
- "write s 1024k": [7490, 193],\
- "randwrite a 64k": [14167, 4665],\
- "build_id": "1",\
- "randread a 1024k": [68683, 8604],\
- "randwrite s 256k": [3277, 146],\
- "write a 1024k": [24069, 660],\
- "type": "sometype",\
- "write a 64k": [24555, 1006],\
- "write s 64k": [1285, 57],\
- "write a 256k": [24928, 503],\
- "write s 256k": [4029, 192],\
- "randwrite a 1024k": [23980, 1897],\
- "randread a 64k": [27257, 17268],\
- "randwrite s 1024k": [8504, 238],\
- "randread a 256k": [60868, 2637],\
- "randread a 4k": [3612, 1355],\
- "read a 1024k": [71122, 9217],\
- "date": "Thu Feb 12 19:11:56 2015",\
- "write s 4k": [87, 3],\
- "read a 4k": [88367, 6471],\
- "read a 256k": [80904, 8930],\
- "name": "somedev",\
- "randwrite s 1k": [20, 0],\
- "randwrite s 64k": [1029, 34],\
- "write s 1k": [21, 0],\
- "iso_md5": "bla bla"\
- }]'
-
- # json_to_db(json_data)
- print load_data(1, 2)
- # add_data(json_data)
diff --git a/wally/ceph.py b/wally/ceph.py
index 35e78b4..4afff06 100644
--- a/wally/ceph.py
+++ b/wally/ceph.py
@@ -1,7 +1,7 @@
""" Collect data about ceph nodes"""
+import enum
import logging
from typing import Dict, cast, List, Set
-
from cephlib import discover
from cephlib.discover import OSDInfo
from cephlib.common import to_ip
@@ -19,7 +19,6 @@
def get_osds_info(node: IRPCNode, ceph_extra_args: str = "", thcount: int = 8) -> Dict[IP, List[OSDInfo]]:
"""Get set of osd's ip"""
- res: Dict[IP, List[OSDInfo]] = {}
return {IP(ip): osd_info_list
for ip, osd_info_list in discover.get_osds_nodes(node.run, ceph_extra_args, thcount=thcount).items()}
@@ -45,7 +44,12 @@
ignore_errors = 'ignore_errors' in ctx.config.discover
ceph = ctx.config.ceph
- root_node_uri = cast(str, ceph.root_node)
+ try:
+ root_node_uri = cast(str, ceph.root_node)
+ except AttributeError:
+ logger.error("'root_node' option must be provided in 'ceph' config section. " +
+ "It must be the name of the node, which has access to ceph")
+ raise StopTestError()
cluster = ceph.get("cluster", "ceph")
ip_remap = ctx.config.ceph.get('ip_remap', {})
@@ -58,13 +62,7 @@
if key is None:
key = f"/etc/ceph/{cluster}.client.admin.keyring"
- ceph_extra_args = ""
-
- if conf:
- ceph_extra_args += f" -c '{conf}'"
-
- if key:
- ceph_extra_args += f" -k '{key}'"
+ ctx.ceph_extra_args = f" -c '{conf}' -k '{key}'"
logger.debug(f"Start discovering ceph nodes from root {root_node_uri}")
logger.debug(f"cluster={cluster} key={conf} conf={key}")
@@ -73,42 +71,46 @@
ceph_params = {"cluster": cluster, "conf": conf, "key": key}
- with setup_rpc(connect(info), ctx.rpc_code, ctx.default_rpc_plugins,
- log_level=ctx.config.rpc_log_level) as node:
+ ssh_user = ctx.config.ssh_opts.get("user")
+ ssh_key = ctx.config.ssh_opts.get("key")
- try:
- ips = set()
- for ip, osds_info in get_osds_info(node, ceph_extra_args, thcount=16).items():
- ip = ip_remap.get(ip, ip)
- ips.add(ip)
- creds = ConnCreds(to_ip(cast(str, ip)), user="root")
- info = ctx.merge_node(creds, {'ceph-osd'})
- info.params.setdefault('ceph-osds', []).extend(info.__dict__.copy() for info in osds_info)
- assert 'ceph' not in info.params or info.params['ceph'] == ceph_params
- info.params['ceph'] = ceph_params
- logger.debug(f"Found {len(ips)} nodes with ceph-osd role")
- except Exception as exc:
- if not ignore_errors:
- logger.exception("OSD discovery failed")
- raise StopTestError()
- else:
- logger.warning(f"OSD discovery failed {exc}")
+ node = ctx.ceph_master_node = setup_rpc(connect(info), ctx.rpc_code, ctx.default_rpc_plugins,
+ log_level=ctx.config.rpc_log_level,
+ sudo=ctx.config.ssh_opts.get("sudo", False))
- try:
- counter = 0
- for counter, ip in enumerate(get_mons_ips(node, ceph_extra_args)):
- ip = ip_remap.get(ip, ip)
- creds = ConnCreds(to_ip(cast(str, ip)), user="root")
- info = ctx.merge_node(creds, {'ceph-mon'})
- assert 'ceph' not in info.params or info.params['ceph'] == ceph_params
- info.params['ceph'] = ceph_params
- logger.debug(f"Found {counter + 1} nodes with ceph-mon role")
- except Exception as exc:
- if not ignore_errors:
- logger.exception("MON discovery failed")
- raise StopTestError()
- else:
- logger.warning(f"MON discovery failed {exc}")
+ try:
+ ips = set()
+ for ip, osds_info in get_osds_info(node, ctx.ceph_extra_args, thcount=16).items():
+ ip = ip_remap.get(ip, ip)
+ ips.add(ip)
+ creds = ConnCreds(to_ip(cast(str, ip)), user=ssh_user, key_file=ssh_key)
+ info = ctx.merge_node(creds, {'ceph-osd'})
+ info.params.setdefault('ceph-osds', []).extend(info.__dict__.copy() for info in osds_info)
+ assert 'ceph' not in info.params or info.params['ceph'] == ceph_params
+ info.params['ceph'] = ceph_params
+ logger.debug(f"Found {len(ips)} nodes with ceph-osd role")
+ except Exception as exc:
+ if not ignore_errors:
+ logger.exception("OSD discovery failed")
+ raise StopTestError()
+ else:
+ logger.warning(f"OSD discovery failed {exc}")
+
+ try:
+ counter = 0
+ for counter, ip in enumerate(get_mons_ips(node, ctx.ceph_extra_args)):
+ ip = ip_remap.get(ip, ip)
+ creds = ConnCreds(to_ip(cast(str, ip)), user=ssh_user, key_file=ssh_key)
+ info = ctx.merge_node(creds, {'ceph-mon'})
+ assert 'ceph' not in info.params or info.params['ceph'] == ceph_params
+ info.params['ceph'] = ceph_params
+ logger.debug(f"Found {counter + 1} nodes with ceph-mon role")
+ except Exception as exc:
+ if not ignore_errors:
+ logger.exception("MON discovery failed")
+ raise StopTestError()
+ else:
+ logger.warning(f"MON discovery failed {exc}")
def raw_dev_name(path: str) -> str:
@@ -130,12 +132,27 @@
jdevs: Set[str] = set()
sdevs: Set[str] = set()
for osd_info in node.info.params['ceph-osds']:
- for key, sset in [('journal', jdevs), ('storage', sdevs)]:
- path = osd_info.get(key)
- if path:
+
+ if osd_info['bluestore'] is None:
+ osd_stor_type_b = node.conn.fs.get_file(osd_info['storage'] + "/type", compress=False)
+ osd_stor_type = osd_stor_type_b.decode('utf8').strip()
+ osd_info['bluestore'] = osd_stor_type == 'bluestore'
+
+ if osd_info['bluestore']:
+ for name, sset in [('block.db', jdevs), ('block.wal', jdevs), ('block', sdevs)]:
+ path = f"{osd_info['storage']}/{name}"
dpath = node.conn.fs.get_dev_for_file(path)
if isinstance(dpath, bytes):
dpath = dpath.decode('utf8')
sset.add(raw_dev_name(dpath))
+ else:
+ for key, sset in [('journal', jdevs), ('storage', sdevs)]:
+ path = osd_info.get(key)
+ if path:
+ dpath = node.conn.fs.get_dev_for_file(path)
+ if isinstance(dpath, bytes):
+ dpath = dpath.decode('utf8')
+ sset.add(raw_dev_name(dpath))
+
node.info.params['ceph_storage_devs'] = list(sdevs)
node.info.params['ceph_journal_devs'] = list(jdevs)
diff --git a/wally/config.py b/wally/config.py
index 1090d6c..cbcacfa 100644
--- a/wally/config.py
+++ b/wally/config.py
@@ -10,30 +10,28 @@
def __init__(self, dct: ConfigBlock) -> None:
# make mypy happy, set fake dict
self.__dict__['_dct'] = {}
- self.run_uuid: str = None
- self.storage_url: str = None
- self.comment: str = None
- self.keep_vm: bool = None
- self.dont_discover_nodes: bool = None
- self.build_id: str = None
- self.build_description: str = None
- self.build_type: str = None
- self.default_test_local_folder: str = None
- self.settings_dir: str = None
- self.connect_timeout: int = None
+ self.run_uuid: str = None # type: ignore
+ self.storage_url: str = None # type: ignore
+ self.comment: str = None # type: ignore
+ self.keep_vm: bool = None # type: ignore
+ self.dont_discover_nodes: bool = None # type: ignore
+ self.build_id: str = None # type: ignore
+ self.build_description: str = None # type: ignore
+ self.build_type: str = None # type: ignore
+ self.default_test_local_folder: str = None # type: ignore
+ self.settings_dir: str = None # type: ignore
+ self.connect_timeout: int = None # type: ignore
self.no_tests: bool = False
self.debug_agents: bool = False
- # None, disabled, enabled, metadata, ignore_errors
- self.discover: Optional[str] = None
+ self.logging: 'Config' = None # type: ignore
+ self.ceph: 'Config' = None # type: ignore
+ self.openstack: 'Config' = None # type: ignore
+ self.test: 'Config' = None # type: ignore
+ self.sensors: 'Config' = None # type: ignore
- self.logging: 'Config' = None
- self.ceph: 'Config' = None
- self.openstack: 'Config' = None
- self.fuel: 'Config' = None
- self.test: 'Config' = None
- self.sensors: 'Config' = None
- self.discover: Set[str] = None
+ # None, disabled, enabled, metadata, ignore_errors
+ self.discover: Set[str] = None # type: ignore
self._dct.clear()
self._dct.update(dct)
diff --git a/wally/data_selectors.py b/wally/data_selectors.py
index 2b9037e..8d4f630 100644
--- a/wally/data_selectors.py
+++ b/wally/data_selectors.py
@@ -8,7 +8,6 @@
from cephlib.node import NodeInfo
from .result_classes import IWallyStorage
-from .suits.io.fio_hist import expected_lat_bins
logger = logging.getLogger("wally")
@@ -65,11 +64,11 @@
logger.error(msg)
raise ValueError(msg)
- if metric == 'lat' and (len(ts.data.shape) != 2 or ts.data.shape[1] != expected_lat_bins):
- msg = f"Sensor {ts.source.dev}.{ts.source.sensor} on node {ts.source.node_id} " + \
- f"has shape={ts.data.shape}. Can only process sensors with shape=[X, {expected_lat_bins}]."
- logger.error(msg)
- raise ValueError(msg)
+ # if metric == 'lat' and (len(ts.data.shape) != 2 or ts.data.shape[1] != expected_lat_bins):
+ # msg = f"Sensor {ts.source.dev}.{ts.source.sensor} on node {ts.source.node_id} " + \
+ # f"has shape={ts.data.shape}. Can only process sensors with shape=[X, {expected_lat_bins}]."
+ # logger.error(msg)
+ # raise ValueError(msg)
if metric != 'lat' and len(ts.data.shape) != 1:
msg = f"Sensor {ts.source.dev}.{ts.source.sensor} on node {ts.source.node_id} " + \
@@ -80,7 +79,6 @@
assert trange[0] >= ts.times[0] and trange[1] <= ts.times[-1], \
f"[{ts.times[0]}, {ts.times[-1]}] not in [{trange[0]}, {trange[-1]}]"
-
idx1, idx2 = numpy.searchsorted(ts.times, trange)
idx2 += 1
diff --git a/wally/fuel.py b/wally/fuel.py
deleted file mode 100644
index 668be21..0000000
--- a/wally/fuel.py
+++ /dev/null
@@ -1,125 +0,0 @@
-import logging
-from typing import Dict, List, NamedTuple, Union, cast
-
-from paramiko.ssh_exception import AuthenticationException
-
-from cephlib.common import parse_creds, to_ip
-from cephlib.ssh import ConnCreds
-from cephlib.node_impl import connect, setup_rpc
-
-from .fuel_rest_api import get_cluster_id, reflect_cluster, FuelInfo, KeystoneAuth
-from .utils import StopTestError
-from .stage import Stage, StepOrder
-from .test_run_class import TestRun
-from .config import ConfigBlock
-from .openstack_api import OSCreds
-
-
-logger = logging.getLogger("wally")
-
-
-FuelNodeInfo = NamedTuple("FuelNodeInfo",
- [("version", List[int]),
- ("fuel_ext_iface", str),
- ("openrc", Dict[str, Union[str, bool]])])
-
-
-
-class DiscoverFuelStage(Stage):
- """"Fuel nodes discovery, also can get openstack openrc"""
-
- priority = StepOrder.DISCOVER
- config_block = 'fuel'
-
- @classmethod
- def validate(cls, cfg: ConfigBlock) -> None:
- # msg = "openstack_env should be provided in fuel config"
- # check_input_param('openstack_env' in fuel_data, msg)
- # fuel.openstack_env
- pass
-
- def run(self, ctx: TestRun) -> None:
- full_discovery = 'fuel' in ctx.config.discover
- metadata_only = (not full_discovery) and ('metadata' in ctx.config.discover)
- ignore_errors = 'ignore_errors' in ctx.config.discover
-
- if not (metadata_only or full_discovery):
- logger.debug("Skip ceph discovery due to config setting")
- return
-
- if "fuel_os_creds" in ctx.storage and 'fuel_version' in ctx.storage:
- logger.debug("Skip FUEL credentials discovery, use previously discovered info")
- ctx.fuel_openstack_creds = OSCreds(*cast(List, ctx.storage.get('fuel_os_creds')))
- ctx.fuel_version = ctx.storage.get('fuel_version')
- if 'all_nodes' in ctx.storage:
- logger.debug("Skip FUEL nodes discovery, use data from DB")
- return
- elif metadata_only:
- logger.debug("Skip FUEL nodes discovery due to discovery settings")
- return
-
- fuel = ctx.config.fuel
- fuel_node_info = ctx.merge_node(fuel.ssh_creds, {'fuel_master'})
- creds = dict(zip(("user", "passwd", "tenant"), parse_creds(fuel.creds)))
- fuel_conn = KeystoneAuth(fuel.url, creds)
-
- cluster_id = get_cluster_id(fuel_conn, fuel.openstack_env)
- cluster = reflect_cluster(fuel_conn, cluster_id)
-
- if ctx.fuel_version is None:
- ctx.fuel_version = FuelInfo(fuel_conn).get_version()
- ctx.storage.put(ctx.fuel_version, "fuel_version")
-
- logger.info("Found FUEL {0}".format(".".join(map(str, ctx.fuel_version))))
- openrc = cluster.get_openrc()
-
- if openrc:
- auth_url = cast(str, openrc['os_auth_url'])
- if ctx.fuel_version >= [8, 0] and auth_url.startswith("https://"):
- logger.warning("Fixing FUEL 8.0 AUTH url - replace https://->http://")
- auth_url = auth_url.replace("https", "http", 1)
-
- os_creds = OSCreds(name=cast(str, openrc['username']),
- passwd=cast(str, openrc['password']),
- tenant=cast(str, openrc['tenant_name']),
- auth_url=cast(str, auth_url),
- insecure=cast(bool, openrc['insecure']))
-
- ctx.fuel_openstack_creds = os_creds
- else:
- ctx.fuel_openstack_creds = None
-
- ctx.storage.put(list(ctx.fuel_openstack_creds), "fuel_os_creds")
-
- if metadata_only:
- logger.debug("Skip FUEL nodes discovery due to discovery settings")
- return
-
- try:
- fuel_rpc = setup_rpc(connect(fuel_node_info),
- ctx.rpc_code,
- ctx.default_rpc_plugins,
- log_level=ctx.config.rpc_log_level)
- except AuthenticationException:
- msg = "FUEL nodes discovery failed - wrong FUEL master SSH credentials"
- if ignore_errors:
- raise StopTestError(msg)
- logger.warning(msg)
- return
- except Exception as exc:
- if ignore_errors:
- logger.exception("While connection to FUEL")
- raise StopTestError("Failed to connect to FUEL")
- logger.warning("Failed to connect to FUEL - %s", exc)
- return
-
- logger.debug("Downloading FUEL node ssh master key")
- fuel_key = fuel_rpc.get_file_content('/root/.ssh/id_rsa')
- network = 'fuelweb_admin' if ctx.fuel_version >= [6, 0] else 'admin'
-
- count = 0
- for count, fuel_node in enumerate(list(cluster.get_nodes())):
- ip = str(fuel_node.get_ip(network))
- ctx.merge_node(ConnCreds(to_ip(ip), "root", key=fuel_key), set(fuel_node.get_roles()))
-
- logger.debug("Found {} FUEL nodes for env {}".format(count, fuel.openstack_env))
diff --git a/wally/fuel_rest_api.py b/wally/fuel_rest_api.py
deleted file mode 100644
index b1cfc0c..0000000
--- a/wally/fuel_rest_api.py
+++ /dev/null
@@ -1,338 +0,0 @@
-import re
-import abc
-import json
-import logging
-import urllib.parse
-import urllib.error
-import urllib.request
-from typing import Dict, Any, Iterator, List, Callable, cast
-from functools import partial
-
-import netaddr
-from keystoneclient import exceptions
-from keystoneclient.v2_0 import Client as keystoneclient
-
-
-logger = logging.getLogger("wally")
-
-
-class Connection(metaclass=abc.ABCMeta):
- host = None # type: str
-
- @abc.abstractmethod
- def do(self, method: str, path: str, params: Dict = None) -> Dict:
- pass
-
- def get(self, path: str, params: Dict = None) -> Dict:
- return self.do("GET", path, params)
-
-
-class Urllib2HTTP(Connection):
- """
- class for making HTTP requests
- """
-
- allowed_methods = ('get', 'put', 'post', 'delete', 'patch', 'head')
-
- def __init__(self, root_url: str, headers: Dict[str, str] = None) -> None:
- """
- """
- if root_url.endswith('/'):
- self.root_url = root_url[:-1]
- else:
- self.root_url = root_url
-
- self.host = urllib.parse.urlparse(self.root_url).hostname
-
- if headers is None:
- self.headers = {} # type: Dict[str, str]
- else:
- self.headers = headers
-
- def do(self, method: str, path: str, params: Dict = None) -> Any:
- if path.startswith('/'):
- url = self.root_url + path
- else:
- url = self.root_url + '/' + path
-
- if method == 'get':
- assert params == {} or params is None
- data_json = None
- else:
- data_json = json.dumps(params)
-
- logger.debug("HTTP: {0} {1}".format(method.upper(), url))
-
- request = urllib.request.Request(url,
- data=data_json.encode("utf8"),
- headers=self.headers)
- if data_json is not None:
- request.add_header('Content-Type', 'application/json')
-
- request.get_method = lambda: method.upper() # type: ignore
- response = urllib.request.urlopen(request)
- code = response.code # type: ignore
-
- logger.debug("HTTP Responce: {0}".format(code))
- if code < 200 or code > 209:
- raise IndexError(url)
-
- content = response.read()
-
- if '' == content:
- return None
-
- return json.loads(content.decode("utf8"))
-
- def __getattr__(self, name: str) -> Any:
- if name in self.allowed_methods:
- return partial(self.do, name)
- raise AttributeError(name)
-
-
-class KeystoneAuth(Urllib2HTTP):
- def __init__(self, root_url: str, creds: Dict[str, str], headers: Dict[str, str] = None) -> None:
- super(KeystoneAuth, self).__init__(root_url, headers)
- admin_node_ip = urllib.parse.urlparse(root_url).hostname
- self.keystone_url = "http://{0}:5000/v2.0".format(admin_node_ip)
- self.keystone = keystoneclient(
- auth_url=self.keystone_url, **creds)
- self.refresh_token()
-
- def refresh_token(self) -> None:
- """Get new token from keystone and update headers"""
- try:
- self.keystone.authenticate()
- self.headers['X-Auth-Token'] = self.keystone.auth_token
- except exceptions.AuthorizationFailure:
- logger.warning(
- 'Cant establish connection to keystone with url %s',
- self.keystone_url)
-
- def do(self, method: str, path: str, params: Dict[str, str] = None) -> Any:
- """Do request. If gets 401 refresh token"""
- try:
- return super(KeystoneAuth, self).do(method, path, params)
- except urllib.error.HTTPError as e:
- if e.code == 401:
- logger.warning(
- 'Authorization failure: {0}'.format(e.read()))
- self.refresh_token()
- return super(KeystoneAuth, self).do(method, path, params)
- else:
- raise
-
-
-def get_inline_param_list(url: str) -> Iterator[str]:
- format_param_rr = re.compile(r"\{([a-zA-Z_]+)\}")
- for match in format_param_rr.finditer(url):
- yield match.group(1)
-
-
-class RestObj:
- name = None # type: str
- id = None # type: int
-
- def __init__(self, conn: Connection, **kwargs: Any) -> None:
- self.__dict__.update(kwargs)
- self.__connection__ = conn
-
- def __str__(self) -> str:
- res = ["{0}({1}):".format(self.__class__.__name__, self.name)]
- for k, v in sorted(self.__dict__.items()):
- if k.startswith('__') or k.endswith('__'):
- continue
- if k != 'name':
- res.append(" {0}={1!r}".format(k, v))
- return "\n".join(res)
-
- def __getitem__(self, item: str) -> Any:
- return getattr(self, item)
-
-
-def make_call(method: str, url: str) -> Callable:
- def closure(obj: Any, entire_obj: Any = None, **data: Any) -> Any:
- inline_params_vals = {}
- for name in get_inline_param_list(url):
- if name in data:
- inline_params_vals[name] = data[name]
- del data[name]
- else:
- inline_params_vals[name] = getattr(obj, name)
- result_url = url.format(**inline_params_vals)
-
- if entire_obj is not None:
- if data != {}:
- raise ValueError("Both entire_obj and data provided")
- data = entire_obj
- return obj.__connection__.do(method, result_url, params=data)
- return closure
-
-
-RequestMethod = Callable[[str], Callable]
-
-
-PUT = cast(RequestMethod, partial(make_call, 'put')) # type: RequestMethod
-GET = cast(RequestMethod, partial(make_call, 'get')) # type: RequestMethod
-DELETE = cast(RequestMethod, partial(make_call, 'delete')) # type: RequestMethod
-
-# ------------------------------- ORM ----------------------------------------
-
-
-def get_fuel_info(url: str) -> 'FuelInfo':
- conn = Urllib2HTTP(url)
- return FuelInfo(conn)
-
-
-class FuelInfo(RestObj):
-
- """Class represents Fuel installation info"""
-
- get_nodes = GET('api/nodes')
- get_clusters = GET('api/clusters')
- get_cluster = GET('api/clusters/{id}')
- get_info = GET('api/releases')
-
- @property
- def nodes(self) -> 'NodeList':
- """Get all fuel nodes"""
- return NodeList([Node(self.__connection__, **node) for node
- in self.get_nodes()])
-
- @property
- def free_nodes(self) -> 'NodeList':
- """Get unallocated nodes"""
- return NodeList([Node(self.__connection__, **node) for node in
- self.get_nodes() if not node['cluster']])
-
- @property
- def clusters(self) -> List['Cluster']:
- """List clusters in fuel"""
- return [Cluster(self.__connection__, **cluster) for cluster
- in self.get_clusters()]
-
- def get_version(self) -> List[int]:
- for info in self.get_info():
- vers = info['version'].split("-")[1].split('.')
- return list(map(int, vers))
- raise ValueError("No version found")
-
-
-class Node(RestObj):
- """Represents node in Fuel"""
-
- get_info = GET('/api/nodes/{id}')
- get_interfaces = GET('/api/nodes/{id}/interfaces')
-
- def get_network_data(self) -> Dict:
- """Returns node network data"""
- return self.get_info().get('network_data')
-
- def get_roles(self) -> List[str]:
- """Get node roles
-
- Returns: (roles, pending_roles)
- """
- return self.get_info().get('roles')
-
- def get_ip(self, network='public') -> netaddr.IPAddress:
- """Get node ip
-
- :param network: network to pick
- """
- nets = self.get_network_data()
- for net in nets:
- if net['name'] == network:
- iface_name = net['dev']
- for iface in self.get_info()['meta']['interfaces']:
- if iface['name'] == iface_name:
- try:
- return iface['ip']
- except KeyError:
- return netaddr.IPNetwork(net['ip']).ip
- raise Exception('Network %s not found' % network)
-
-
-class NodeList(list):
- """Class for filtering nodes through attributes"""
- allowed_roles = ['controller', 'compute', 'cinder', 'ceph-osd', 'mongo',
- 'zabbix-server']
-
- def __getattr__(self, name: str) -> List[Node]:
- if name in self.allowed_roles:
- return [node for node in self if name in node.roles]
- return []
-
-
-class Cluster(RestObj):
- """Class represents Cluster in Fuel"""
-
- get_status = GET('api/clusters/{id}')
- get_networks = GET('api/clusters/{id}/network_configuration/neutron')
- get_attributes = GET('api/clusters/{id}/attributes')
- _get_nodes = GET('api/nodes?cluster_id={id}')
-
- def __init__(self, *dt, **mp) -> None:
- super(Cluster, self).__init__(*dt, **mp)
- self.nodes = NodeList([Node(self.__connection__, **node) for node in
- self._get_nodes()])
-
- def check_exists(self) -> bool:
- """Check if cluster exists"""
- try:
- self.get_status()
- return True
- except urllib.error.HTTPError as err:
- if err.code == 404:
- return False
- raise
-
- def get_openrc(self) -> Dict[str, str]:
- access = self.get_attributes()['editable']['access']
- creds = {'username': access['user']['value'],
- 'password': access['password']['value'],
- 'tenant_name': access['tenant']['value']}
-
- version = FuelInfo(self.__connection__).get_version()
- # only HTTPS since 7.0
- if version >= [7, 0]:
- creds['insecure'] = "True"
- creds['os_auth_url'] = "https://{0}:5000/v2.0".format(
- self.get_networks()['public_vip'])
- else:
- creds['os_auth_url'] = "http://{0}:5000/v2.0".format(
- self.get_networks()['public_vip'])
- return creds
-
- def get_nodes(self) -> Iterator[Node]:
- for node_descr in self._get_nodes():
- yield Node(self.__connection__, **node_descr)
-
-
-def reflect_cluster(conn: Connection, cluster_id: int) -> Cluster:
- """Returns cluster object by id"""
- c = Cluster(conn, id=cluster_id)
- c.nodes = NodeList(list(c.get_nodes()))
- return c
-
-
-def get_all_nodes(conn: Connection) -> Iterator[Node]:
- """Get all nodes from Fuel"""
- for node_desc in conn.get('api/nodes'):
- yield Node(conn, **node_desc)
-
-
-def get_all_clusters(conn: Connection) -> Iterator[Cluster]:
- """Get all clusters"""
- for cluster_desc in conn.get('api/clusters'):
- yield Cluster(conn, **cluster_desc)
-
-
-def get_cluster_id(conn: Connection, name: str) -> int:
- """Get cluster id by name"""
- for cluster in get_all_clusters(conn):
- if cluster.name == name:
- return cluster.id
-
- raise ValueError("Cluster {0} not found".format(name))
-
diff --git a/wally/main.py b/wally/main.py
index 77074c6..36d40f0 100644
--- a/wally/main.py
+++ b/wally/main.py
@@ -12,7 +12,7 @@
from yaml import load as _yaml_load
YLoader = Callable[[IO], Any]
-yaml_load = None # type: YLoader
+yaml_load: YLoader = None # type: ignore
try:
from yaml import CLoader
@@ -44,7 +44,6 @@
# stages
from .ceph import DiscoverCephStage, CollectCephInfoStage
from .openstack import DiscoverOSStage
-from .fuel import DiscoverFuelStage
from .run_test import (CollectInfoStage, ExplicitNodesStage, SaveNodesStage,
RunTestsStage, ConnectStage, SleepStage, PrepareNodes,
LoadStoredNodesStage)
@@ -141,7 +140,6 @@
parser.add_argument("--profile", action="store_true", help="Profile execution")
parser.add_argument("-s", '--settings-dir', default=None,
help="Folder to store key/settings/history files")
-
subparsers = parser.add_subparsers(dest='subparser_name')
# ---------------------------------------------------------------------
@@ -252,7 +250,6 @@
return [DiscoverCephStage(),
CollectCephInfoStage(),
DiscoverOSStage(),
- DiscoverFuelStage(),
ExplicitNodesStage(),
StartSensorsStage(),
RunTestsStage(),
@@ -267,16 +264,16 @@
faulthandler.register(signal.SIGUSR1, all_threads=True)
opts = parse_args(argv)
- stages = [] # type: List[Stage]
+ stages: List[Stage] = []
# stop mypy from telling that config & storage might be undeclared
- config = None # type: Config
- storage = None # type: IStorage
+ config: Config = None # type: ignore
+ storage: IStorage = None # type: ignore
if opts.profile:
import cProfile
- pr = cProfile.Profile()
- pr.enable()
+ pr: Optional[cProfile.Profile] = cProfile.Profile()
+ pr.enable() # type: ignore
else:
pr = None
@@ -464,7 +461,7 @@
if opts.profile:
assert pr is not None
- pr.disable()
+ pr.disable() # type: ignore
import pstats
pstats.Stats(pr).sort_stats('tottime').print_stats(30)
diff --git a/wally/meta_info.py b/wally/meta_info.py
deleted file mode 100644
index 7dc3901..0000000
--- a/wally/meta_info.py
+++ /dev/null
@@ -1,55 +0,0 @@
-from typing import Any, Dict, List
-from .fuel_rest_api import KeystoneAuth, FuelInfo
-
-
-def total_lab_info(nodes: List[Dict[str, Any]]) -> Dict[str, int]:
- lab_data = {'nodes_count': len(nodes),
- 'total_memory': 0,
- 'total_disk': 0,
- 'processor_count': 0} # type: Dict[str, int]
-
- for node in nodes:
- lab_data['total_memory'] += node['memory']['total']
- lab_data['processor_count'] += len(node['processors'])
-
- for disk in node['disks']:
- lab_data['total_disk'] += disk['size']
-
- lab_data['total_memory'] /= (1024 ** 3)
- lab_data['total_disk'] /= (1024 ** 3)
-
- return lab_data
-
-
-def collect_lab_data(url: str, cred: Dict[str, str]) -> Dict[str, Any]:
- finfo = FuelInfo(KeystoneAuth(url, cred))
-
- nodes = [] # type: List[Dict[str, str]]
- result = {} # type: Dict[str, Any]
-
- for node in finfo.get_nodes(): # type: ignore
- node_info = {
- 'name': node['name'],
- 'processors': [],
- 'interfaces': [],
- 'disks': [],
- 'devices': [],
- 'memory': node['meta']['memory'].copy()
- }
-
- for processor in node['meta']['cpu']['spec']:
- node_info['processors'].append(processor)
-
- for iface in node['meta']['interfaces']:
- node_info['interfaces'].append(iface)
-
- for disk in node['meta']['disks']:
- node_info['disks'].append(disk)
-
- nodes.append(node_info)
-
- result['nodes'] = nodes
- result['fuel_version'] = finfo.get_version()
- result['total_info'] = total_lab_info(nodes)
-
- return result
diff --git a/wally/openstack.py b/wally/openstack.py
index 575edc7..1e02653 100644
--- a/wally/openstack.py
+++ b/wally/openstack.py
@@ -8,8 +8,8 @@
from cephlib.ssh import ConnCreds
from .config import ConfigBlock, Config
-from .openstack_api import (os_connect, find_vms,
- OSCreds, get_openstack_credentials, prepare_os, launch_vms, clear_nodes)
+from .openstack_api import (os_connect, find_vms, OSConnection,
+ OSCreds, get_openstack_credentials_from_env, prepare_os, launch_vms, clear_nodes)
from .test_run_class import TestRun
from .stage import Stage, StepOrder
from .utils import LogError, StopTestError, get_creds_openrc
@@ -41,8 +41,8 @@
if stored is not None:
return OSCreds(*cast(List, stored))
- creds = None # type: OSCreds
- os_creds = None # type: OSCreds
+ creds: OSCreds = None # type: ignore
+ os_creds: OSCreds = None # type: ignore
force_insecure = False
cfg = ctx.config
@@ -53,7 +53,7 @@
if isinstance(sett, str):
if 'ENV' == sett:
logger.info("Using OS credentials from shell environment")
- os_creds = get_openstack_credentials()
+ os_creds = get_openstack_credentials_from_env()
else:
logger.info("Using OS credentials from " + os_cfg['OPENRC'])
creds_tuple = get_creds_openrc(sett)
@@ -69,11 +69,7 @@
if 'insecure' in os_cfg:
force_insecure = os_cfg.get('insecure', False)
- if os_creds is None and 'fuel' in cfg.clouds and 'openstack_env' in cfg.clouds['fuel'] and \
- ctx.fuel_openstack_creds is not None:
- logger.info("Using fuel creds")
- creds = ctx.fuel_openstack_creds
- elif os_creds is None:
+ if os_creds is None:
logger.error("Can't found OS credentials")
raise StopTestError("Can't found OS credentials", None)
@@ -102,8 +98,7 @@
config_block = 'openstack'
- # discover FUEL cluster first
- priority = StepOrder.DISCOVER + 1
+ priority = StepOrder.DISCOVER
@classmethod
def validate(cls, conf: ConfigBlock) -> None:
@@ -120,6 +115,8 @@
ensure_connected_to_openstack(ctx)
+ os_conn: OSConnection = ctx.os_connection # type: ignore # remove Optional[]
+
cfg = ctx.config.openstack
os_nodes_auth = cfg.auth # type: str
if os_nodes_auth.count(":") == 2:
@@ -131,8 +128,8 @@
key_file = None
if 'metadata' not in ctx.config.discover:
- services = ctx.os_connection.nova.services.list() # type: List[Any]
- host_services_mapping = {} # type: Dict[str, List[str]]
+ services: List[Any] = os_conn.nova.services.list()
+ host_services_mapping: Dict[str, List[str]] = {}
for service in services:
ip = cast(str, socket.gethostbyname(service.host))
@@ -152,9 +149,8 @@
private_key_path = get_vm_keypair_path(ctx.config)[0]
- vm_creds = None # type: str
for vm_creds in cfg.get("vms", []):
- user_name, vm_name_pattern = vm_creds.split("@", 1)
+ user_name, vm_name_pattern = vm_creds.split("@", 1) # type: ignore
msg = "Vm like {} lookup failed".format(vm_name_pattern)
with LogError(msg):
@@ -163,7 +159,7 @@
ensure_connected_to_openstack(ctx)
- for ip, vm_id in find_vms(ctx.os_connection, vm_name_pattern):
+ for ip, vm_id in find_vms(os_conn, vm_name_pattern): # type: ignore
creds = ConnCreds(host=to_ip(ip), user=user_name, key_file=private_key_path)
info = NodeInfo(creds, {'testnode'})
info.os_vm_id = vm_id
@@ -195,6 +191,8 @@
vm_image_config = ctx.config.vm_configs[vm_spawn_config.cfg_name]
ensure_connected_to_openstack(ctx)
+ os_conn: OSConnection = ctx.os_connection # type: ignore # remove Optional[]
+
params = vm_image_config.copy()
params.update(vm_spawn_config)
params.update(get_vm_keypair_path(ctx.config))
@@ -203,13 +201,13 @@
if not ctx.config.openstack.get("skip_preparation", False):
logger.info("Preparing openstack")
- prepare_os(ctx.os_connection, params)
+ prepare_os(os_conn, params)
else:
logger.info("Scip openstack preparation as 'skip_preparation' is set")
ctx.os_spawned_nodes_ids = []
with ctx.get_pool() as pool:
- for info in launch_vms(ctx.os_connection, params, pool):
+ for info in launch_vms(os_conn, params, pool):
info.roles.add('testnode')
nid = info.node_id
if nid in ctx.nodes_info:
@@ -225,7 +223,7 @@
if not ctx.config.keep_vm and ctx.os_spawned_nodes_ids:
logger.info("Removing nodes")
- clear_nodes(ctx.os_connection, ctx.os_spawned_nodes_ids)
+ clear_nodes(ctx.os_connection, ctx.os_spawned_nodes_ids) # type: ignore
ctx.storage.rm('spawned_os_nodes')
logger.info("OS spawned nodes has been successfully removed")
diff --git a/wally/openstack_api.py b/wally/openstack_api.py
index 257a7de..b1d9172 100644
--- a/wally/openstack_api.py
+++ b/wally/openstack_api.py
@@ -7,8 +7,8 @@
import tempfile
import subprocess
import urllib.request
-from typing import Dict, Any, Iterable, Iterator, NamedTuple, Optional, List, Tuple
-from concurrent.futures import ThreadPoolExecutor
+from typing import Dict, Iterable, Iterator, NamedTuple, Optional, List, Tuple, Any
+from concurrent.futures import ThreadPoolExecutor, Future
from keystoneauth1 import loading, session
from novaclient.exceptions import NotFound
@@ -45,15 +45,18 @@
("insecure", bool)])
-# TODO(koder): should correctly process different sources, not only env????
-def get_openstack_credentials() -> OSCreds:
+def get_openstack_credentials_from_env() -> OSCreds:
is_insecure = os.environ.get('OS_INSECURE', 'false').lower() in ('true', 'yes')
-
- return OSCreds(os.environ.get('OS_USERNAME'),
- os.environ.get('OS_PASSWORD'),
- os.environ.get('OS_TENANT_NAME'),
- os.environ.get('OS_AUTH_URL'),
- is_insecure)
+ try:
+ return OSCreds(os.environ['OS_USERNAME'],
+ os.environ['OS_PASSWORD'],
+ os.environ['OS_TENANT_NAME'],
+ os.environ['OS_AUTH_URL'],
+ is_insecure)
+ except KeyError:
+ logger.error("One of openstack enviroment variable is not defined - check for " +
+ "OS_USERNAME, OS_PASSWORD, OS_TENANT_NAME, OS_AUTH_URL")
+ raise
class OSConnection:
@@ -100,7 +103,7 @@
vm.pause()
for future in executor.map(pause_vm, ids):
- future.result()
+ future.result() # type: ignore
def unpause(conn: OSConnection, ids: Iterable[int], executor: ThreadPoolExecutor, max_resume_time=10) -> None:
@@ -116,7 +119,7 @@
raise RuntimeError("Can't unpause vm {0}".format(vm_id))
for future in executor.map(unpause, ids):
- future.result()
+ future.result() # type: ignore
def prepare_os(conn: OSConnection, params: Dict[str, Any], max_vm_per_node: int = 8) -> None:
@@ -493,8 +496,8 @@
sec_group_size: int = None) -> List[Tuple[str, Any]]:
if network_zone_name is not None:
- network_future = executor.submit(conn.nova.networks.find,
- label=network_zone_name)
+ network_future: Optional[Future] = executor.submit(conn.nova.networks.find,
+ label=network_zone_name)
else:
network_future = None
@@ -505,7 +508,7 @@
ips_future = executor.submit(get_floating_ips,
conn, flt_ip_pool, amount)
logger.debug("Wait for floating ip")
- ips = ips_future.result()
+ ips: List[Any] = ips_future.result()
ips += [Allocate] * (amount - len(ips))
else:
ips = [None] * amount
@@ -517,7 +520,7 @@
if network_future is not None:
logger.debug("Waiting for network results")
- nics = [{'net-id': network_future.result().id}]
+ nics: Any = [{'net-id': network_future.result().id}]
else:
nics = None
@@ -528,27 +531,28 @@
futures = []
logger.debug("Requesting new vm's")
- orig_scheduler_hints = scheduler_hints.copy()
- group_name_template = scheduler_hints['group'].format("\\d+")
- groups = list(get_free_server_groups(conn, group_name_template + "$"))
- groups.sort()
+ if scheduler_hints:
+ orig_scheduler_hints = scheduler_hints.copy() # type: ignore
+ group_name_template = scheduler_hints['group'].format("\\d+")
+ groups = list(get_free_server_groups(conn, group_name_template + "$"))
+ groups.sort()
- for idx, (name, flt_ip) in enumerate(zip(names, ips), 2):
+ for idx, (name, flt_ip) in enumerate(zip(names, ips), 2):
- scheduler_hints = None
- if orig_scheduler_hints is not None and sec_group_size is not None:
- if "group" in orig_scheduler_hints:
+ scheduler_hints = None
+ if orig_scheduler_hints is not None and sec_group_size is not None:
+ if "group" in orig_scheduler_hints:
+ scheduler_hints = orig_scheduler_hints.copy()
+ scheduler_hints['group'] = groups[idx // sec_group_size]
+
+ if scheduler_hints is None:
scheduler_hints = orig_scheduler_hints.copy()
- scheduler_hints['group'] = groups[idx // sec_group_size]
- if scheduler_hints is None:
- scheduler_hints = orig_scheduler_hints.copy()
+ params = (conn, name, keypair_name, img, fl,
+ nics, vol_sz, flt_ip, scheduler_hints,
+ flt_ip_pool, [security_group])
- params = (conn, name, keypair_name, img, fl,
- nics, vol_sz, flt_ip, scheduler_hints,
- flt_ip_pool, [security_group])
-
- futures.append(executor.submit(create_vm, *params))
+ futures.append(executor.submit(create_vm, *params))
+ else:
+ # no scheduler hints requested - still spawn the VMs, just without anti-affinity groups
+ for name, flt_ip in zip(names, ips):
+ futures.append(executor.submit(create_vm, conn, name, keypair_name, img, fl,
+ nics, vol_sz, flt_ip, None, flt_ip_pool, [security_group]))
res = [future.result() for future in futures]
logger.debug("Done spawning")
return res
@@ -569,7 +573,7 @@
delete_timeout: int = 120) -> Tuple[str, Any]:
# make mypy/pylint happy
- srv = None # type: Any
+ srv: Any = None
for i in range(max_retry):
srv = conn.nova.servers.create(name, flavor=flavor, image=img, nics=nics, key_name=keypair_name,
scheduler_hints=scheduler_hints, security_groups=security_groups)
@@ -614,11 +618,12 @@
return srv.id in ids
volumes_to_delete = []
- for vol in conn.cinder.volumes.list():
- for attachment in vol.attachments:
- if attachment['server_id'] in ids:
- volumes_to_delete.append(vol)
- break
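+ # avoid listing cinder volumes at all when there are no servers to clean up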
+ if ids:
+ for vol in conn.cinder.volumes.list():
+ for attachment in vol.attachments:
+ if attachment['server_id'] in ids:
+ volumes_to_delete.append(vol)
+ break
still_alive = set()
for srv in conn.nova.servers.list():
diff --git a/wally/pretty_yaml.py b/wally/pretty_yaml.py
index 7cd0f3a..05dfe59 100644
--- a/wally/pretty_yaml.py
+++ b/wally/pretty_yaml.py
@@ -1,7 +1,7 @@
__doc__ = "functions for make pretty yaml files"
__all__ = ['dumps']
-from typing import Any, Iterable, List
+from typing import Any, Iterable, List, Optional
def dumps_simple(val: Any) -> str:
@@ -49,10 +49,8 @@
if isinstance(data, (list, tuple)):
if all(map(is_simple, data)):
- if all_nums(data):
- one_line = "[{0}]".format(", ".join(map(dumps_simple, data)))
- else:
- one_line = "[{0}]".format(",".join(map(dumps_simple, data)))
+ join_str = ", " if all_nums(data) else ","
+ one_line: Optional[str] = "[" + join_str.join(map(dumps_simple, data)) + "]"
elif len(data) == 0:
one_line = "[]"
else:
@@ -76,9 +74,7 @@
one_line = None
if all(map(is_simple, data.values())):
- one_line = ", ".join(
- "{0}: {1}".format(dumps_simple(k), dumps_simple(v))
- for k, v in sorted(data.items()))
+ one_line = ", ".join(f"{dumps_simple(k)}: {dumps_simple(v)}" for k, v in sorted(data.items()))
one_line = "{" + one_line + "}"
if len(one_line) > width:
one_line = None
diff --git a/wally/report.py b/wally/report.py
index 092f8d8..d6a42ee 100644
--- a/wally/report.py
+++ b/wally/report.py
@@ -171,20 +171,20 @@
NO_VAL = -1
def __init__(self) -> None:
- self.rw_iops_10ms = self.NO_VAL # type: int
- self.rw_iops_30ms = self.NO_VAL # type: int
- self.rw_iops_100ms = self.NO_VAL # type: int
+ self.rw_iops_10ms = self.NO_VAL
+ self.rw_iops_30ms = self.NO_VAL
+ self.rw_iops_100ms = self.NO_VAL
- self.rr_iops_10ms = self.NO_VAL # type: int
- self.rr_iops_30ms = self.NO_VAL # type: int
- self.rr_iops_100ms = self.NO_VAL # type: int
+ self.rr_iops_10ms = self.NO_VAL
+ self.rr_iops_30ms = self.NO_VAL
+ self.rr_iops_100ms = self.NO_VAL
- self.bw_write_max = self.NO_VAL # type: int
- self.bw_read_max = self.NO_VAL # type: int
+ self.bw_write_max = self.NO_VAL
+ self.bw_read_max = self.NO_VAL
- self.bw = None # type: Optional[float]
- self.read_iops = None # type: Optional[float]
- self.write_iops = None # type: Optional[float]
+ self.bw: Optional[float] = None
+ self.read_iops: Optional[float] = None
+ self.write_iops: Optional[float] = None
def get_performance_summary(storage: IWallyStorage, suite: SuiteConfig,
@@ -240,7 +240,7 @@
headers = ["Mode", "Stats", "Explanation"]
align = ['left', 'right', "left"]
- data = []
+ data: List[Union[str, Tuple[str, str, str]]] = []
if psum95.rr_iops_10ms != psum95.NO_VAL or psum95.rr_iops_30ms != psum95.NO_VAL or \
psum95.rr_iops_100ms != psum95.NO_VAL:
@@ -274,8 +274,9 @@
data.append(("Read", b2ssize(psum50.bw_read_max) + psum50.bw_units,
"Large blocks (>={}KiB)".format(self.style.large_blocks)))
- res += html.center(html.table("Performance", headers, data, align=align))
- yield Menu1st.summary, Menu2ndSumm.summary, HTMLBlock(res)
+ if data:
+ res += html.center(html.table("Performance", headers, data, align=align))
+ yield Menu1st.summary, Menu2ndSumm.summary, HTMLBlock(res)
# # Node load over test time
@@ -292,8 +293,8 @@
suite_types = {'fio'}
def get_divs(self, suite: SuiteConfig) -> Iterator[Tuple[str, str, HTMLBlock]]:
- ts_map = defaultdict(list) # type: Dict[FioJobParams, List[Tuple[SuiteConfig, FioJobConfig]]]
- str_summary = {} # type: Dict[FioJobParams, Tuple[str, str]]
+ ts_map: Dict[FioJobParams, List[Tuple[SuiteConfig, FioJobConfig]]] = defaultdict(list)
+ str_summary: Dict[FioJobParams, Tuple[str, str]] = {}
for job in self.rstorage.iter_job(suite):
fjob = cast(FioJobConfig, job)
@@ -326,7 +327,7 @@
suite_types = {'fio'}
def get_divs(self, suite: SuiteConfig) -> Iterator[Tuple[str, str, HTMLBlock]]:
- qd_grouped_jobs = {} # type: Dict[FioJobParams, List[FioJobConfig]]
+ qd_grouped_jobs: Dict[FioJobParams, List[FioJobConfig]] = {}
test_nc = len(list(find_nodes_by_roles(self.rstorage.storage, ['testnode'])))
for job in self.rstorage.iter_job(suite):
fjob = cast(FioJobConfig, job)
@@ -350,7 +351,8 @@
if len(cpu_usage2qd) < StyleProfile.min_iops_vs_qd_jobs:
continue
- labels, vals, errs = zip(*((l, avg, dev) for l, (_, avg, dev) in sorted(cpu_usage2qd.items())))
+ labels, vals, errs = zip(*((l, avg, dev)
+ for l, (_, avg, dev) in sorted(cpu_usage2qd.items()))) # type: ignore
if test_nc == 1:
labels = list(map(str, labels))
@@ -734,7 +736,6 @@
ts = self.rstorage.get_sensor(ds, job.reliable_info_range_s)
bn += (ts.data > bn_val).sum()
tot += len(ts.data)
- print(node_id, bn, tot)
yield Menu1st.per_job, job.summary, HTMLBlock("")
@@ -761,23 +762,6 @@
yield Menu1st.per_job, job.summary, HTMLBlock(html.img(fname))
-class DevRoles:
- client_disk = 'client_disk'
- client_net = 'client_net'
- client_cpu = 'client_cpu'
-
- storage_disk = 'storage_disk'
- storage_client_net = 'storage_client_net'
- storage_replication_net = 'storage_replication_net'
- storage_cpu = 'storage_disk'
- ceph_storage = 'ceph_storage'
- ceph_journal = 'ceph_journal'
-
- compute_disk = 'compute_disk'
- compute_net = 'compute_net'
- compute_cpu = 'compute_cpu'
-
-
def roles_for_sensors(storage: IWallyStorage) -> Dict[str, List[DataSource]]:
role2ds = defaultdict(list)
@@ -785,19 +769,19 @@
ds = DataSource(node_id=node.node_id)
if 'ceph-osd' in node.roles:
for jdev in node.params.get('ceph_journal_devs', []):
- role2ds[DevRoles.ceph_journal].append(ds(dev=jdev))
- role2ds[DevRoles.storage_disk].append(ds(dev=jdev))
+ role2ds[DevRoles.osd_journal].append(ds(dev=jdev))
+ role2ds[DevRoles.storage_block].append(ds(dev=jdev))
for sdev in node.params.get('ceph_storage_devs', []):
- role2ds[DevRoles.ceph_storage].append(ds(dev=sdev))
- role2ds[DevRoles.storage_disk].append(ds(dev=sdev))
+ role2ds[DevRoles.osd_storage].append(ds(dev=sdev))
+ role2ds[DevRoles.storage_block].append(ds(dev=sdev))
if node.hw_info:
for dev in node.hw_info.disks_info:
- role2ds[DevRoles.storage_disk].append(ds(dev=dev))
+ role2ds[DevRoles.storage_block].append(ds(dev=dev))
if 'testnode' in node.roles:
- role2ds[DevRoles.client_disk].append(ds(dev='rbd0'))
+ role2ds[DevRoles.client_block].append(ds(dev='rbd0'))
return role2ds
@@ -812,7 +796,7 @@
trange = (job.reliable_info_range[0] // 1000, job.reliable_info_range[1] // 1000)
test_nc = len(list(find_nodes_by_roles(self.rstorage.storage, ['testnode'])))
- for dev_role in (DevRoles.ceph_storage, DevRoles.ceph_journal, DevRoles.client_disk):
+ for dev_role in (DevRoles.osd_storage, DevRoles.osd_journal, DevRoles.client_block):
caption = "{} IO heatmaps - {}".format(dev_role.capitalize(), cast(FioJobParams, job).params.long_summary)
if test_nc != 1:
diff --git a/wally/resources.py b/wally/resources.py
index 4074efb..2d563b9 100644
--- a/wally/resources.py
+++ b/wally/resources.py
@@ -1,5 +1,5 @@
import logging
-from typing import Tuple, Dict, cast, List
+from typing import Tuple, Dict, cast, List, Optional, Union
import numpy
@@ -148,15 +148,13 @@
rstorage: IWallyStorage,
large_block: int = 256,
hist_boxes: int = 10,
- nc: bool = False) -> Tuple[Dict[str, Tuple[str, float, float]], bool]:
+ nc: bool = False) -> Tuple[Dict[str, Tuple[str, Optional[float], Optional[float]]], bool]:
- records = {} # type: Dict[str, Tuple[str, float, float]]
if not nc:
- records = rstorage.get_job_info(suite, job, WallyDB.resource_usage_rel)
- if records is not None:
- records = records.copy()
- iops_ok = records.pop('iops_ok')
- return records, iops_ok
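+ # stored records already carry the 'iops_ok' flag - pop it out before returning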
+ jinfo = rstorage.get_job_info(suite, job, WallyDB.resource_usage_rel)
+ if jinfo is not None:
+ jinfo = jinfo.copy()
+ return jinfo, jinfo.pop('iops_ok') # type: ignore
fjob = cast(FioJobConfig, job)
iops_ok = fjob.bsize < large_block
@@ -166,7 +164,7 @@
tot_io_coef = unit_conversion_coef_f(io_sum.bw.units, "Bps")
io_transfered = io_sum.bw.data * tot_io_coef
- records = {
+ records: Dict[str, Tuple[str, Optional[float], Optional[float]]] = {
ResourceNames.data_tr: (b2ssize(io_transfered.sum()) + "B", None, None)
}
@@ -215,7 +213,7 @@
avg, dev = avg_dev_div(data, service_provided_count)
if avg < 0.1:
- dev = None
+ dev = None # type: ignore
records[vname] = (ffunc(data.sum()) + units, avg, dev)
all_agg[vname] = data
@@ -266,7 +264,7 @@
agg = all_agg[name1] + all_agg[name2]
avg, dev = avg_dev_div(agg, service_provided_masked)
if avg < 0.1:
- dev = None
+ dev = None # type: ignore
records[vname] = (ffunc(agg.sum()) + units, avg, dev)
if not nc:
@@ -276,7 +274,7 @@
records[name] = v1, toflt(v2), toflt(v3)
srecords = records.copy()
- srecords['iops_ok'] = iops_ok
+ srecords['iops_ok'] = iops_ok # type: ignore
rstorage.put_job_info(suite, job, WallyDB.resource_usage_rel, srecords)
return records, iops_ok
diff --git a/wally/result_classes.py b/wally/result_classes.py
index 9d59d42..f2b84e6 100644
--- a/wally/result_classes.py
+++ b/wally/result_classes.py
@@ -59,20 +59,6 @@
JobStatMetrics = Dict[Tuple[str, str, str], StatProps]
-class JobResult:
- """Contains done test job information"""
-
- def __init__(self,
- info: JobConfig,
- begin_time: int,
- end_time: int,
- raw: JobMetrics) -> None:
- self.info = info
- self.run_interval = (begin_time, end_time)
- self.raw = raw # type: JobMetrics
- self.processed = None # type: JobStatMetrics
-
-
class IWallyStorage(ISensorStorage, IImagesStorage, metaclass=abc.ABCMeta):
@abc.abstractmethod
diff --git a/wally/result_storage.py b/wally/result_storage.py
index de2f86d..88b8323 100644
--- a/wally/result_storage.py
+++ b/wally/result_storage.py
@@ -2,7 +2,7 @@
import json
import pprint
import logging
-from typing import cast, Iterator, Tuple, Type, Optional, Any, Union, List
+from typing import cast, Iterator, Type, Optional, Any, List
import numpy
@@ -17,7 +17,7 @@
from .utils import StopTestError
from .suits.all_suits import all_suits
-from cephlib.storage import Storage
+from cephlib.storage import IStorage
logger = logging.getLogger('wally')
@@ -30,7 +30,7 @@
class WallyStorage(IWallyStorage, SensorStorage):
- def __init__(self, storage: Storage) -> None:
+ def __init__(self, storage: IStorage) -> None:
SensorStorage.__init__(self, storage, WallyDB)
def flush(self) -> None:
diff --git a/wally/run_test.py b/wally/run_test.py
index 578a65b..9757322 100755
--- a/wally/run_test.py
+++ b/wally/run_test.py
@@ -37,7 +37,8 @@
return True, setup_rpc(ssh_node,
ctx.rpc_code,
ctx.default_rpc_plugins,
- log_level=ctx.config.rpc_log_level)
+ log_level=ctx.config.rpc_log_level,
+ sudo=True)
except Exception as exc:
logger.exception("During connect to %s: %s", node_info, exc)
return False, node_info
@@ -100,9 +101,11 @@
for node in ctx.nodes:
node.conn.cli.killall()
+ if ctx.ceph_master_node:
+ ctx.ceph_master_node.conn.cli.killall()
+
logger.info("Downloading RPC servers logs")
for node in ctx.nodes:
- node.conn.cli.killall()
if node.rpc_log_file is not None:
nid = node.node_id
path = WallyDB.rpc_logs.format(node_id=nid)
@@ -116,7 +119,8 @@
logger.info("Disconnecting")
with ctx.get_pool() as pool:
- list(pool.map(lambda node: node.disconnect(stop=True), ctx.nodes))
+ list(pool.map(lambda node: node.disconnect(stop=True),
+ ctx.nodes + ([ctx.ceph_master_node] if ctx.ceph_master_node else [])))
class CollectInfoStage(Stage):
@@ -268,7 +272,7 @@
test_cls(storage=ctx.rstorage,
suite=suite,
- on_idle=lambda: collect_sensors_data(ctx, False)).run()
+ on_tests_boundary=lambda before_test: collect_sensors_data(ctx, False, before_test)).run()
@classmethod
def validate_config(cls, cfg: ConfigBlock) -> None:
diff --git a/wally/sensors.py b/wally/sensors.py
index 9fb2177..0aca82e 100644
--- a/wally/sensors.py
+++ b/wally/sensors.py
@@ -1,12 +1,14 @@
import bz2
+import time
import array
import logging
-from typing import Dict
+from typing import Dict, Tuple, Optional, Any
import numpy
from cephlib import sensors_rpc_plugin
from cephlib.units import b2ssize
+from cephlib.wally_storage import WallyDB
from . import utils
from .test_run_class import TestRun
@@ -85,37 +87,75 @@
logger.debug("Skip monitoring node %s, as no sensors selected", nid)
-def collect_sensors_data(ctx: TestRun, stop: bool = False):
+def collect_sensors_data(ctx: TestRun,
+ stop: bool = False,
+ before_test: bool = False):
total_sz = 0
- logger.info("Start loading sensors")
- for node in ctx.nodes:
- node_id = node.node_id
- if node_id in ctx.sensors_run_on:
- func = node.conn.sensors.stop if stop else node.conn.sensors.get_updates
+ # ceph pg and pool IO data are collected separately from the per-node sensors
+ cluster_metrics = getattr(ctx.config.sensors, 'cluster', [])
- # hack to calculate total transferred size
- offset_map, compressed_blob, compressed_collected_at_b = func()
- data_tpl = (offset_map, compressed_blob, compressed_collected_at_b)
+ pgs_io = 'ceph-pgs-io' in cluster_metrics
+ pools_io = 'ceph-pools-io' in cluster_metrics
- total_sz += len(compressed_blob) + len(compressed_collected_at_b) + sum(map(len, offset_map)) + \
- 16 * len(offset_map)
+ if pgs_io or pools_io:
+ assert ctx.ceph_master_node is not None
- for path, value, is_array, units in sensors_rpc_plugin.unpack_rpc_updates(data_tpl):
- if path == 'collected_at':
- ds = DataSource(node_id=node_id, metric='collected_at', tag='csv')
- ctx.rstorage.append_sensor(numpy.array(value), ds, units)
- else:
- sensor, dev, metric = path.split(".")
- ds = DataSource(node_id=node_id, metric=metric, dev=dev, sensor=sensor, tag='csv')
- if is_array:
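+ # dump cluster-wide ceph PG / pool IO stats from the master node in the shared thread pool,
+ # so it overlaps with the per-node sensor download below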
+ def collect() -> Tuple[Optional[Any], Optional[Any]]:
+ pg_dump = ctx.ceph_master_node.run(f"ceph {ctx.ceph_extra_args} pg dump --format json") if pgs_io else None
+ pools_dump = ctx.ceph_master_node.run(f"rados {ctx.ceph_extra_args} df --format json") if pools_io else None
+ return pg_dump, pools_dump
+ future = ctx.get_pool().submit(collect)
+ else:
+ future = None
+
+ ctime = int(time.time())
+
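+ # per-node sensor data is downloaded only after a job has run; when before_test is set
+ # the call just snapshots the cluster-wide ceph state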
+ if not before_test:
+ logger.info("Start loading sensors")
+ for node in ctx.nodes:
+ node_id = node.node_id
+ if node_id in ctx.sensors_run_on:
+ func = node.conn.sensors.stop if stop else node.conn.sensors.get_updates
+
+ # hack to calculate total transferred size
+ offset_map, compressed_blob, compressed_collected_at_b = func()
+ data_tpl = (offset_map, compressed_blob, compressed_collected_at_b)
+
+ total_sz += len(compressed_blob) + len(compressed_collected_at_b) + sum(map(len, offset_map)) + \
+ 16 * len(offset_map)
+
+ for path, value, is_array, units in sensors_rpc_plugin.unpack_rpc_updates(data_tpl):
+ if path == 'collected_at':
+ ds = DataSource(node_id=node_id, metric='collected_at', tag='csv')
ctx.rstorage.append_sensor(numpy.array(value), ds, units)
else:
- if metric == 'historic':
- ctx.rstorage.put_sensor_raw(bz2.compress(value), ds(tag='bin'))
+ sensor, dev, metric = path.split(".")
+ ds = DataSource(node_id=node_id, metric=metric, dev=dev, sensor=sensor, tag='csv')
+ if is_array:
+ ctx.rstorage.append_sensor(numpy.array(value), ds, units)
else:
- assert metric in ('perf_dump', 'historic_js')
- ctx.rstorage.put_sensor_raw(value, ds(tag='js'))
+ if metric == 'historic':
+ value = bz2.compress(value)
+ tag = 'bz2'
+ else:
+ assert metric == 'perf_dump'
+ tag = 'txt'
+ ctx.storage.put_raw(value, WallyDB.ceph_metric(node_id=node_id,
+ metric=metric,
+ time=ctime,
+ tag=tag))
+
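+ # store the cluster-wide pg/pool dumps bz2-compressed, keyed by the collection timestamp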
+ if future:
+ pgs_info, pools_info = future.result()
+ if pgs_info:
+ total_sz += len(pgs_info)
+ ctx.storage.put_raw(bz2.compress(pgs_info.encode('utf8')), WallyDB.pgs_io.format(time=ctime))
+
+ if pools_info:
+ total_sz += len(pools_info)
+ ctx.storage.put_raw(bz2.compress(pools_info.encode('utf8')), WallyDB.pools_io.format(time=ctime))
+
logger.info("Download %sB of sensors data", b2ssize(total_sz))
@@ -125,5 +165,5 @@
config_block = 'sensors'
def run(self, ctx: TestRun) -> None:
- collect_sensors_data(ctx, True)
+ collect_sensors_data(ctx, True, False)
diff --git a/wally/stage.py b/wally/stage.py
index 14e80ae..92cd557 100644
--- a/wally/stage.py
+++ b/wally/stage.py
@@ -17,8 +17,8 @@
class Stage(metaclass=abc.ABCMeta):
- priority = None # type: int
- config_block = None # type: Optional[str]
+ priority: int = None # type: ignore
+ config_block: Optional[str] = None
@classmethod
def name(cls) -> str:
diff --git a/wally/suits/io/defaults_qd.cfg b/wally/suits/io/defaults_qd.cfg
index c3dee19..ee10055 100644
--- a/wally/suits/io/defaults_qd.cfg
+++ b/wally/suits/io/defaults_qd.cfg
@@ -21,5 +21,6 @@
write_bw_log=fio_bw_log
log_avg_msec=1000
write_hist_log=fio_lat_hist_log
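+# keep the full latency histogram resolution (log_hist_coarseness=0 disables bin coarsening)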
+log_hist_coarseness=0
log_hist_msec=1000
log_unix_epoch=1
diff --git a/wally/suits/io/fio.py b/wally/suits/io/fio.py
index 374e5a4..693a547 100644
--- a/wally/suits/io/fio.py
+++ b/wally/suits/io/fio.py
@@ -1,6 +1,6 @@
import os.path
import logging
-from typing import cast, Any, List, Union, Tuple, Optional
+from typing import cast, Any, List, Union
import numpy
@@ -14,7 +14,7 @@
from ..job import JobConfig
from .fio_task_parser import execution_time, fio_cfg_compile, FioJobConfig, FioParams, get_log_files
from . import rpc_plugin
-from .fio_hist import get_lat_vals, expected_lat_bins
+from .fio_hist import get_lat_vals
logger = logging.getLogger("wally")
@@ -209,7 +209,7 @@
raise StopTestError()
# TODO: fix units, need to get array type from stream
-
+ open("/tmp/tt", 'wb').write(raw_result)
parsed = [] # type: List[Union[List[int], int]]
times = []
@@ -223,11 +223,11 @@
if name == 'lat':
vals = [int(i.strip()) for i in rest]
- if len(vals) != expected_lat_bins:
- msg = "Expect {} bins in latency histogram, but found {} at time {}" \
- .format(expected_lat_bins, len(vals), time_ms_s)
- logger.error(msg)
- raise StopTestError(msg)
+ # if len(vals) != expected_lat_bins:
+ # msg = f"Expect {expected_lat_bins} bins in latency histogram, " + \
+ # f"but found {len(vals)} at time {time_ms_s}"
+ # logger.error(msg)
+ # raise StopTestError(msg)
parsed.append(vals)
else:
@@ -238,7 +238,7 @@
assert not self.suite.keep_raw_files, "keep_raw_files is not supported"
- histo_bins = None if name != 'lat' else numpy.array(get_lat_vals())
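+ # the number of latency histogram bins is taken from the parsed log itself,
+ # instead of the previously hard-coded 1216-bin layout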
+ histo_bins = None if name != 'lat' else numpy.array(get_lat_vals(len(parsed[0])))
ts = TimeSeries(data=numpy.array(parsed, dtype='uint64'),
units=units,
times=numpy.array(times, dtype='uint64'),
diff --git a/wally/suits/io/fio_hist.py b/wally/suits/io/fio_hist.py
index a2ded70..fc32d0d 100644
--- a/wally/suits/io/fio_hist.py
+++ b/wally/suits/io/fio_hist.py
@@ -1,9 +1,6 @@
from typing import List
-expected_lat_bins = 1216
-
-
#---------------------------- FIO HIST LOG PARSE CODE -----------------------------------------------------------------
# Copy-paste from fio/tools/hist/fiologparser_hist.py.
@@ -52,6 +49,12 @@
return lower + (upper - lower) * edge
-def get_lat_vals(columns: int = expected_lat_bins, coarseness: int = 0) -> List[float]:
- return [plat_idx_to_val_coarse(val, coarseness) for val in range(columns)]
+def get_lat_vals(columns: int, coarseness: int = 0) -> List[float]:
+ # 1216-bin histograms (older fio) store values in usec, 1856-bin ones (newer fio) in nsec - normalize to usec
+ if columns == 1216:
+ coef = 1
+ elif columns == 1856:
+ coef = 1000
+ else:
+ raise ValueError(f"Unexpected latency histogram size - {columns} bins")
+
+ return [plat_idx_to_val_coarse(val, coarseness) / coef for val in range(columns)]
diff --git a/wally/suits/io/fio_job.py b/wally/suits/io/fio_job.py
index 6676895..9dffb49 100644
--- a/wally/suits/io/fio_job.py
+++ b/wally/suits/io/fio_job.py
@@ -1,13 +1,10 @@
import copy
from collections import OrderedDict
-from typing import Optional, Iterator, Union, Dict, Tuple, NamedTuple, Any, cast
+from typing import Optional, Iterator, Union, Dict, Tuple, Any, cast
from cephlib.units import ssize2b, b2ssize
-from ..job import JobConfig, JobParams
-
-
-Var = NamedTuple('Var', [('name', str)])
+from ..job import JobConfig, JobParams, Var
def is_fio_opt_true(vl: Union[str, int]) -> bool:
@@ -40,7 +37,7 @@
@property
def summary(self) -> str:
"""Test short summary, used mostly for file names and short image description"""
- res = "{0[oper_short]}{0[sync_mode]}{0[bsize]}".format(self)
+ res = f"{self['oper_short']}{self['sync_mode']}{self['bsize']}"
if self['qd'] is not None:
res += "_qd" + str(self['qd'])
if self['thcount'] not in (1, None):
@@ -52,13 +49,13 @@
@property
def long_summary(self) -> str:
"""Readable long summary for management and deployment engineers"""
- res = "{0[oper]}, {0.sync_mode_long}, block size {1}B".format(self, b2ssize(self['bsize'] * 1024))
+ res = f"{self['oper']}, {self.sync_mode_long}, block size {b2ssize(self['bsize'] * 1024)}B"
if self['qd'] is not None:
res += ", QD = " + str(self['qd'])
if self['thcount'] not in (1, None):
- res += ", threads={0[thcount]}".format(self)
+ res += f", threads={self['thcount']}"
if self['write_perc'] is not None:
- res += ", write_perc={0[write_perc]}%".format(self)
+ res += f", fwrite_perc={self['write_perc']}%"
return res
def copy(self, **kwargs: Dict[str, Any]) -> 'FioJobParams':
@@ -89,24 +86,24 @@
def __init__(self, name: str, idx: int) -> None:
JobConfig.__init__(self, idx)
self.name = name
- self._sync_mode = None # type: Optional[str]
- self._params = None # type: Optional[Dict[str, Any]]
+ self._sync_mode: Optional[str] = None
+ self._params: Optional[Dict[str, Any]] = None
# ------------- BASIC PROPERTIES -----------------------------------------------------------------------------------
@property
def write_perc(self) -> Optional[int]:
try:
- return int(self.vals["rwmixwrite"])
+ return int(self.vals["rwmixwrite"]) # type: ignore
except (KeyError, TypeError):
try:
- return 100 - int(self.vals["rwmixread"])
+ return 100 - int(self.vals["rwmixread"]) # type: ignore
except (KeyError, TypeError):
return None
@property
def qd(self) -> int:
- return int(self.vals.get('iodepth', '1'))
+ return int(self.vals.get('iodepth', '1')) # type: ignore
@property
def bsize(self) -> int:
@@ -117,7 +114,7 @@
@property
def oper(self) -> str:
vl = self.vals['rw']
- return vl if ':' not in vl else vl.split(":")[0]
+ return vl if ':' not in vl else vl.split(":")[0] # type: ignore
@property
def op_type_short(self) -> str:
@@ -125,14 +122,14 @@
@property
def thcount(self) -> int:
- return int(self.vals.get('numjobs', 1))
+ return int(self.vals.get('numjobs', 1)) # type: ignore
@property
def sync_mode(self) -> str:
if self._sync_mode is None:
- direct = is_fio_opt_true(self.vals.get('direct', '0')) or \
- not is_fio_opt_true(self.vals.get('buffered', '0'))
- sync = is_fio_opt_true(self.vals.get('sync', '0'))
+ direct = is_fio_opt_true(self.vals.get('direct', '0')) # type: ignore
+ direct = direct or not is_fio_opt_true(self.vals.get('buffered', '0')) # type: ignore
+ sync = is_fio_opt_true(self.vals.get('sync', '0')) # type: ignore
self._sync_mode = self.ds2mode[(sync, direct)]
return cast(str, self._sync_mode)
diff --git a/wally/suits/io/fio_task_parser.py b/wally/suits/io/fio_task_parser.py
index 5b91885..a9e13dc 100644
--- a/wally/suits/io/fio_task_parser.py
+++ b/wally/suits/io/fio_task_parser.py
@@ -1,9 +1,8 @@
#!/usr/bin/env python3
import re
-import os
import sys
-import os.path
+import pathlib
import argparse
import itertools
from typing import Optional, Iterator, Union, Dict, Iterable, List, Tuple, NamedTuple, Any
@@ -104,34 +103,30 @@
def fio_config_parse(lexer_iter: Iterable[CfgLine]) -> Iterator[FioJobConfig]:
in_globals = False
curr_section = None
- glob_vals = OrderedDict() # type: Dict[str, Any]
+ glob_vals: Dict[str, Any] = OrderedDict()
sections_count = 0
- lexed_lines = list(lexer_iter) # type: List[CfgLine]
+ lexed_lines: List[CfgLine] = list(lexer_iter)
one_more = True
- includes = {}
+ includes: Dict[str, Tuple[str, int]] = {}
while one_more:
- new_lines = [] # type: List[CfgLine]
+ new_lines: List[CfgLine] = []
one_more = False
for line in lexed_lines:
fname, lineno, oline, tp, name, val = line
if INCLUDE == tp:
- if not os.path.exists(fname):
- dirname = '.'
- else:
- dirname = os.path.dirname(fname)
-
- new_fname = os.path.join(dirname, name)
- includes[new_fname] = (fname, lineno)
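+ # resolve the include relative to the including file when that file exists on disk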
+ fobj = pathlib.Path(fname)
+ new_fname = (fobj.parent / name) if fobj.exists() else pathlib.Path(name)
+ includes[str(new_fname)] = (fname, lineno)
try:
- cont = open(new_fname).read()
+ cont = new_fname.open().read()
except IOError as err:
- raise ParseError("Error while including file {}: {}".format(new_fname, err), fname, lineno, oline)
+ raise ParseError(f"Error while including file {new_fname}: {err}", fname, lineno, oline)
- new_lines.extend(fio_config_lexer(cont, new_fname))
+ new_lines.extend(fio_config_lexer(cont, str(new_fname)))
one_more = True
else:
new_lines.append(line)
@@ -161,7 +156,7 @@
if in_globals:
glob_vals[name] = val
elif name == name.upper():
- raise ParseError("Param {!r} not in [global] section".format(name), fname, lineno, oline)
+ raise ParseError(f"Param {name!r} not in [global] section", fname, lineno, oline)
elif curr_section is None:
raise ParseError("Data outside section", fname, lineno, oline)
else:
@@ -172,7 +167,7 @@
def process_cycles(sec: FioJobConfig) -> Iterator[FioJobConfig]:
- cycles = OrderedDict() # type: Dict[str, Any]
+ cycles: Dict[str, Any] = OrderedDict()
for name, val in sec.vals.items():
if isinstance(val, list) and name.upper() != name:
@@ -203,12 +198,12 @@
yield new_sec
-FioParamsVal = Union[str, Var]
+FioParamsVal = Union[str, Var, int]
FioParams = Dict[str, FioParamsVal]
def apply_params(sec: FioJobConfig, params: FioParams) -> FioJobConfig:
- processed_vals = OrderedDict() # type: Dict[str, Any]
+ processed_vals: Dict[str, Any] = OrderedDict()
processed_vals.update(params)
for name, val in sec.vals.items():
@@ -251,8 +246,7 @@
sec.vals['unified_rw_reporting'] = '1'
if isinstance(sec.vals['size'], Var):
- raise ValueError("Variable {0} isn't provided".format(
- sec.vals['size'].name))
+ raise ValueError(f"Variable {sec.vals['size'].name} isn't provided")
sz = ssize2b(sec.vals['size'])
offset = sz * ((MAGIC_OFFSET * counter[0]) % 1.0)
@@ -266,7 +260,7 @@
for vl in sec.vals.values():
if isinstance(vl, Var):
- raise ValueError("Variable {0} isn't provided".format(vl.name))
+ raise ValueError(f"Variable {vl.name} isn't provided")
params = sec.vals.copy()
params['UNIQ'] = 'UN{0}'.format(counter[0])
@@ -282,13 +276,11 @@
return sec.vals.get('ramp_time', 0) + sec.vals.get('runtime', 0)
-def parse_all_in_1(source:str, fname: str = None) -> Iterator[FioJobConfig]:
+def parse_all_in_1(source: str, fname: str) -> Iterator[FioJobConfig]:
return fio_config_parse(fio_config_lexer(source, fname))
def get_log_files(sec: FioJobConfig, iops: bool = False) -> Iterator[Tuple[str, str, str]]:
- res = [] # type: List[Tuple[str, str, str]]
-
keys = [('write_bw_log', 'bw', 'KiBps'),
('write_hist_log', 'lat', 'us')]
if iops:
@@ -304,8 +296,8 @@
test_params = test_params.copy()
if 'RAMPTIME' not in test_params and 'RUNTIME' in test_params:
- ramp = int(int(test_params['RUNTIME']) * 0.05)
- test_params['RAMPTIME'] = min(30, max(5, ramp))
+ ramp = int(int(test_params['RUNTIME']) * 0.05) # type: ignore
+ test_params['RAMPTIME'] = min(30, max(5, ramp)) # type: ignore
it = parse_all_in_1(source, fname)
it = (apply_params(sec, test_params) for sec in it)
diff --git a/wally/suits/io/rrd_raw.cfg b/wally/suits/io/rrd_raw.cfg
index 2b0fc74..e7ca82d 100644
--- a/wally/suits/io/rrd_raw.cfg
+++ b/wally/suits/io/rrd_raw.cfg
@@ -1,6 +1,6 @@
[test]
-blocksize=4k
-rw=randread
+blocksize=4m
+rw=write
iodepth=1
ramp_time=0
runtime=120
@@ -17,5 +17,8 @@
wait_for_previous=1
per_job_logs=0
randrepeat=0
-filename=/dev/sdb
-size=100G
\ No newline at end of file
+filename=/media/data/test.db
+size=50G
+;verify_pattern=0x00
+buffer_compress_percentage=99
+write_bw_log=/tmp/bw.non-compress.log
diff --git a/wally/suits/itest.py b/wally/suits/itest.py
index 26fd252..111e6bb 100644
--- a/wally/suits/itest.py
+++ b/wally/suits/itest.py
@@ -26,11 +26,12 @@
retry_time = 30
job_config_cls = None # type: type
- def __init__(self, storage: IWallyStorage, suite: SuiteConfig, on_idle: Callable[[], None] = None) -> None:
+ def __init__(self, storage: IWallyStorage, suite: SuiteConfig,
+ on_tests_boundary: Callable[[bool], None] = None) -> None:
self.suite = suite
self.stop_requested = False
self.sorted_nodes_ids = sorted(node.node_id for node in self.suite.nodes)
- self.on_idle = on_idle
+ self.on_tests_boundary = on_tests_boundary
self.storage = storage
def request_stop(self) -> None:
@@ -55,11 +56,11 @@
# used_max_diff = max((min_run_time * max_rel_time_diff), max_time_diff)
max_time_diff = 5
max_rel_time_diff = 0.05
- load_profile_name = None # type: str
+ load_profile_name: str = None # type: ignore
def __init__(self, *args, **kwargs) -> None:
PerfTest.__init__(self, *args, **kwargs)
- self.job_configs = None # type: List[JobConfig]
+ self.job_configs: List[JobConfig] = None # type: ignore
@abc.abstractmethod
def get_expected_runtime(self, iter_cfg: JobConfig) -> Optional[int]:
@@ -107,13 +108,12 @@
if None not in run_times:
# +10s - is a rough estimation for additional operations per iteration
- expected_run_time = int(sum(run_times) + 10 * len(not_in_storage))
-
+ expected_run_time: int = int(sum(run_times) + 10 * len(not_in_storage)) # type: ignore
exec_time_s, end_dt_s = get_time_interval_printable_info(expected_run_time)
logger.info("Entire test should takes around %s and finish at %s", exec_time_s, end_dt_s)
for job in not_in_storage:
- results = [] # type: List[TimeSeries]
+ results: List[TimeSeries] = []
for retry_idx in range(self.max_retry):
logger.info("Preparing job %s", job.params.summary)
@@ -121,8 +121,14 @@
wait([pool.submit(self.prepare_iteration, node, job) for node in self.suite.nodes])
expected_job_time = self.get_expected_runtime(job)
- exec_time_s, end_dt_s = get_time_interval_printable_info(expected_job_time)
- logger.info("Job should takes around %s and finish at %s", exec_time_s, end_dt_s)
+ if expected_job_time is None:
+ logger.info("Job execution time is unknown")
+ else:
+ exec_time_s, end_dt_s = get_time_interval_printable_info(expected_job_time)
+ logger.info("Job should takes around %s and finish at %s", exec_time_s, end_dt_s)
+
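+ # signal that a job is about to start, so cluster-wide state can be snapshotted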
+ if self.on_tests_boundary is not None:
+ self.on_tests_boundary(True)
jfutures = [pool.submit(self.run_iteration, node, job) for node in self.suite.nodes]
failed = False
@@ -132,6 +138,9 @@
except EnvironmentError:
failed = True
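+ # job finished (with or without errors) - let the callback download the accumulated sensor data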
+ if self.on_tests_boundary is not None:
+ self.on_tests_boundary(False)
+
if not failed:
break
@@ -145,8 +154,8 @@
results = []
# per node jobs start and stop times
- start_times = [] # type: List[int]
- stop_times = [] # type: List[int]
+ start_times: List[int] = []
+ stop_times: List[int] = []
for ts in results:
self.storage.put_ts(ts)
@@ -180,8 +189,6 @@
self.storage.put_job(self.suite, job)
self.storage.sync()
- if self.on_idle is not None:
- self.on_idle()
@abc.abstractmethod
def config_node(self, node: IRPCNode) -> None:
diff --git a/wally/suits/job.py b/wally/suits/job.py
index 8ef1093..d336807 100644
--- a/wally/suits/job.py
+++ b/wally/suits/job.py
@@ -1,10 +1,13 @@
import abc
-from typing import Dict, Any, Tuple, cast, Union
+from typing import Dict, Any, Tuple, cast, Union, NamedTuple
from collections import OrderedDict
from cephlib.istorage import Storable
+Var = NamedTuple('Var', [('name', str)])
+
+
class JobParams(metaclass=abc.ABCMeta):
"""Class contains all job parameters, which significantly affects job results.
Like block size or operation type, but not file name or file size.
@@ -41,12 +44,12 @@
def __eq__(self, o: object) -> bool:
if not isinstance(o, self.__class__):
- raise TypeError("Can't compare {!r} to {!r}".format(self.__class__.__qualname__, type(o).__qualname__))
+ raise TypeError(f"Can't compare {self.__class__.__qualname__!r} to {type(o).__qualname__!r}")
return sorted(self.params.items()) == sorted(cast(JobParams, o).params.items())
def __lt__(self, o: object) -> bool:
if not isinstance(o, self.__class__):
- raise TypeError("Can't compare {!r} to {!r}".format(self.__class__.__qualname__, type(o).__qualname__))
+ raise TypeError(f"Can't compare {self.__class__.__qualname__!r} to {type(o).__qualname__!r}")
return self.char_tpl < cast(JobParams, o).char_tpl
@property
@@ -63,11 +66,10 @@
self.idx = idx
# time interval, in seconds, when test was running on all nodes
- self.reliable_info_range = None # type: Tuple[int, int]
-
+ self.reliable_info_range: Tuple[int, int] = None # type: ignore
# all job parameters, both from suite file and config file
- self.vals = OrderedDict() # type: Dict[str, Any]
+ self.vals: Dict[str, Any] = OrderedDict()
@property
def reliable_info_range_s(self) -> Tuple[int, int]:
@@ -76,7 +78,7 @@
@property
def storage_id(self) -> str:
"""unique string, used as key in storage"""
- return "{}_{}".format(self.summary, self.idx)
+ return f"{self.summary}_{self.idx}"
@property
@abc.abstractmethod
diff --git a/wally/test_run_class.py b/wally/test_run_class.py
index 7aed795..05585e1 100644
--- a/wally/test_run_class.py
+++ b/wally/test_run_class.py
@@ -1,4 +1,3 @@
-import collections
from typing import List, Callable, Any, Dict, Optional, Set
from concurrent.futures import ThreadPoolExecutor
@@ -9,7 +8,6 @@
from .openstack_api import OSCreds, OSConnection
from .config import Config
-from .fuel_rest_api import Connection
from .result_classes import IWallyStorage
@@ -17,29 +15,29 @@
"""Test run information"""
def __init__(self, config: Config, storage: IStorage, rstorage: IWallyStorage) -> None:
# NodesInfo list
- self.nodes_info = {} # type: Dict[str, NodeInfo]
+ self.nodes_info: Dict[str, NodeInfo] = {}
+
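+ # node used to run cluster-wide ceph/rados commands and extra arguments appended to those commands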
+ self.ceph_master_node: Optional[IRPCNode] = None
+ self.ceph_extra_args: Optional[str] = None
# Nodes list
- self.nodes = [] # type: List[IRPCNode]
+ self.nodes: List[IRPCNode] = []
- self.build_meta = {} # type: Dict[str,Any]
- self.clear_calls_stack = [] # type: List[Callable[['TestRun'], None]]
+ self.build_meta: Dict[str,Any] = {}
+ self.clear_calls_stack: List[Callable[['TestRun'], None]] = []
# openstack credentials
- self.fuel_openstack_creds = None # type: Optional[OSCreds]
- self.fuel_version = None # type: Optional[List[int]]
- self.os_creds = None # type: Optional[OSCreds]
- self.os_connection = None # type: Optional[OSConnection]
- self.fuel_conn = None # type: Optional[Connection]
- self.rpc_code = None # type: bytes
- self.default_rpc_plugins = None # type: Dict[str, bytes]
+ self.os_creds: Optional[OSCreds] = None # type: ignore
+ self.os_connection: Optional[OSConnection] = None # type: ignore
+ self.rpc_code: bytes = None # type: ignore
+ self.default_rpc_plugins: Dict[str, bytes] = None # type: ignore
self.storage = storage
self.rstorage = rstorage
self.config = config
- self.sensors_run_on = set() # type: Set[str]
- self.os_spawned_nodes_ids = None # type: List[int]
- self.devs_locator = [] # type: DevRolesConfig
+ self.sensors_run_on: Set[str] = set()
+ self.os_spawned_nodes_ids: List[int] = None # type: ignore
+ self.devs_locator: DevRolesConfig = []
def get_pool(self):
return ThreadPoolExecutor(self.config.get('worker_pool_sz', 32))