typing and refactoring on the way
diff --git a/.gitignore b/.gitignore
index fc3b04d..3a71c42 100644
--- a/.gitignore
+++ b/.gitignore
@@ -53,3 +53,5 @@
# PyBuilder
target/
.idea//sensor_report.txt
+
+.env/
diff --git a/Makefile b/Makefile
new file mode 100644
index 0000000..b14cd96
--- /dev/null
+++ b/Makefile
@@ -0,0 +1,8 @@
+.PHONY: mypy
+
+
+ALL_FILES=$(shell find wally/ -type f -name '*.py')
+STUBS="stubs:.env/lib/python3.5/site-packages"
+
+mypy:
+ bash -c "cd ~/workspace/wally; source .env/bin/activate ; MYPYPATH=${STUBS} python3 -m mypy -s ${ALL_FILES}"
diff --git a/configs-examples/full.yaml b/configs-examples/full.yaml
new file mode 100644
index 0000000..85382c8
--- /dev/null
+++ b/configs-examples/full.yaml
@@ -0,0 +1,69 @@
+collect_info: false
+suspend_unused_vms: false
+results_storage: /var/wally_results
+var_dir_root: /tmp/perf_tests
+discover: fuel_openrc_only
+collect_info: false
+suspend_unused_vms: true
+
+logging:
+ extra_logs: 1
+ level: DEBUG
+
+vm_configs:
+ keypair_file_private: wally_vm_key_perf3.pem
+ keypair_file_public: wally_vm_key_perf3.pub
+ keypair_name: wally_vm_key
+
+ wally_1024:
+ image:
+ name: wally_ubuntu
+ url: https://cloud-images.ubuntu.com/trusty/current/trusty-server-cloudimg-amd64-disk1.img
+ creds: "ssh://ubuntu@{ip}::{private_key_path}"
+
+ flavor:
+ name: wally_1024
+ hdd_size: 100
+ ram_size: 1024
+ cpu_count: 2
+
+ vol_sz: 100
+ name_templ: wally-{group}-{id}
+ aa_group_name: wally-aa-{0}
+ security_group: wally_ssh_to_everyone
+
+clouds:
+ fuel:
+ url: http://172.16.44.13:8000/
+ creds: admin:admin@admin
+ ssh_creds: root:r00tme
+ openstack_env: test
+
+ openstack:
+ OPENRC: /home/koder/workspace/scale_openrc
+ vms:
+ - "wally-phytographic-sharla,ssh://ubuntu@{ip}::wally_vm_key.pem"
+
+sensors:
+ online: true
+ roles_mapping:
+ testnode: system-cpu, block-io, net-io
+ ceph-osd: system-cpu, block-io, net-io
+ compute: system-cpu, block-io, net-io
+
+tests:
+ - start_test_nodes:
+ openstack:
+ count: =2
+ cfg_name: wally_1024
+ network_zone_name: net04
+ flt_ip_pool: net04_ext
+ skip_preparation: true
+
+ tests:
+ - io:
+ node_limit: 2
+ cfg: ceph
+ params:
+ FILENAME: /dev/vdb
+ TEST_FILE_SIZE: 100G
diff --git a/requirements.txt b/requirements.txt
index b663660..62fa724 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -11,3 +11,4 @@
texttable
pycrypto
ecdsa
+psutil
diff --git a/stubs/paramiko.pyi b/stubs/paramiko.pyi
new file mode 100644
index 0000000..45010f0
--- /dev/null
+++ b/stubs/paramiko.pyi
@@ -0,0 +1,34 @@
+from io import BytesIO
+from typing import Any, Tuple
+
+
+__version_info__ = None # type: Tuple[int, int, int]
+
+
+class PasswordRequiredException(Exception):
+ pass
+
+
+class SSHException(Exception):
+ pass
+
+
+class RSAKey:
+ @classmethod
+ def from_private_key(cls, data: BytesIO) -> 'RSAKey': ...
+
+
+
+class AutoAddPolicy:
+ pass
+
+
+class SSHClient:
+ def __init__(self) -> None:
+ self.known_hosts = None # type: Any
+
+ def load_host_keys(self, path: str) -> None: ...
+ def set_missing_host_key_policy(self, policy: AutoAddPolicy) -> None: ...
+ def connect(self, *args: Any, **kwargs: Any): ...
+ def get_transport(self) -> Any: ...
+ def open_sftp(self) -> Any: ...
diff --git a/stubs/psutil.pyi b/stubs/psutil.pyi
new file mode 100644
index 0000000..d3f1388
--- /dev/null
+++ b/stubs/psutil.pyi
@@ -0,0 +1,6 @@
+from typing import Iterable
+
+class Process:
+ def __init__(self, pid: int) -> None: ...
+ def children(self, recursive: bool = True) -> Iterable['Process']: ...
+ def kill(self) -> None: ...
diff --git a/stubs/yaml.pyi b/stubs/yaml.pyi
new file mode 100644
index 0000000..d8d6516
--- /dev/null
+++ b/stubs/yaml.pyi
@@ -0,0 +1,9 @@
+from typing import Union, List, Dict, Any, IO
+
+
+Basic = Union[List, Dict[str, Any]]
+
+
+def load(stream: IO, loader: Any) -> Any: ...
+class CLoader: ...
+
diff --git a/v2_plans.md b/v2_plans.md
index 52a0c1d..be719b6 100644
--- a/v2_plans.md
+++ b/v2_plans.md
@@ -1,30 +1,41 @@
+TODO:
+
+ * revise type checking
+ * use overloading module
+
Steps:
Discover/reuse - returns NodeInfo
Connect - returns Node from NodeInfo
Node contains ssh, rpc interface and basic API
Add aio rpc client
+ * Make storage class with dict-like interface
+ - map path to value, e.g. 'cluster_info': yaml
+ - 'results/r20w80b60kQD10VM2/iops': [iops]
+ - should support both binary and text(yaml) formats, maybe store in both
+ - store all results in it
+ - before call a stage/part check that it results is not awailable yet,
+ or chek this inside stage. Skip stage if data already awailable
+ - use for discovery, tests, etc
+ * aio?
+ * Update to newest fio
+ * Add fio build script to download fio from git and build it
+ * Add integration tests with nbd
+ * Move from threads to QD to mitigate fio issues
+ * Use agent to communicate with remote node
+ * fix existing folder detection
+ * fio load reporters
+ * Results stored in archived binary format for fast parsing (sqlite)?
+ * Split all code on separated modules:
+ * logging
+ * Test run class
+ * Test stage class
+ * Results are set of timeseries with attached descriptions
-
-* aio?
-* Update to newest fio
-* Add fio build/test code
-* Add integration tests with nbd
-* Move from threads to QD to mitigate fio issues
-* Use agent to communicate with remote node
-* fix existing folder detection
-* fio load reporters
-* Results stored in archived binary format for fast parsing (sqlite)?
-* Split all code on separated modules:
- * logging
- * Test run class
- * Test stage class
-* Results are set of timeseries with attached descriptions
-
-* move agent and ssh code to separated library
-* plugins for agent
-* evaluate bokeh for visualization
-https://github.com/cronburg/ceph-viz/tree/master/histogram
+ * move agent and ssh code to separated library
+ * plugins for agent
+ * evaluate bokeh for visualization
+ https://github.com/cronburg/ceph-viz/tree/master/histogram
* Statistical result check and report:
- check results distribution
diff --git a/wally/config.py b/wally/config.py
index c5d1db0..59db0e1 100644
--- a/wally/config.py
+++ b/wally/config.py
@@ -1,141 +1,60 @@
-import os
-import uuid
-import functools
-
-import yaml
-
-try:
- from petname import Generate as pet_generate
-except ImportError:
- def pet_generate(x, y):
- return str(uuid.uuid4())
-
-from . import pretty_yaml
-
+from typing import Any, Dict
+from .storage import IStorable, IStorage
class NoData:
@classmethod
- def get(cls, name, x):
+ def get(cls: type, name: str, x: Any) -> type:
return cls
-class Config:
- def __init__(self, val=None):
- if val is not None:
- self.update(val)
- self.results_dir = None
- self.run_uuid = None
- self.settings = {}
- self.run_params_file = None
- self.default_test_local_folder = None
- self.hwinfo_directory = None
- self.hwreport_fname = None
+class Config(IStorable):
+ # for mypy only
+ run_uuid = None # type: str
+ storage_url = None # type: str
+ comment = None # type: str
+ keep_vm = None # type: bool
+ no_tests = None # type: bool
+ dont_discover_nodes = None # type: bool
+ build_id = None # type: str
+ build_description = None # type: str
+ build_type = None # type: str
- def get(self, name, defval=None):
- obj = self.__dict__
- for cname in name.split("."):
- obj = obj.get(cname, NoData)
+ def __init__(self, dct: Dict[str, Any]) -> None:
+ self.__dict__['_dct'] = dct
- if obj is NoData:
- return defval
- return obj
+ def get(self, path: str, default: Any = NoData) -> Any:
+ curr = self
- def update(self, val):
- self.__dict__.update(val)
+ while path:
+ if '/' in path:
+ name, path = path.split('/', 1)
+ else:
+ name = path
+ path = ""
+
+ try:
+ curr = getattr(curr, name)
+ except AttributeError:
+ return default
+
+ return curr
+
+ def __getattr__(self, name: str) -> Any:
+ try:
+ val = self.__dct[name]
+ except KeyError:
+ raise AttributeError(name)
+
+ if isinstance(val, dict):
+ val = self.__class__(val)
+
+ return val
+
+ def __setattr__(self, name: str, val: Any):
+ self.__dct[name] = val
-def get_test_files(results_dir):
- in_var_dir = functools.partial(os.path.join, results_dir)
-
- res = dict(
- run_params_file='run_params.yaml',
- saved_config_file='config.yaml',
- vm_ids_fname='os_vm_ids',
- html_report_file='{0}_report.html',
- load_report_file='load_report.html',
- text_report_file='report.txt',
- log_file='log.txt',
- sensor_storage='sensor_storage',
- nodes_report_file='nodes.yaml',
- results_storage='results',
- hwinfo_directory='hwinfo',
- hwreport_fname='hwinfo.txt',
- raw_results='raw_results.yaml')
-
- res = dict((k, in_var_dir(v)) for k, v in res.items())
- res['results_dir'] = results_dir
- return res
-
-
-def load_config(file_name):
- file_name = os.path.abspath(file_name)
-
- defaults = dict(
- testnode_log_root='/tmp/wally',
- settings={}
- )
-
- raw_cfg = yaml.load(open(file_name).read())
- raw_cfg['config_folder'] = os.path.dirname(file_name)
- if 'include' in raw_cfg:
- default_path = os.path.join(raw_cfg['config_folder'],
- raw_cfg.pop('include'))
- default_cfg = yaml.load(open(default_path).read())
-
- # TODO: Need more intelectual configs merge?
- default_cfg.update(raw_cfg)
- raw_cfg = default_cfg
-
- cfg = Config(defaults)
- cfg.update(raw_cfg)
-
- results_storage = cfg.settings.get('results_storage', '/tmp')
- results_storage = os.path.abspath(results_storage)
-
- existing = file_name.startswith(results_storage)
-
- if existing:
- cfg.results_dir = os.path.dirname(file_name)
- cfg.run_uuid = os.path.basename(cfg.results_dir)
- else:
- # genarate result folder name
- for i in range(10):
- cfg.run_uuid = pet_generate(2, "_")
- cfg.results_dir = os.path.join(results_storage,
- cfg.run_uuid)
- if not os.path.exists(cfg.results_dir):
- break
- else:
- cfg.run_uuid = str(uuid.uuid4())
- cfg.results_dir = os.path.join(results_storage,
- cfg.run_uuid)
-
- # setup all files paths
- cfg.update(get_test_files(cfg.results_dir))
-
- if existing:
- cfg.update(load_run_params(cfg.run_params_file))
-
- testnode_log_root = cfg.get('testnode_log_root')
- testnode_log_dir = os.path.join(testnode_log_root, "{0}/{{name}}")
- cfg.default_test_local_folder = testnode_log_dir.format(cfg.run_uuid)
-
- return cfg
-
-
-def save_run_params(cfg):
- params = {
- 'comment': cfg.comment,
- 'run_uuid': cfg.run_uuid
- }
-
- with open(cfg.run_params_file, 'w') as fd:
- fd.write(pretty_yaml.dumps(params))
-
-
-def load_run_params(run_params_file):
- with open(run_params_file) as fd:
- dt = yaml.load(fd)
-
- return dict(run_uuid=dt['run_uuid'],
- comment=dt.get('comment'))
+class Context:
+ def __init__(self, config: Config, storage: IStorage):
+ self.config = config
+ self.storage = storage
\ No newline at end of file
diff --git a/wally/discover/__init__.py b/wally/discover/__init__.py
index 3ac983e..e69de29 100644
--- a/wally/discover/__init__.py
+++ b/wally/discover/__init__.py
@@ -1,5 +0,0 @@
-"this package contains node discovery code"
-from .node import Node
-from .discover import discover
-
-__all__ = ["discover", "Node"]
diff --git a/wally/fuel_rest_api.py b/wally/fuel_rest_api.py
index 8799ed2..03beb37 100644
--- a/wally/fuel_rest_api.py
+++ b/wally/fuel_rest_api.py
@@ -4,7 +4,7 @@
import logging
import urllib.request
import urllib.parse
-from typing import Dict, Any
+from typing import Dict, Any, Iterator, Match
from functools import partial, wraps
import netaddr
@@ -36,7 +36,7 @@
def host(self) -> str:
return self.root_url.split('/')[2]
- def do(self, method: str, path: str, params: Dict[Any, Any]=None):
+ def do(self, method: str, path: str, params: Dict[Any, Any]=None) -> Dict[str, Any]:
if path.startswith('/'):
url = self.root_url + path
else:
@@ -78,7 +78,7 @@
class KeystoneAuth(Urllib2HTTP):
- def __init__(self, root_url: str, creds: Dict[str, str], headers: Dict[str, str]=None):
+ def __init__(self, root_url: str, creds: Dict[str, str], headers: Dict[str, str] = None) -> None:
super(KeystoneAuth, self).__init__(root_url, headers)
admin_node_ip = urllib.parse.urlparse(root_url).hostname
self.keystone_url = "http://{0}:5000/v2.0".format(admin_node_ip)
@@ -86,7 +86,7 @@
auth_url=self.keystone_url, **creds)
self.refresh_token()
- def refresh_token(self):
+ def refresh_token(self) -> None:
"""Get new token from keystone and update headers"""
try:
self.keystone.authenticate()
@@ -96,7 +96,7 @@
'Cant establish connection to keystone with url %s',
self.keystone_url)
- def do(self, method: str, path: str, params: Dict[str, str]=None):
+ def do(self, method: str, path: str, params: Dict[str, str] = None) -> Dict[str, Any]:
"""Do request. If gets 401 refresh token"""
try:
return super(KeystoneAuth, self).do(method, path, params)
@@ -110,7 +110,7 @@
raise
-def get_inline_param_list(url: str):
+def get_inline_param_list(url: str) -> Iterator[Match]:
format_param_rr = re.compile(r"\{([a-zA-Z_]+)\}")
for match in format_param_rr.finditer(url):
yield match.group(1)
diff --git a/wally/hw_info.py b/wally/hw_info.py
index 214883d..4f21314 100644
--- a/wally/hw_info.py
+++ b/wally/hw_info.py
@@ -1,9 +1,10 @@
import re
-from typing import Dict, Any, Iterable
+from typing import Dict, Iterable
import xml.etree.ElementTree as ET
+from typing import List, Tuple
from . import utils
-from .inode import INode
+from .interfaces import IRemoteNode
def get_data(rr: str, data: str) -> str:
@@ -12,31 +13,31 @@
class HWInfo:
- def __init__(self):
- self.hostname = None
- self.cores = []
+ def __init__(self) -> None:
+ self.hostname = None # type: str
+ self.cores = [] # type: List[Tuple[str, int]]
# /dev/... devices
- self.disks_info = {}
+ self.disks_info = {} # type: Dict[str, Tuple[str, int]]
# real disks on raid controller
- self.disks_raw_info = {}
+ self.disks_raw_info = {} # type: Dict[str, str]
# name => (speed, is_full_diplex, ip_addresses)
- self.net_info = {}
+ self.net_info = {} # type: Dict[str, Tuple[int, bool, str]]
- self.ram_size = 0
- self.sys_name = None
- self.mb = None
- self.raw = None
+ self.ram_size = 0 # type: int
+ self.sys_name = None # type: str
+ self.mb = None # type: str
+ self.raw = None # type: str
- self.storage_controllers = []
+ self.storage_controllers = [] # type: List[str]
def get_hdd_count(self) -> Iterable[int]:
# SATA HDD COUNT, SAS 10k HDD COUNT, SAS SSD count, PCI-E SSD count
return []
- def get_summary(self) -> Dict[str, Any]:
+ def get_summary(self) -> Dict[str, int]:
cores = sum(count for _, count in self.cores)
disks = sum(size for _, size in self.disks_info.values())
@@ -103,21 +104,21 @@
class SWInfo:
- def __init__(self):
- self.partitions = None
- self.kernel_version = None
- self.fio_version = None
- self.libvirt_version = None
- self.kvm_version = None
- self.qemu_version = None
- self.OS_version = None
- self.ceph_version = None
+ def __init__(self) -> None:
+ self.partitions = None # type: str
+ self.kernel_version = None # type: str
+ self.fio_version = None # type: str
+ self.libvirt_version = None # type: str
+ self.kvm_version = None # type: str
+ self.qemu_version = None # type: str
+ self.OS_version = None # type: str
+ self.ceph_version = None # type: str
-def get_sw_info(node: INode) -> SWInfo:
+def get_sw_info(node: IRemoteNode) -> SWInfo:
res = SWInfo()
- res.OS_version = utils.get_os()
+ res.OS_version = utils.get_os(node)
res.kernel_version = node.get_file_content('/proc/version')
res.partitions = node.get_file_content('/etc/mtab')
res.libvirt_version = node.run("virsh -v", nolog=True)
@@ -127,7 +128,7 @@
return res
-def get_hw_info(node: INode) -> HWInfo:
+def get_hw_info(node: IRemoteNode) -> HWInfo:
res = HWInfo()
lshw_out = node.run('sudo lshw -xml 2>/dev/null', nolog=True)
diff --git a/wally/inode.py b/wally/inode.py
index 225a807..7851a4a 100644
--- a/wally/inode.py
+++ b/wally/inode.py
@@ -1,7 +1,9 @@
import abc
-from typing import Set, Dict, Tuple, Any, Optional
+from typing import Set, Dict, Optional
from .ssh_utils import parse_ssh_uri
+from . import hw_info
+from .interfaces import IRemoteNode, IHost
class FuelNodeInfo:
@@ -9,11 +11,11 @@
def __init__(self,
version: str,
fuel_ext_iface: str,
- openrc: Dict[str, str]):
+ openrc: Dict[str, str]) -> None:
- self.version = version
- self.fuel_ext_iface = fuel_ext_iface
- self.openrc = openrc
+ self.version = version # type: str
+ self.fuel_ext_iface = fuel_ext_iface # type: str
+ self.openrc = openrc # type: Dict[str, str]
class NodeInfo:
@@ -21,26 +23,31 @@
def __init__(self,
ssh_conn_url: str,
roles: Set[str],
- bind_ip: str=None,
- ssh_key: str=None):
- self.ssh_conn_url = ssh_conn_url
- self.roles = roles
+ bind_ip: str = None,
+ ssh_key: str = None) -> None:
+ self.ssh_conn_url = ssh_conn_url # type: str
+ self.roles = roles # type: Set[str]
if bind_ip is None:
bind_ip = parse_ssh_uri(self.ssh_conn_url).host
- self.bind_ip = bind_ip
- self.ssh_key = ssh_key
+ self.bind_ip = bind_ip # type: str
+ self.ssh_key = ssh_key # type: Optional[str]
-class INode(metaclass=abc.ABCMeta):
+class INode(IRemoteNode, metaclass=abc.ABCMeta):
"""Node object"""
def __init__(self, node_info: NodeInfo):
- self.rpc = None
- self.node_info = node_info
- self.hwinfo = None
- self.roles = []
+ IRemoteNode.__init__(self)
+ self.node_info = node_info # type: NodeInfo
+ self.hwinfo = None # type: hw_info.HWInfo
+ self.swinfo = None # type: hw_info.SWInfo
+ self.os_vm_id = None # type: str
+ self.ssh_conn = None # type: IHost
+ self.ssh_conn_url = None # type: str
+ self.rpc_conn = None
+ self.rpc_conn_url = None # type: str
@abc.abstractmethod
def __str__(self):
@@ -50,57 +57,5 @@
return str(self)
@abc.abstractmethod
- def is_connected(self) -> bool:
- pass
-
- @abc.abstractmethod
- def connect_ssh(self, timeout: int=None) -> None:
- pass
-
- @abc.abstractmethod
- def connect_rpc(self) -> None:
- pass
-
- @abc.abstractmethod
- def prepare_rpc(self) -> None:
- pass
-
- @abc.abstractmethod
- def get_ip(self) -> str:
- pass
-
- @abc.abstractmethod
- def get_user(self) -> str:
- pass
-
- @abc.abstractmethod
- def run(self, cmd: str, stdin_data: str=None, timeout: int=60, nolog: bool=False) -> str:
- pass
-
- @abc.abstractmethod
- def discover_hardware_info(self) -> None:
- pass
-
- @abc.abstractmethod
- def copy_file(self, local_path: str, remote_path: Optional[str]=None) -> str:
- pass
-
- @abc.abstractmethod
- def get_file_content(self, path: str) -> bytes:
- pass
-
- @abc.abstractmethod
- def put_to_file(self, path:str, content: bytes) -> None:
- pass
-
- @abc.abstractmethod
- def forward_port(self, ip: str, remote_port: int, local_port: int=None) -> int:
- pass
-
- @abc.abstractmethod
- def get_interface(self, ip: str) -> str:
- pass
-
- @abc.abstractmethod
- def stat_file(self, path:str) -> Any:
+ def node_id(self) -> str:
pass
diff --git a/wally/interfaces.py b/wally/interfaces.py
new file mode 100644
index 0000000..d87c753
--- /dev/null
+++ b/wally/interfaces.py
@@ -0,0 +1,81 @@
+import abc
+from typing import Any, Set, Dict
+
+
+class IRemoteShell(metaclass=abc.ABCMeta):
+ @abc.abstractmethod
+ def run(self, cmd: str, timeout: int = 60, nolog: bool = False) -> str:
+ pass
+
+
+class IHost(IRemoteShell, metaclass=abc.ABCMeta):
+ @abc.abstractmethod
+ def get_ip(self) -> str:
+ pass
+
+ @abc.abstractmethod
+ def __str__(self) -> str:
+ pass
+
+ @abc.abstractmethod
+ def put_to_file(self, path: str, content: bytes) -> None:
+ pass
+
+
+class IRemoteFS(metaclass=abc.ABCMeta):
+ @abc.abstractmethod
+ def copy_file(self, local_path: str, remote_path: str = None) -> str:
+ pass
+
+ @abc.abstractmethod
+ def get_file_content(self, path: str) -> bytes:
+ pass
+
+ @abc.abstractmethod
+ def put_to_file(self, path:str, content: bytes) -> None:
+ pass
+
+ @abc.abstractmethod
+ def forward_port(self, ip: str, remote_port: int, local_port: int = None) -> int:
+ pass
+
+ @abc.abstractmethod
+ def get_interface(self, ip: str) -> str:
+ pass
+
+ @abc.abstractmethod
+ def stat_file(self, path:str) -> Any:
+ pass
+
+
+class IRPC(metaclass=abc.ABCMeta):
+ pass
+
+
+class IRemoteNode(IRemoteFS, IRemoteShell, metaclass=abc.ABCMeta):
+
+ def __init__(self) -> None:
+ self.roles = set() # type: Set[str]
+ self.rpc = None # type: IRPC
+ self.rpc_params = None # type: Dict[str, Any]
+
+ @abc.abstractmethod
+ def is_connected(self) -> bool:
+ pass
+
+ @abc.abstractmethod
+ def disconnect(self) -> None:
+ pass
+
+ @abc.abstractmethod
+ def connect_ssh(self, timeout: int = None) -> None:
+ pass
+
+ @abc.abstractmethod
+ def get_ip(self) -> str:
+ pass
+
+ @abc.abstractmethod
+ def get_user(self) -> str:
+ pass
+
diff --git a/wally/keystone.py b/wally/keystone.py
index 77c1d5b..358d128 100644
--- a/wally/keystone.py
+++ b/wally/keystone.py
@@ -14,9 +14,8 @@
allowed_methods = ('get', 'put', 'post', 'delete', 'patch', 'head')
- def __init__(self, root_url:str, headers:Dict[str, str]=None, echo: bool=False):
- """
- """
+ def __init__(self, root_url:str, headers:Dict[str, str]=None, echo: bool=False) -> None:
+ """"""
if root_url.endswith('/'):
self.root_url = root_url[:-1]
else:
diff --git a/wally/logger.py b/wally/logger.py
index 43eff6b..a8cbf2a 100644
--- a/wally/logger.py
+++ b/wally/logger.py
@@ -1,7 +1,8 @@
import logging
+from typing import Callable, IO
-def color_me(color):
+def color_me(color: int) -> Callable[[str], str]:
RESET_SEQ = "\033[0m"
COLOR_SEQ = "\033[1;%dm"
@@ -22,11 +23,11 @@
'ERROR': color_me(RED)
}
- def __init__(self, msg, use_color=True, datefmt=None):
+ def __init__(self, msg: str, use_color: bool=True, datefmt: str=None) -> None:
logging.Formatter.__init__(self, msg, datefmt=datefmt)
self.use_color = use_color
- def format(self, record):
+ def format(self, record: logging.LogRecord) -> str:
orig = record.__dict__
record.__dict__ = record.__dict__.copy()
levelname = record.levelname
@@ -47,7 +48,7 @@
return res
-def setup_loggers(def_level=logging.DEBUG, log_fname=None):
+def setup_loggers(def_level: int = logging.DEBUG, log_fname: str = None, log_fd: IO = None) -> None:
logger = logging.getLogger('wally')
logger.setLevel(logging.DEBUG)
sh = logging.StreamHandler()
@@ -61,14 +62,18 @@
logger_api = logging.getLogger("wally.fuel_api")
- if log_fname is not None:
- fh = logging.FileHandler(log_fname)
+ if log_fname or log_fd:
+ if log_fname:
+ handler = logging.FileHandler(log_fname)
+ else:
+ handler = logging.StreamHandler(log_fd)
+
log_format = '%(asctime)s - %(levelname)8s - %(name)-15s - %(message)s'
formatter = logging.Formatter(log_format, datefmt="%H:%M:%S")
- fh.setFormatter(formatter)
- fh.setLevel(logging.DEBUG)
- logger.addHandler(fh)
- logger_api.addHandler(fh)
+ handler.setFormatter(formatter)
+ handler.setLevel(logging.DEBUG)
+ logger.addHandler(handler)
+ logger_api.addHandler(handler)
else:
fh = None
diff --git a/wally/main.py b/wally/main.py
index 9358b53..11d6fc1 100644
--- a/wally/main.py
+++ b/wally/main.py
@@ -5,14 +5,19 @@
import logging
import argparse
import functools
-
+from typing import List, Tuple, Any, Callable, IO, cast, TYPE_CHECKING
from yaml import load as _yaml_load
+
+YLoader = Callable[[IO], Any]
+yaml_load = None # type: YLoader
+
+
try:
from yaml import CLoader
- yaml_load = functools.partial(_yaml_load, Loader=CLoader)
+ yaml_load = cast(YLoader, functools.partial(_yaml_load, Loader=CLoader))
except ImportError:
- yaml_load = _yaml_load
+ yaml_load = cast(YLoader, _yaml_load)
import texttable
@@ -23,93 +28,55 @@
faulthandler = None
-from .timeseries import SensorDatastore
from . import utils, run_test, pretty_yaml
-from .config import (load_config,
- get_test_files, save_run_params, load_run_params)
+from .storage import make_storage, IStorage
+from .config import Config
from .logger import setup_loggers
-from .stage import log_stage
+from .stage import log_stage, StageType
+from .test_run_class import TestRun
logger = logging.getLogger("wally")
-def get_test_names(raw_res):
- res = []
- for tp, data in raw_res:
- if not isinstance(data, list):
- raise ValueError()
-
- keys = []
- for dt in data:
- if not isinstance(dt, dict):
- raise ValueError()
-
- keys.append(",".join(dt.keys()))
-
- res.append(tp + "(" + ",".join(keys) + ")")
- return res
-
-
-def list_results(path):
+def list_results(path: str) -> List[Tuple[str, str, str, str]]:
results = []
- for dname in os.listdir(path):
+ for dir_name in os.listdir(path):
+ full_path = os.path.join(path, dir_name)
+
try:
- files_cfg = get_test_files(os.path.join(path, dname))
+ stor = make_storage(full_path, existing=True)
+ except Exception as exc:
+ logger.warning("Can't load folder {}. Error {}".format(full_path, exc))
- if not os.path.isfile(files_cfg['raw_results']):
- continue
+ comment = stor['info/comment']
+ run_uuid = stor['info/run_uuid']
+ run_time = stor['info/run_time']
+ test_types = ""
+ results.append((time.ctime(run_time),
+ run_uuid,
+ test_types,
+ run_time,
+ '-' if comment is None else comment))
- mt = os.path.getmtime(files_cfg['raw_results'])
- res_mtime = time.ctime(mt)
-
- raw_res = yaml_load(open(files_cfg['raw_results']).read())
- test_names = ",".join(sorted(get_test_names(raw_res)))
-
- params = load_run_params(files_cfg['run_params_file'])
-
- comm = params.get('comment')
- results.append((mt, dname, test_names, res_mtime,
- '-' if comm is None else comm))
- except ValueError:
- pass
-
- tab = texttable.Texttable(max_width=200)
- tab.set_deco(tab.HEADER | tab.VLINES | tab.BORDER)
- tab.set_cols_align(["l", "l", "l", "l"])
results.sort()
-
- for data in results[::-1]:
- tab.add_row(data[1:])
-
- tab.header(["Name", "Tests", "etime", "Comment"])
-
- print(tab.draw())
+ return [i[1:] for i in results]
-def make_storage_dir_struct(cfg):
- utils.mkdirs_if_unxists(cfg.results_dir)
- utils.mkdirs_if_unxists(cfg.sensor_storage)
- utils.mkdirs_if_unxists(cfg.hwinfo_directory)
- utils.mkdirs_if_unxists(cfg.results_storage)
-
-
-def log_nodes_statistic_stage(_, ctx):
+def log_nodes_statistic_stage(ctx: TestRun) -> None:
utils.log_nodes_statistic(ctx.nodes)
def parse_args(argv):
descr = "Disk io performance test suite"
parser = argparse.ArgumentParser(prog='wally', description=descr)
- parser.add_argument("-l", '--log-level',
- help="print some extra log info")
+ parser.add_argument("-l", '--log-level', help="print some extra log info")
subparsers = parser.add_subparsers(dest='subparser_name')
# ---------------------------------------------------------------------
- compare_help = 'list all results'
- report_parser = subparsers.add_parser('ls', help=compare_help)
+ report_parser = subparsers.add_parser('ls', help='list all results')
report_parser.add_argument("result_storage", help="Folder with test results")
# ---------------------------------------------------------------------
@@ -121,91 +88,93 @@
# ---------------------------------------------------------------------
report_help = 'run report on previously obtained results'
report_parser = subparsers.add_parser('report', help=report_help)
- report_parser.add_argument('--load_report', action='store_true')
report_parser.add_argument("data_dir", help="folder with rest results")
# ---------------------------------------------------------------------
test_parser = subparsers.add_parser('test', help='run tests')
- test_parser.add_argument('--build-description',
- type=str, default="Build info")
+ test_parser.add_argument('--build-description', type=str, default="Build info")
test_parser.add_argument('--build-id', type=str, default="id")
test_parser.add_argument('--build-type', type=str, default="GA")
- test_parser.add_argument('-n', '--no-tests', action='store_true',
- help="Don't run tests", default=False)
- test_parser.add_argument('--load_report', action='store_true')
- test_parser.add_argument("-k", '--keep-vm', action='store_true',
- help="Don't remove test vm's", default=False)
+ test_parser.add_argument('-n', '--no-tests', action='store_true', help="Don't run tests")
+ test_parser.add_argument('--load-report', action='store_true')
+ test_parser.add_argument("-k", '--keep-vm', action='store_true', help="Don't remove test vm's")
test_parser.add_argument("-d", '--dont-discover-nodes', action='store_true',
- help="Don't connect/discover fuel nodes",
- default=False)
- test_parser.add_argument('--no-report', action='store_true',
- help="Skip report stages", default=False)
+ help="Don't connect/discover fuel nodes")
+ test_parser.add_argument('--no-report', action='store_true', help="Skip report stages")
+ test_parser.add_argument('-r', '--resume', default=None, help="Resume previously stopped test, stored in DIR",
+ metavar="DIR")
+ test_parser.add_argument('--result-dir', default=None, help="Save results to DIR", metavart="DIR")
test_parser.add_argument("comment", help="Test information")
- test_parser.add_argument("config_file", help="Yaml config file")
+ test_parser.add_argument("config_file", help="Yaml config file", nargs='?', default=None)
# ---------------------------------------------------------------------
return parser.parse_args(argv[1:])
-def main(argv):
+def main(argv: List[str]) -> int:
if faulthandler is not None:
faulthandler.register(signal.SIGUSR1, all_threads=True)
opts = parse_args(argv)
- stages = []
- report_stages = []
- ctx = Context()
- ctx.results = {}
- ctx.sensors_data = SensorDatastore()
+ stages = [] # type: List[StageType]
+ report_stages = [] # type: List[StageType]
+
+ # stop mypy from telling that config & storage might be undeclared
+ config = None # type: Config
+ storage = None # type: IStorage
if opts.subparser_name == 'test':
- cfg = load_config(opts.config_file)
- make_storage_dir_struct(cfg)
- cfg.comment = opts.comment
- save_run_params(cfg)
+ if opts.resume:
+ storage = make_storage(opts.resume, existing=True)
+ config = storage.load('config', Config)
+ else:
+ file_name = os.path.abspath(opts.config_file)
+ with open(file_name) as fd:
+ config = Config(yaml_load(fd.read())) # type: ignore
- with open(cfg.saved_config_file, 'w') as fd:
- fd.write(pretty_yaml.dumps(cfg.__dict__))
+ config.run_uuid = utils.get_uniq_path_uuid(config.results_dir)
+ config.storage_url = os.path.join(config.results_dir, config.run_uuid)
+ config.comment = opts.comment
+ config.keep_vm = opts.keep_vm
+ config.no_tests = opts.no_tests
+ config.dont_discover_nodes = opts.dont_discover_nodes
+ config.build_id = opts.build_id
+ config.build_description = opts.build_description
+ config.build_type = opts.build_type
- stages = [
- run_test.discover_stage
- ]
+ storage = make_storage(config.storage_url)
+
+ storage['config'] = config
stages.extend([
+ run_test.discover_stage,
run_test.reuse_vms_stage,
log_nodes_statistic_stage,
run_test.save_nodes_stage,
run_test.connect_stage])
- if cfg.settings.get('collect_info', True):
+ if config.get("collect_info", True):
stages.append(run_test.collect_hw_info_stage)
stages.extend([
- # deploy_sensors_stage,
run_test.run_tests_stage,
run_test.store_raw_results_stage,
- # gather_sensors_stage
])
- cfg.keep_vm = opts.keep_vm
- cfg.no_tests = opts.no_tests
- cfg.dont_discover_nodes = opts.dont_discover_nodes
-
- ctx.build_meta['build_id'] = opts.build_id
- ctx.build_meta['build_descrption'] = opts.build_description
- ctx.build_meta['build_type'] = opts.build_type
-
elif opts.subparser_name == 'ls':
- list_results(opts.result_storage)
+ tab = texttable.Texttable(max_width=200)
+ tab.set_deco(tab.HEADER | tab.VLINES | tab.BORDER)
+ tab.set_cols_align(["l", "l", "l", "l"])
+ tab.header(["Name", "Tests", "Run at", "Comment"])
+ tab.add_rows(list_results(opts.result_storage))
+ print(tab.draw())
return 0
elif opts.subparser_name == 'report':
- cfg = load_config(get_test_files(opts.data_dir)['saved_config_file'])
- stages.append(run_test.load_data_from(opts.data_dir))
- opts.no_report = False
- # load build meta
+ storage = make_storage(opts.data_dir, existing=True)
+ config = storage.load('config', Config)
elif opts.subparser_name == 'compare':
x = run_test.load_data_from_path(opts.data_path1)
@@ -214,24 +183,25 @@
[x['io'][0], y['io'][0]]))
return 0
- if not opts.no_report:
+ if not getattr(opts, "no_report", False):
report_stages.append(run_test.console_report_stage)
- if opts.load_report:
- report_stages.append(run_test.test_load_report_stage)
report_stages.append(run_test.html_report_stage)
+ # log level is not a part of config
if opts.log_level is not None:
str_level = opts.log_level
else:
- str_level = cfg.settings.get('log_level', 'INFO')
+ str_level = config.get('logging/log_level', 'INFO')
- setup_loggers(getattr(logging, str_level), cfg.log_file)
- logger.info("All info would be stored into " + cfg.results_dir)
+ setup_loggers(getattr(logging, str_level), log_fd=storage.get_stream('log'))
+ logger.info("All info would be stored into %r", config.storage_url)
+
+ ctx = TestRun(config, storage)
for stage in stages:
ok = False
with log_stage(stage):
- stage(cfg, ctx)
+ stage(ctx)
ok = True
if not ok:
break
@@ -239,7 +209,7 @@
exc, cls, tb = sys.exc_info()
for stage in ctx.clear_calls_stack[::-1]:
with log_stage(stage):
- stage(cfg, ctx)
+ stage(ctx)
logger.debug("Start utils.cleanup")
for clean_func, args, kwargs in utils.iter_clean_func():
@@ -249,13 +219,13 @@
if exc is None:
for report_stage in report_stages:
with log_stage(report_stage):
- report_stage(cfg, ctx)
+ report_stage(ctx)
- logger.info("All info stored into " + cfg.results_dir)
+ logger.info("All info is stored into %r", config.storage_url)
if exc is None:
logger.info("Tests finished successfully")
return 0
else:
- logger.error("Tests are failed. See detailed error above")
+ logger.error("Tests are failed. See error details in log above")
return 1
diff --git a/wally/meta_info.py b/wally/meta_info.py
index 19fdad9..3de33cb 100644
--- a/wally/meta_info.py
+++ b/wally/meta_info.py
@@ -1,5 +1,7 @@
from typing import Any, Dict
from urllib.parse import urlparse
+
+
from .keystone import KeystoneAuth
@@ -35,7 +37,7 @@
result = {}
for node in lab_info:
- # <koder>: give p,i,d,... vars meaningful names
+ # TODO(koder): give p,i,d,... vars meaningful names
d = {}
d['name'] = node['name']
p = []
diff --git a/wally/node.py b/wally/node.py
index b146876..8435cf3 100644
--- a/wally/node.py
+++ b/wally/node.py
@@ -9,7 +9,9 @@
class Node(INode):
"""Node object"""
- def __init__(self, node_info: NodeInfo):
+ def __init__(self, node_info: NodeInfo) -> None:
+ INode.__init__(self)
+
self.info = node_info
self.roles = node_info.roles
self.bind_ip = node_info.bind_ip
@@ -30,17 +32,17 @@
self.ssh_cred = None
self.node_id = None
- def __str__(self):
+ def __str__(self) -> str:
template = "<Node: url={conn_url!r} roles={roles}" + \
" connected={is_connected}>"
return template.format(conn_url=self.ssh_conn_url,
roles=", ".join(self.roles),
is_connected=self.ssh_conn is not None)
- def __repr__(self):
+ def __repr__(self) -> str:
return str(self)
- def connect_ssh(self) -> None:
+ def connect_ssh(self, timeout: int=None) -> None:
self.ssh_conn = connect(self.ssh_conn_url)
def connect_rpc(self) -> None:
@@ -106,3 +108,9 @@
return curr_iface
raise KeyError("Can't found interface for ip {0}".format(ip))
+
+ def sync_hw_info(self) -> None:
+ pass
+
+ def sync_sw_info(self) -> None:
+ pass
\ No newline at end of file
diff --git a/wally/node_rpc.py b/wally/node_rpc.py
deleted file mode 100644
index 9a458ae..0000000
--- a/wally/node_rpc.py
+++ /dev/null
@@ -1,428 +0,0 @@
-import re
-import json
-import time
-import errno
-import socket
-import shutil
-import logging
-import os.path
-import getpass
-import StringIO
-import threading
-import subprocess
-
-import paramiko
-
-from agent import connect
-from .ssh_utils import Local, ssh_connect, ssh_copy_file
-
-
-logger = logging.getLogger("wally")
-
-
-def setup_node(conn, agent_path, ip):
- agent_fname, log_fname = run_over_ssh(conn, "mktemp;echo;mktemp").strip().split()
- with conn.open_sftp() as sftp:
- ssh_copy_file(sftp, agent_path, agent_fname)
-
- cmd = "python {} server -d --listen-addr={}:0 --stdout-file={}"
- jdata = run_over_ssh(conn, cmd.format(agent_fname, ip, log_fname)).strip()
- run_over_ssh(conn, "rm {}".format(agent_fname))
- sett = json.loads(jdata)
- return connect(sett['addr'])
-
-
-def exists(rpc, path):
- """os.path.exists for paramiko's SCP object"""
- return rpc.exists(path)
-
-
-def save_to_remote(sftp, path, content):
- with sftp.open(path, "wb") as fd:
- fd.write(content)
-
-
-def read_from_remote(sftp, path):
- with sftp.open(path, "rb") as fd:
- return fd.read()
-
-
-def normalize_dirpath(dirpath):
- while dirpath.endswith("/"):
- dirpath = dirpath[:-1]
- return dirpath
-
-
-ALL_RWX_MODE = ((1 << 9) - 1)
-
-
-def ssh_mkdir(sftp, remotepath, mode=ALL_RWX_MODE, intermediate=False):
- remotepath = normalize_dirpath(remotepath)
- if intermediate:
- try:
- sftp.mkdir(remotepath, mode=mode)
- except (IOError, OSError):
- upper_dir = remotepath.rsplit("/", 1)[0]
-
- if upper_dir == '' or upper_dir == '/':
- raise
-
- ssh_mkdir(sftp, upper_dir, mode=mode, intermediate=True)
- return sftp.mkdir(remotepath, mode=mode)
- else:
- sftp.mkdir(remotepath, mode=mode)
-
-
-def ssh_copy_file(sftp, localfile, remfile, preserve_perm=True):
- sftp.put(localfile, remfile)
- if preserve_perm:
- sftp.chmod(remfile, os.stat(localfile).st_mode & ALL_RWX_MODE)
-
-
-def put_dir_recursively(sftp, localpath, remotepath, preserve_perm=True):
- "upload local directory to remote recursively"
-
- # hack for localhost connection
- if hasattr(sftp, "copytree"):
- sftp.copytree(localpath, remotepath)
- return
-
- assert remotepath.startswith("/"), "%s must be absolute path" % remotepath
-
- # normalize
- localpath = normalize_dirpath(localpath)
- remotepath = normalize_dirpath(remotepath)
-
- try:
- sftp.chdir(remotepath)
- localsuffix = localpath.rsplit("/", 1)[1]
- remotesuffix = remotepath.rsplit("/", 1)[1]
- if localsuffix != remotesuffix:
- remotepath = os.path.join(remotepath, localsuffix)
- except IOError:
- pass
-
- for root, dirs, fls in os.walk(localpath):
- prefix = os.path.commonprefix([localpath, root])
- suffix = root.split(prefix, 1)[1]
- if suffix.startswith("/"):
- suffix = suffix[1:]
-
- remroot = os.path.join(remotepath, suffix)
-
- try:
- sftp.chdir(remroot)
- except IOError:
- if preserve_perm:
- mode = os.stat(root).st_mode & ALL_RWX_MODE
- else:
- mode = ALL_RWX_MODE
- ssh_mkdir(sftp, remroot, mode=mode, intermediate=True)
- sftp.chdir(remroot)
-
- for f in fls:
- remfile = os.path.join(remroot, f)
- localfile = os.path.join(root, f)
- ssh_copy_file(sftp, localfile, remfile, preserve_perm)
-
-
-def delete_file(conn, path):
- sftp = conn.open_sftp()
- sftp.remove(path)
- sftp.close()
-
-
-def copy_paths(conn, paths):
- sftp = conn.open_sftp()
- try:
- for src, dst in paths.items():
- try:
- if os.path.isfile(src):
- ssh_copy_file(sftp, src, dst)
- elif os.path.isdir(src):
- put_dir_recursively(sftp, src, dst)
- else:
- templ = "Can't copy {0!r} - " + \
- "it neither a file not a directory"
- raise OSError(templ.format(src))
- except Exception as exc:
- tmpl = "Scp {0!r} => {1!r} failed - {2!r}"
- raise OSError(tmpl.format(src, dst, exc))
- finally:
- sftp.close()
-
-
-class ConnCreds(object):
- conn_uri_attrs = ("user", "passwd", "host", "port", "path")
-
- def __init__(self):
- for name in self.conn_uri_attrs:
- setattr(self, name, None)
-
- def __str__(self):
- return str(self.__dict__)
-
-
-uri_reg_exprs = []
-
-
-class URIsNamespace(object):
- class ReParts(object):
- user_rr = "[^:]*?"
- host_rr = "[^:@]*?"
- port_rr = "\\d+"
- key_file_rr = "[^:@]*"
- passwd_rr = ".*?"
-
- re_dct = ReParts.__dict__
-
- for attr_name, val in re_dct.items():
- if attr_name.endswith('_rr'):
- new_rr = "(?P<{0}>{1})".format(attr_name[:-3], val)
- setattr(ReParts, attr_name, new_rr)
-
- re_dct = ReParts.__dict__
-
- templs = [
- "^{host_rr}$",
- "^{host_rr}:{port_rr}$",
- "^{host_rr}::{key_file_rr}$",
- "^{host_rr}:{port_rr}:{key_file_rr}$",
- "^{user_rr}@{host_rr}$",
- "^{user_rr}@{host_rr}:{port_rr}$",
- "^{user_rr}@{host_rr}::{key_file_rr}$",
- "^{user_rr}@{host_rr}:{port_rr}:{key_file_rr}$",
- "^{user_rr}:{passwd_rr}@{host_rr}$",
- "^{user_rr}:{passwd_rr}@{host_rr}:{port_rr}$",
- ]
-
- for templ in templs:
- uri_reg_exprs.append(templ.format(**re_dct))
-
-
-def parse_ssh_uri(uri):
- # user:passwd@ip_host:port
- # user:passwd@ip_host
- # user@ip_host:port
- # user@ip_host
- # ip_host:port
- # ip_host
- # user@ip_host:port:path_to_key_file
- # user@ip_host::path_to_key_file
- # ip_host:port:path_to_key_file
- # ip_host::path_to_key_file
-
- if uri.startswith("ssh://"):
- uri = uri[len("ssh://"):]
-
- res = ConnCreds()
- res.port = "22"
- res.key_file = None
- res.passwd = None
- res.user = getpass.getuser()
-
- for rr in uri_reg_exprs:
- rrm = re.match(rr, uri)
- if rrm is not None:
- res.__dict__.update(rrm.groupdict())
- return res
-
- raise ValueError("Can't parse {0!r} as ssh uri value".format(uri))
-
-
-def reconnect(conn, uri, **params):
- if uri == 'local':
- return conn
-
- creds = parse_ssh_uri(uri)
- creds.port = int(creds.port)
- return ssh_connect(creds, reuse_conn=conn, **params)
-
-
-def connect(uri, **params):
- if uri == 'local':
- res = Local()
- else:
- creds = parse_ssh_uri(uri)
- creds.port = int(creds.port)
- res = ssh_connect(creds, **params)
- return res
-
-
-all_sessions_lock = threading.Lock()
-all_sessions = {}
-
-
-class BGSSHTask(object):
- CHECK_RETRY = 5
-
- def __init__(self, node, use_sudo):
- self.node = node
- self.pid = None
- self.use_sudo = use_sudo
-
- def start(self, orig_cmd, **params):
- uniq_name = 'test'
- cmd = "screen -S {0} -d -m {1}".format(uniq_name, orig_cmd)
- run_over_ssh(self.node.connection, cmd,
- timeout=10, node=self.node.get_conn_id(),
- **params)
- processes = run_over_ssh(self.node.connection, "ps aux", nolog=True)
-
- for iter in range(self.CHECK_RETRY):
- for proc in processes.split("\n"):
- if orig_cmd in proc and "SCREEN" not in proc:
- self.pid = proc.split()[1]
- break
- if self.pid is not None:
- break
- time.sleep(1)
-
- if self.pid is None:
- self.pid = -1
-
- def check_running(self):
- assert self.pid is not None
- if -1 == self.pid:
- return False
- try:
- run_over_ssh(self.node.connection,
- "ls /proc/{0}".format(self.pid),
- timeout=10, nolog=True)
- return True
- except OSError:
- return False
-
- def kill(self, soft=True, use_sudo=True):
- assert self.pid is not None
- if self.pid == -1:
- return True
- try:
- if soft:
- cmd = "kill {0}"
- else:
- cmd = "kill -9 {0}"
-
- if self.use_sudo:
- cmd = "sudo " + cmd
-
- run_over_ssh(self.node.connection,
- cmd.format(self.pid), nolog=True)
- return True
- except OSError:
- return False
-
- def wait(self, soft_timeout, timeout):
- end_of_wait_time = timeout + time.time()
- soft_end_of_wait_time = soft_timeout + time.time()
-
- # time_till_check = random.randint(5, 10)
- time_till_check = 2
-
- # time_till_first_check = random.randint(2, 6)
- time_till_first_check = 2
- time.sleep(time_till_first_check)
- if not self.check_running():
- return True
-
- while self.check_running() and time.time() < soft_end_of_wait_time:
- # time.sleep(soft_end_of_wait_time - time.time())
- time.sleep(time_till_check)
-
- while end_of_wait_time > time.time():
- time.sleep(time_till_check)
- if not self.check_running():
- break
- else:
- self.kill()
- time.sleep(1)
- if self.check_running():
- self.kill(soft=False)
- return False
- return True
-
-
-def run_over_ssh(conn, cmd, stdin_data=None, timeout=60,
- nolog=False, node=None):
- "should be replaces by normal implementation, with select"
-
- if isinstance(conn, Local):
- if not nolog:
- logger.debug("SSH:local Exec {0!r}".format(cmd))
- proc = subprocess.Popen(cmd, shell=True,
- stdin=subprocess.PIPE,
- stdout=subprocess.PIPE,
- stderr=subprocess.STDOUT)
-
- stdoutdata, _ = proc.communicate(input=stdin_data)
- if proc.returncode != 0:
- templ = "SSH:{0} Cmd {1!r} failed with code {2}. Output: {3}"
- raise OSError(templ.format(node, cmd, proc.returncode, stdoutdata))
-
- return stdoutdata
-
- transport = conn.get_transport()
- session = transport.open_session()
-
- if node is None:
- node = ""
-
- with all_sessions_lock:
- all_sessions[id(session)] = session
-
- try:
- session.set_combine_stderr(True)
-
- stime = time.time()
-
- if not nolog:
- logger.debug("SSH:{0} Exec {1!r}".format(node, cmd))
-
- session.exec_command(cmd)
-
- if stdin_data is not None:
- session.sendall(stdin_data)
-
- session.settimeout(1)
- session.shutdown_write()
- output = ""
-
- while True:
- try:
- ndata = session.recv(1024)
- output += ndata
- if "" == ndata:
- break
- except socket.timeout:
- pass
-
- if time.time() - stime > timeout:
- raise OSError(output + "\nExecution timeout")
-
- code = session.recv_exit_status()
- finally:
- found = False
- with all_sessions_lock:
- if id(session) in all_sessions:
- found = True
- del all_sessions[id(session)]
-
- if found:
- session.close()
-
- if code != 0:
- templ = "SSH:{0} Cmd {1!r} failed with code {2}. Output: {3}"
- raise OSError(templ.format(node, cmd, code, output))
-
- return output
-
-
-def close_all_sessions():
- with all_sessions_lock:
- for session in all_sessions.values():
- try:
- session.sendall('\x03')
- session.close()
- except:
- pass
- all_sessions.clear()
diff --git a/wally/pretty_yaml.py b/wally/pretty_yaml.py
index 4038ea8..7cd0f3a 100644
--- a/wally/pretty_yaml.py
+++ b/wally/pretty_yaml.py
@@ -1,7 +1,7 @@
__doc__ = "functions for make pretty yaml files"
__all__ = ['dumps']
-from typing import Any, Iterable
+from typing import Any, Iterable, List
def dumps_simple(val: Any) -> str:
@@ -37,13 +37,13 @@
return all(isinstance(val, (int, float)) for val in vals)
-def dumpv(data: Any, tab_sz: int=4, width: int=160, min_width: int=40) -> str:
+def dumpv(data: Any, tab_sz: int = 4, width: int = 160, min_width: int = 40) -> List[str]:
tab = ' ' * tab_sz
if width < min_width:
width = min_width
- res = []
+ res = [] # type: List[str]
if is_simple(data):
return [dumps_simple(data)]
@@ -108,5 +108,5 @@
return res
-def dumps(data: Any, tab_sz: int=4, width: int=120, min_width: int=40) -> str:
+def dumps(data: Any, tab_sz: int = 4, width: int = 120, min_width: int = 40) -> str:
return "\n".join(dumpv(data, tab_sz, width, min_width))
diff --git a/wally/run_test.py b/wally/run_test.py
index 4390f5e..b96ecf8 100755
--- a/wally/run_test.py
+++ b/wally/run_test.py
@@ -1,28 +1,16 @@
import os
-import re
import time
import logging
import functools
import contextlib
import collections
-from typing import List, Dict, Optional, Iterable, Any, Generator, Mapping, Callable
-from yaml import load as _yaml_load
+from typing import List, Dict, Iterable, Any, Iterator, Mapping, Callable, Tuple, Optional
+from concurrent.futures import ThreadPoolExecutor, Future
-try:
- from yaml import CLoader
- yaml_load = functools.partial(_yaml_load, Loader=CLoader)
-except ImportError:
- yaml_load = _yaml_load
-
-from concurrent.futures import ThreadPoolExecutor, wait
-
-from .config import Config
-from .config import get_test_files
-from .discover import discover, Node
from .inode import INode
+from .discover import discover
from .test_run_class import TestRun
-
-from . import pretty_yaml, utils, report, ssh_utils, start_vms
+from . import pretty_yaml, utils, report, ssh_utils, start_vms, hw_info
from .suits.mysql import MysqlTest
from .suits.itest import TestConfig
@@ -42,27 +30,27 @@
logger = logging.getLogger("wally")
-def connect_all(nodes: Iterable[INode], spawned_node: Optional[bool]=False) -> None:
+
+def connect_all(nodes: Iterable[INode],
+ pool: ThreadPoolExecutor,
+ conn_timeout: int = 30,
+ rpc_conn_callback: ssh_utils.RPCBeforeConnCallback = None) -> None:
"""Connect to all nodes, log errors
- nodes:[Node] - list of nodes
- spawned_node:bool - whenever nodes is newly spawned VM
+ nodes - list of nodes
"""
- logger.info("Connecting to nodes")
-
- conn_timeout = 240 if spawned_node else 30
+ logger.info("Connecting to %s nodes", len(nodes))
def connect_ext(node: INode) -> bool:
try:
node.connect_ssh(conn_timeout)
- node.connect_rpc(conn_timeout)
+ node.rpc, node.rpc_params = ssh_utils.setup_rpc(node, rpc_conn_callback=rpc_conn_callback)
return True
except Exception as exc:
logger.error("During connect to {}: {!s}".format(node, exc))
return False
- with ThreadPoolExecutor(32) as pool:
- list(pool.map(connect_ext, nodes))
+ list(pool.map(connect_ext, nodes))
failed_testnodes = []
failed_nodes = []
@@ -88,39 +76,25 @@
logger.info("All nodes connected successfully")
-def collect_hw_info_stage(cfg: Config, nodes: Iterable[INode]) -> None:
- # TODO(koder): rewrite this function, to use other storage type
- if os.path.exists(cfg.hwreport_fname):
- msg = "{0} already exists. Skip hw info"
- logger.info(msg.format(cfg['hwreport_fname']))
- return
+def collect_info_stage(ctx: TestRun, nodes: Iterable[INode]) -> None:
+ futures = {} # type: Dict[str, Future]
- with ThreadPoolExecutor(32) as pool:
- fitures = pool.submit(node.discover_hardware_info
- for node in nodes)
- wait(fitures)
-
- with open(cfg.hwreport_fname, 'w') as hwfd:
+ with ctx.get_pool() as pool:
for node in nodes:
- hwfd.write("-" * 60 + "\n")
- hwfd.write("Roles : " + ", ".join(node.roles) + "\n")
- hwfd.write(str(node.hwinfo) + "\n")
- hwfd.write("-" * 60 + "\n\n")
+ hw_info_path = "hw_info/{}".format(node.node_id())
+ if hw_info_path not in ctx.storage:
+ futures[hw_info_path] = pool.submit(hw_info.get_hw_info, node)
- if node.hwinfo.hostname is not None:
- fname = os.path.join(
- cfg.hwinfo_directory,
- node.hwinfo.hostname + "_lshw.xml")
+ sw_info_path = "sw_info/{}".format(node.node_id())
+ if sw_info_path not in ctx.storage:
+ futures[sw_info_path] = pool.submit(hw_info.get_sw_info, node)
- with open(fname, "w") as fd:
- fd.write(node.hwinfo.raw)
-
- logger.info("Hardware report stored in " + cfg.hwreport_fname)
- logger.debug("Raw hardware info in " + cfg.hwinfo_directory + " folder")
+ for path, future in futures.items():
+ ctx.storage[path] = future.result()
@contextlib.contextmanager
-def suspend_vm_nodes_ctx(unused_nodes: Iterable[INode]) -> Generator[List[int]]:
+def suspend_vm_nodes_ctx(unused_nodes: List[INode]) -> Iterator[List[int]]:
pausable_nodes_ids = [node.os_vm_id for node in unused_nodes
if node.os_vm_id is not None]
@@ -163,17 +137,17 @@
@contextlib.contextmanager
-def sensor_monitoring(sensor_cfg: Any, nodes: Iterable[INode]) -> Generator[None]:
+def sensor_monitoring(sensor_cfg: Any, nodes: Iterable[INode]) -> Iterator[None]:
# TODO(koder): write this function
pass
-def run_tests(cfg: Config, test_block, nodes: Iterable[INode]) -> None:
- """
- Run test from test block
- """
+def run_tests(cfg: Config,
+ test_block: Dict[str, Dict[str, Any]],
+ nodes: Iterable[INode]) -> Iterator[Tuple[str, List[Any]]]:
+ """Run test from test block"""
+
test_nodes = [node for node in nodes if 'testnode' in node.roles]
- not_test_nodes = [node for node in nodes if 'testnode' not in node.roles]
if len(test_nodes) == 0:
logger.error("No test nodes found")
@@ -185,7 +159,7 @@
# iterate over all node counts
limit = params.get('node_limit', len(test_nodes))
if isinstance(limit, int):
- vm_limits = [limit]
+ vm_limits = [limit] # type: List[int]
else:
list_or_tpl = isinstance(limit, (tuple, list))
all_ints = list_or_tpl and all(isinstance(climit, int)
@@ -194,7 +168,7 @@
msg = "'node_limit' parameter ion config should" + \
"be either int or list if integers, not {0!r}".format(limit)
raise ValueError(msg)
- vm_limits = limit
+ vm_limits = limit # type: List[int]
for vm_count in vm_limits:
# select test nodes
@@ -249,7 +223,7 @@
def connect_stage(cfg: Config, ctx: TestRun) -> None:
ctx.clear_calls_stack.append(disconnect_stage)
connect_all(ctx.nodes)
- ctx.nodes = [node for node in ctx.nodes if node.connection is not None]
+ ctx.nodes = [node for node in ctx.nodes if node.is_connected()]
def discover_stage(cfg: Config, ctx: TestRun) -> None:
@@ -279,7 +253,7 @@
roles.remove('testnode')
if len(roles) != 0:
- cluster[node.conn_url] = roles
+ cluster[node.ssh_conn_url] = roles
with open(cfg.nodes_report_file, "w") as fd:
fd.write(pretty_yaml.dumps(cluster))
@@ -363,7 +337,7 @@
def get_vm_keypair(cfg: Config) -> Dict[str, str]:
- res = {}
+ res = {} # type: Dict[str, str]
for field, ext in (('keypair_file_private', 'pem'),
('keypair_file_public', 'pub')):
fpath = cfg.vm_configs.get(field)
@@ -379,7 +353,7 @@
@contextlib.contextmanager
-def create_vms_ctx(ctx: TestRun, cfg: Config, config, already_has_count: int=0) -> Generator[List[INode]]:
+def create_vms_ctx(ctx: TestRun, cfg: Config, config, already_has_count: int=0) -> Iterator[List[INode]]:
if config['count'].startswith('='):
count = int(config['count'][1:])
if count <= already_has_count:
@@ -405,7 +379,7 @@
if not config.get('skip_preparation', False):
logger.info("Preparing openstack")
- start_vms.prepare_os_subpr(nova, params, os_creds)
+ start_vms.prepare_os(nova, params, os_creds)
new_nodes = []
old_nodes = ctx.nodes[:]
@@ -431,13 +405,13 @@
ctx.results = collections.defaultdict(lambda: [])
for group in cfg.get('tests', []):
-
- if len(group.items()) != 1:
+ gitems = list(group.items())
+ if len(gitems) != 1:
msg = "Items in tests section should have len == 1"
logger.error(msg)
raise utils.StopTestError(msg)
- key, config = group.items()[0]
+ key, config = gitems[0]
if 'start_test_nodes' == key:
if 'openstack' not in config:
@@ -469,7 +443,8 @@
if not cfg.no_tests:
for test_group in tests:
with sensor_ctx:
- for tp, res in run_tests(cfg, test_group, ctx.nodes):
+ it = run_tests(cfg, test_group, ctx.nodes)
+ for tp, res in it:
ctx.results[tp].extend(res)
@@ -489,7 +464,7 @@
os.remove(vm_ids_fname)
-def store_nodes_in_log(cfg: Config, nodes_ids: Iterable[str]):
+def store_nodes_in_log(cfg: Config, nodes_ids: Iterable[str]) -> None:
with open(cfg.vm_ids_fname, 'w') as fd:
fd.write("\n".join(nodes_ids))
@@ -503,8 +478,7 @@
ssh_utils.close_all_sessions()
for node in ctx.nodes:
- if node.connection is not None:
- node.connection.close()
+ node.disconnect()
def store_raw_results_stage(cfg: Config, ctx: TestRun) -> None:
@@ -577,7 +551,7 @@
report.make_io_report(list(data[0]),
cfg.get('comment', ''),
html_rep_fname,
- lab_info=ctx.hw_info)
+ lab_info=ctx.nodes)
def load_data_from_path(test_res_dir: str) -> Mapping[str, List[Any]]:
@@ -599,5 +573,5 @@
ctx.results.setdefault(tp, []).extend(vals)
-def load_data_from(var_dir: str) -> Callable:
+def load_data_from(var_dir: str) -> Callable[[TestRun], None]:
return functools.partial(load_data_from_path_stage, var_dir)
diff --git a/wally/sensors.py b/wally/sensors_rpc_plugin.py
similarity index 98%
rename from wally/sensors.py
rename to wally/sensors_rpc_plugin.py
index 7560a11..a2758c3 100644
--- a/wally/sensors.py
+++ b/wally/sensors_rpc_plugin.py
@@ -1,8 +1,8 @@
import os
from collections import namedtuple
-
SensorInfo = namedtuple("SensorInfo", ['value', 'is_accumulated'])
+# SensorInfo = NamedTuple("SensorInfo", [('value', int), ('is_accumulated', bool)])
def provides(name: str):
@@ -321,7 +321,7 @@
# 6 - irq: servicing interrupts
# 7 - softirq: servicing softirqs
-io_values_pos = [
+cpu_values_pos = [
(1, 'user_processes', True),
(2, 'nice_processes', True),
(3, 'system_processes', True),
@@ -341,7 +341,7 @@
dev_name = vals[0]
if dev_name == 'cpu':
- for pos, name, accum_val in io_values_pos:
+ for pos, name, accum_val in cpu_values_pos:
sensor_name = "{0}.{1}".format(dev_name, name)
results[sensor_name] = SensorInfo(int(vals[pos]),
accum_val)
diff --git a/wally/sensors_rpc_plugin.pyi b/wally/sensors_rpc_plugin.pyi
new file mode 100644
index 0000000..c4b387b
--- /dev/null
+++ b/wally/sensors_rpc_plugin.pyi
@@ -0,0 +1,24 @@
+import os
+from typing import NamedTuple, TypeVar, Callable, Any, Optional, List, Iterable, Dict, Tuple
+
+SensorInfo = NamedTuple("SensorInfo", [('value', int), ('is_accumulated', bool)])
+Pid = TypeVar('Pid', str)
+AnyFunc = TypeVar('AnyFunc', Callable[..., Any])
+PrefixList = Optional[List[str]]
+SensorDict = Dict[str, SensorInfo]
+
+def provides(name: str) -> Callable[[AnyFunc], AnyFunc]: ...
+def is_dev_accepted(name, disallowed_prefixes: PrefixList, allowed_prefixes: PrefixList) -> bool: ...
+def get_pid_name(pid: Pid) -> str: ...
+def delta(func, only_upd: bool = True) -> Iterable[Optional[str], Optional[Dict[str, str]]]: ...
+def get_latency(stat1: SensorDict, stat2: SensorDict) -> SensorDict: ...
+def pid_stat(pid: Pid) -> float: ...
+def get_mem_stats(pid : Pid) -> Tuple[int, int]: ...
+def get_ram_size() -> str: ...
+
+def io_stat(disallowed_prefixes: PrefixList, allowed_prefixes: PrefixList) -> SensorDict: ...
+def net_stat(disallowed_prefixes: PrefixList, allowed_prefixes: PrefixList) -> SensorDict: ...
+def pscpu_stat(disallowed_prefixes: PrefixList, allowed_prefixes: PrefixList) -> SensorDict: ...
+def psram_stat(disallowed_prefixes: PrefixList, allowed_prefixes: PrefixList) -> SensorDict: ...
+def syscpu_stat(disallowed_prefixes: PrefixList, allowed_prefixes: PrefixList) -> SensorDict: ...
+def sysram_stat(disallowed_prefixes: PrefixList, allowed_prefixes: PrefixList) -> SensorDict: ...
diff --git a/wally/ssh_utils.py b/wally/ssh_utils.py
index ada4af6..2941e7c 100644
--- a/wally/ssh_utils.py
+++ b/wally/ssh_utils.py
@@ -1,298 +1,25 @@
import re
+import json
import time
-import errno
-import random
import socket
-import shutil
import logging
import os.path
import getpass
-import StringIO
-import threading
+from io import BytesIO
import subprocess
+from typing import Union, Optional, cast, Dict, List, Tuple, Any, Callable
+from concurrent.futures import ThreadPoolExecutor
import paramiko
+import agent
+
+from . import interfaces, utils
+
logger = logging.getLogger("wally")
-class Local(object):
- "simulate ssh connection to local"
- @classmethod
- def open_sftp(cls):
- return cls()
-
- @classmethod
- def mkdir(cls, remotepath, mode=None):
- os.mkdir(remotepath)
- if mode is not None:
- os.chmod(remotepath, mode)
-
- @classmethod
- def put(cls, localfile, remfile):
- dirname = os.path.dirname(remfile)
- if not os.path.exists(dirname):
- os.makedirs(dirname)
- shutil.copyfile(localfile, remfile)
-
- @classmethod
- def get(cls, remfile, localfile):
- dirname = os.path.dirname(localfile)
- if not os.path.exists(dirname):
- os.makedirs(dirname)
- shutil.copyfile(remfile, localfile)
-
- @classmethod
- def chmod(cls, path, mode):
- os.chmod(path, mode)
-
- @classmethod
- def copytree(cls, src, dst):
- shutil.copytree(src, dst)
-
- @classmethod
- def remove(cls, path):
- os.unlink(path)
-
- @classmethod
- def close(cls):
- pass
-
- @classmethod
- def open(cls, *args, **kwarhgs):
- return open(*args, **kwarhgs)
-
- @classmethod
- def stat(cls, path):
- return os.stat(path)
-
- def __enter__(self):
- return self
-
- def __exit__(self, x, y, z):
- return False
-
-
-NODE_KEYS = {}
-
-
-def exists(sftp, path):
- "os.path.exists for paramiko's SCP object"
- try:
- sftp.stat(path)
- return True
- except IOError as e:
- if e.errno == errno.ENOENT:
- return False
- raise
-
-
-def set_key_for_node(host_port, key):
- sio = StringIO.StringIO(key)
- NODE_KEYS[host_port] = paramiko.RSAKey.from_private_key(sio)
- sio.close()
-
-
-def ssh_connect(creds, conn_timeout=60, reuse_conn=None):
- if creds == 'local':
- return Local()
-
- tcp_timeout = 15
- banner_timeout = 30
-
- if reuse_conn is None:
- ssh = paramiko.SSHClient()
- ssh.load_host_keys('/dev/null')
- ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
- ssh.known_hosts = None
- else:
- ssh = reuse_conn
-
- etime = time.time() + conn_timeout
-
- while True:
- try:
- tleft = etime - time.time()
- c_tcp_timeout = min(tcp_timeout, tleft)
-
- if paramiko.__version_info__ >= (1, 15, 2):
- banner_timeout = {'banner_timeout': min(banner_timeout, tleft)}
- else:
- banner_timeout = {}
-
- if creds.passwd is not None:
- ssh.connect(creds.host,
- timeout=c_tcp_timeout,
- username=creds.user,
- password=creds.passwd,
- port=creds.port,
- allow_agent=False,
- look_for_keys=False,
- **banner_timeout)
- elif creds.key_file is not None:
- ssh.connect(creds.host,
- username=creds.user,
- timeout=c_tcp_timeout,
- key_filename=creds.key_file,
- look_for_keys=False,
- port=creds.port,
- **banner_timeout)
- elif (creds.host, creds.port) in NODE_KEYS:
- ssh.connect(creds.host,
- username=creds.user,
- timeout=c_tcp_timeout,
- pkey=NODE_KEYS[(creds.host, creds.port)],
- look_for_keys=False,
- port=creds.port,
- **banner_timeout)
- else:
- key_file = os.path.expanduser('~/.ssh/id_rsa')
- ssh.connect(creds.host,
- username=creds.user,
- timeout=c_tcp_timeout,
- key_filename=key_file,
- look_for_keys=False,
- port=creds.port,
- **banner_timeout)
- return ssh
- except paramiko.PasswordRequiredException:
- raise
- except (socket.error, paramiko.SSHException):
- if time.time() > etime:
- raise
- time.sleep(1)
-
-
-def save_to_remote(sftp, path, content):
- with sftp.open(path, "wb") as fd:
- fd.write(content)
-
-
-def read_from_remote(sftp, path):
- with sftp.open(path, "rb") as fd:
- return fd.read()
-
-
-def normalize_dirpath(dirpath):
- while dirpath.endswith("/"):
- dirpath = dirpath[:-1]
- return dirpath
-
-
-ALL_RWX_MODE = ((1 << 9) - 1)
-
-
-def ssh_mkdir(sftp, remotepath, mode=ALL_RWX_MODE, intermediate=False):
- remotepath = normalize_dirpath(remotepath)
- if intermediate:
- try:
- sftp.mkdir(remotepath, mode=mode)
- except (IOError, OSError):
- upper_dir = remotepath.rsplit("/", 1)[0]
-
- if upper_dir == '' or upper_dir == '/':
- raise
-
- ssh_mkdir(sftp, upper_dir, mode=mode, intermediate=True)
- return sftp.mkdir(remotepath, mode=mode)
- else:
- sftp.mkdir(remotepath, mode=mode)
-
-
-def ssh_copy_file(sftp, localfile, remfile, preserve_perm=True):
- sftp.put(localfile, remfile)
- if preserve_perm:
- sftp.chmod(remfile, os.stat(localfile).st_mode & ALL_RWX_MODE)
-
-
-def put_dir_recursively(sftp, localpath, remotepath, preserve_perm=True):
- "upload local directory to remote recursively"
-
- # hack for localhost connection
- if hasattr(sftp, "copytree"):
- sftp.copytree(localpath, remotepath)
- return
-
- assert remotepath.startswith("/"), "%s must be absolute path" % remotepath
-
- # normalize
- localpath = normalize_dirpath(localpath)
- remotepath = normalize_dirpath(remotepath)
-
- try:
- sftp.chdir(remotepath)
- localsuffix = localpath.rsplit("/", 1)[1]
- remotesuffix = remotepath.rsplit("/", 1)[1]
- if localsuffix != remotesuffix:
- remotepath = os.path.join(remotepath, localsuffix)
- except IOError:
- pass
-
- for root, dirs, fls in os.walk(localpath):
- prefix = os.path.commonprefix([localpath, root])
- suffix = root.split(prefix, 1)[1]
- if suffix.startswith("/"):
- suffix = suffix[1:]
-
- remroot = os.path.join(remotepath, suffix)
-
- try:
- sftp.chdir(remroot)
- except IOError:
- if preserve_perm:
- mode = os.stat(root).st_mode & ALL_RWX_MODE
- else:
- mode = ALL_RWX_MODE
- ssh_mkdir(sftp, remroot, mode=mode, intermediate=True)
- sftp.chdir(remroot)
-
- for f in fls:
- remfile = os.path.join(remroot, f)
- localfile = os.path.join(root, f)
- ssh_copy_file(sftp, localfile, remfile, preserve_perm)
-
-
-def delete_file(conn, path):
- sftp = conn.open_sftp()
- sftp.remove(path)
- sftp.close()
-
-
-def copy_paths(conn, paths):
- sftp = conn.open_sftp()
- try:
- for src, dst in paths.items():
- try:
- if os.path.isfile(src):
- ssh_copy_file(sftp, src, dst)
- elif os.path.isdir(src):
- put_dir_recursively(sftp, src, dst)
- else:
- templ = "Can't copy {0!r} - " + \
- "it neither a file not a directory"
- raise OSError(templ.format(src))
- except Exception as exc:
- tmpl = "Scp {0!r} => {1!r} failed - {2!r}"
- raise OSError(tmpl.format(src, dst, exc))
- finally:
- sftp.close()
-
-
-class ConnCreds(object):
- conn_uri_attrs = ("user", "passwd", "host", "port", "path")
-
- def __init__(self):
- for name in self.conn_uri_attrs:
- setattr(self, name, None)
-
- def __str__(self):
- return str(self.__dict__)
-
-
-uri_reg_exprs = []
-
-
class URIsNamespace(object):
class ReParts(object):
user_rr = "[^:]*?"
@@ -323,11 +50,29 @@
"^{user_rr}:{passwd_rr}@{host_rr}:{port_rr}$",
]
+ uri_reg_exprs = [] # type: List[str]
for templ in templs:
uri_reg_exprs.append(templ.format(**re_dct))
-def parse_ssh_uri(uri):
+class ConnCreds:
+ conn_uri_attrs = ("user", "passwd", "host", "port", "key_file")
+
+ def __init__(self) -> None:
+ self.user = None # type: Optional[str]
+ self.passwd = None # type: Optional[str]
+ self.host = None # type: str
+ self.port = 22 # type: int
+ self.key_file = None # type: Optional[str]
+
+ def __str__(self) -> str:
+ return str(self.__dict__)
+
+
+SSHCredsType = Union[str, ConnCreds]
+
+
+def parse_ssh_uri(uri: str) -> ConnCreds:
# [ssh://]+
# user:passwd@ip_host:port
# user:passwd@ip_host
@@ -344,12 +89,12 @@
uri = uri[len("ssh://"):]
res = ConnCreds()
- res.port = "22"
+ res.port = 22
res.key_file = None
res.passwd = None
res.user = getpass.getuser()
- for rr in uri_reg_exprs:
+ for rr in URIsNamespace.uri_reg_exprs:
rrm = re.match(rr, uri)
if rrm is not None:
res.__dict__.update(rrm.groupdict())
@@ -358,18 +103,174 @@
raise ValueError("Can't parse {0!r} as ssh uri value".format(uri))
-def reconnect(conn, uri, **params):
+class LocalHost(interfaces.IHost):
+ def __str__(self):
+ return "<Local>"
+
+ def get_ip(self) -> str:
+ return 'localhost'
+
+ def put_to_file(self, path: str, content: bytes) -> None:
+ dirname = os.path.dirname(path)
+ if not os.path.exists(dirname):
+ os.makedirs(dirname)
+ with open(path, "wb") as fd:
+ fd.write(content)
+
+ def run(self, cmd: str, timeout: int = 60, nolog: bool = False) -> str:
+ proc = subprocess.Popen(cmd, shell=True,
+ stdin=subprocess.PIPE,
+ stdout=subprocess.PIPE,
+ stderr=subprocess.STDOUT)
+
+ stdout_data, _ = proc.communicate()
+ if proc.returncode != 0:
+ templ = "SSH:{0} Cmd {1!r} failed with code {2}. Output: {3}"
+ raise OSError(templ.format(self, cmd, proc.returncode, stdout_data))
+
+ return stdout_data
+
+
+class SSHHost(interfaces.IHost):
+ def __init__(self, ssh_conn, node_name: str, ip: str) -> None:
+ self.conn = ssh_conn
+ self.node_name = node_name
+ self.ip = ip
+
+ def get_ip(self) -> str:
+ return self.ip
+
+ def __str__(self) -> str:
+ return self.node_name
+
+ def put_to_file(self, path: str, content: bytes) -> None:
+ with self.conn.open_sftp() as sftp:
+ with sftp.open(path, "wb") as fd:
+ fd.write(content)
+
+ def run(self, cmd: str, timeout: int = 60, nolog: bool = False) -> str:
+ transport = self.conn.get_transport()
+ session = transport.open_session()
+
+ try:
+ session.set_combine_stderr(True)
+
+ stime = time.time()
+
+ if not nolog:
+ logger.debug("SSH:{0} Exec {1!r}".format(self, cmd))
+
+ session.exec_command(cmd)
+ session.settimeout(1)
+ session.shutdown_write()
+ output = ""
+
+ while True:
+ try:
+ ndata = session.recv(1024)
+ output += ndata
+ if "" == ndata:
+ break
+ except socket.timeout:
+ pass
+
+ if time.time() - stime > timeout:
+ raise OSError(output + "\nExecution timeout")
+
+ code = session.recv_exit_status()
+ finally:
+ found = False
+
+ if found:
+ session.close()
+
+ if code != 0:
+ templ = "SSH:{0} Cmd {1!r} failed with code {2}. Output: {3}"
+ raise OSError(templ.format(self, cmd, code, output))
+
+ return output
+
+
+NODE_KEYS = {} # type: Dict[Tuple[str, int], paramiko.RSAKey]
+
+
+def set_key_for_node(host_port: Tuple[str, int], key: bytes) -> None:
+ sio = BytesIO(key)
+ NODE_KEYS[host_port] = paramiko.RSAKey.from_private_key(sio)
+ sio.close()
+
+
+def ssh_connect(creds: SSHCredsType, conn_timeout: int = 60) -> interfaces.IHost:
+ if creds == 'local':
+ return LocalHost()
+
+ tcp_timeout = 15
+ default_banner_timeout = 30
+
+ ssh = paramiko.SSHClient()
+ ssh.load_host_keys('/dev/null')
+ ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
+ ssh.known_hosts = None
+
+ end_time = time.time() + conn_timeout # type: float
+
+ while True:
+ try:
+ time_left = end_time - time.time()
+ c_tcp_timeout = min(tcp_timeout, time_left)
+
+ banner_timeout_arg = {} # type: Dict[str, int]
+ if paramiko.__version_info__ >= (1, 15, 2):
+ banner_timeout_arg['banner_timeout'] = int(min(default_banner_timeout, time_left))
+
+ creds = cast(ConnCreds, creds)
+
+ if creds.passwd is not None:
+ ssh.connect(creds.host,
+ timeout=c_tcp_timeout,
+ username=creds.user,
+ password=cast(str, creds.passwd),
+ port=creds.port,
+ allow_agent=False,
+ look_for_keys=False,
+ **banner_timeout_arg)
+ elif creds.key_file is not None:
+ ssh.connect(creds.host,
+ username=creds.user,
+ timeout=c_tcp_timeout,
+ key_filename=cast(str, creds.key_file),
+ look_for_keys=False,
+ port=creds.port,
+ **banner_timeout_arg)
+ elif (creds.host, creds.port) in NODE_KEYS:
+ ssh.connect(creds.host,
+ username=creds.user,
+ timeout=c_tcp_timeout,
+ pkey=NODE_KEYS[(creds.host, creds.port)],
+ look_for_keys=False,
+ port=creds.port,
+ **banner_timeout_arg)
+ else:
+ key_file = os.path.expanduser('~/.ssh/id_rsa')
+ ssh.connect(creds.host,
+ username=creds.user,
+ timeout=c_tcp_timeout,
+ key_filename=key_file,
+ look_for_keys=False,
+ port=creds.port,
+ **banner_timeout_arg)
+ return SSHHost(ssh, "{0.host}:{0.port}".format(creds), creds.host)
+ except paramiko.PasswordRequiredException:
+ raise
+ except (socket.error, paramiko.SSHException):
+ if time.time() > end_time:
+ raise
+ time.sleep(1)
+
+
+def connect(uri: str, **params) -> interfaces.IHost:
if uri == 'local':
- return conn
-
- creds = parse_ssh_uri(uri)
- creds.port = int(creds.port)
- return ssh_connect(creds, reuse_conn=conn, **params)
-
-
-def connect(uri, **params):
- if uri == 'local':
- res = Local()
+ res = LocalHost()
else:
creds = parse_ssh_uri(uri)
creds.port = int(creds.port)
@@ -377,180 +278,58 @@
return res
-all_sessions_lock = threading.Lock()
-all_sessions = {}
+SetupResult = Tuple[interfaces.IRPC, Dict[str, Any]]
-class BGSSHTask(object):
- CHECK_RETRY = 5
+RPCBeforeConnCallback = Callable[[interfaces.IHost, int], None]
- def __init__(self, node, use_sudo):
- self.node = node
- self.pid = None
- self.use_sudo = use_sudo
- def start(self, orig_cmd, **params):
- uniq_name = 'test'
- cmd = "screen -S {0} -d -m {1}".format(uniq_name, orig_cmd)
- run_over_ssh(self.node.connection, cmd,
- timeout=10, node=self.node.get_conn_id(),
- **params)
- processes = run_over_ssh(self.node.connection, "ps aux", nolog=True)
+def setup_rpc(node: interfaces.IHost,
+ rpc_server_code: bytes,
+ port: int = 0,
+ rpc_conn_callback: RPCBeforeConnCallback = None) -> SetupResult:
+ code_file = node.run("mktemp").strip()
+ log_file = node.run("mktemp").strip()
+ node.put_to_file(code_file, rpc_server_code)
+ cmd = "python {code_file} server --listen-addr={listen_ip}:{port} --daemon " + \
+ "--show-settings --stdout-file={out_file}"
+ params_js = node.run(cmd.format(code_file=code_file,
+ listen_addr=node.get_ip(),
+ out_file=log_file,
+ port=port)).strip()
+ params = json.loads(params_js)
+ params['log_file'] = log_file
- for iter in range(self.CHECK_RETRY):
- for proc in processes.split("\n"):
- if orig_cmd in proc and "SCREEN" not in proc:
- self.pid = proc.split()[1]
- break
- if self.pid is not None:
- break
- time.sleep(1)
+ if rpc_conn_callback:
+ ip, port = rpc_conn_callback(node, port)
+ else:
+ ip = node.get_ip()
+ port = int(params['addr'].split(":")[1])
- if self.pid is None:
- self.pid = -1
+ return agent.connect((ip, port)), params
- def check_running(self):
- assert self.pid is not None
- if -1 == self.pid:
- return False
+
+def wait_ssh_awailable(addrs: List[Tuple[str, int]],
+ timeout: int = 300,
+ tcp_timeout: float = 1.0,
+ max_threads: int = 32) -> None:
+ addrs = addrs[:]
+ tout = utils.Timeout(timeout)
+
+ def check_sock(addr):
+ s = socket.socket()
+ s.settimeout(tcp_timeout)
try:
- run_over_ssh(self.node.connection,
- "ls /proc/{0}".format(self.pid),
- timeout=10, nolog=True)
+ s.connect(addr)
return True
- except OSError:
+ except (socket.timeout, ConnectionRefusedError):
return False
- def kill(self, soft=True, use_sudo=True):
- assert self.pid is not None
- if self.pid == -1:
- return True
- try:
- if soft:
- cmd = "kill {0}"
- else:
- cmd = "kill -9 {0}"
-
- if self.use_sudo:
- cmd = "sudo " + cmd
-
- run_over_ssh(self.node.connection,
- cmd.format(self.pid), nolog=True)
- return True
- except OSError:
- return False
-
- def wait(self, soft_timeout, timeout):
- end_of_wait_time = timeout + time.time()
- soft_end_of_wait_time = soft_timeout + time.time()
-
- # time_till_check = random.randint(5, 10)
- time_till_check = 2
-
- # time_till_first_check = random.randint(2, 6)
- time_till_first_check = 2
- time.sleep(time_till_first_check)
- if not self.check_running():
- return True
-
- while self.check_running() and time.time() < soft_end_of_wait_time:
- # time.sleep(soft_end_of_wait_time - time.time())
- time.sleep(time_till_check)
-
- while end_of_wait_time > time.time():
- time.sleep(time_till_check)
- if not self.check_running():
- break
- else:
- self.kill()
- time.sleep(1)
- if self.check_running():
- self.kill(soft=False)
- return False
- return True
+ with ThreadPoolExecutor(max_workers=max_threads) as pool:
+ while addrs:
+ check_result = pool.map(check_sock, addrs)
+ addrs = [addr for ok, addr in zip(check_result, addrs) if not ok] # type: List[Tuple[str, int]]
+ tout.tick()
-def run_over_ssh(conn, cmd, stdin_data=None, timeout=60,
- nolog=False, node=None):
- "should be replaces by normal implementation, with select"
- if isinstance(conn, Local):
- if not nolog:
- logger.debug("SSH:local Exec {0!r}".format(cmd))
- proc = subprocess.Popen(cmd, shell=True,
- stdin=subprocess.PIPE,
- stdout=subprocess.PIPE,
- stderr=subprocess.STDOUT)
-
- stdoutdata, _ = proc.communicate(input=stdin_data)
- if proc.returncode != 0:
- templ = "SSH:{0} Cmd {1!r} failed with code {2}. Output: {3}"
- raise OSError(templ.format(node, cmd, proc.returncode, stdoutdata))
-
- return stdoutdata
-
- transport = conn.get_transport()
- session = transport.open_session()
-
- if node is None:
- node = ""
-
- with all_sessions_lock:
- all_sessions[id(session)] = session
-
- try:
- session.set_combine_stderr(True)
-
- stime = time.time()
-
- if not nolog:
- logger.debug("SSH:{0} Exec {1!r}".format(node, cmd))
-
- session.exec_command(cmd)
-
- if stdin_data is not None:
- session.sendall(stdin_data)
-
- session.settimeout(1)
- session.shutdown_write()
- output = ""
-
- while True:
- try:
- ndata = session.recv(1024)
- output += ndata
- if "" == ndata:
- break
- except socket.timeout:
- pass
-
- if time.time() - stime > timeout:
- raise OSError(output + "\nExecution timeout")
-
- code = session.recv_exit_status()
- finally:
- found = False
- with all_sessions_lock:
- if id(session) in all_sessions:
- found = True
- del all_sessions[id(session)]
-
- if found:
- session.close()
-
- if code != 0:
- templ = "SSH:{0} Cmd {1!r} failed with code {2}. Output: {3}"
- raise OSError(templ.format(node, cmd, code, output))
-
- return output
-
-
-def close_all_sessions():
- with all_sessions_lock:
- for session in all_sessions.values():
- try:
- session.sendall('\x03')
- session.close()
- except:
- pass
- all_sessions.clear()
diff --git a/wally/stage.py b/wally/stage.py
index 5f47a1b..e06e31d 100644
--- a/wally/stage.py
+++ b/wally/stage.py
@@ -1,28 +1,16 @@
import logging
import contextlib
-
+from typing import Callable
from .utils import StopTestError
+from .test_run_class import TestRun
+
logger = logging.getLogger("wally")
-class TestStage:
- name = ""
-
- def __init__(self, testrun, config):
- self.testrun = testrun
- self.config = config
-
- def __enter__(self):
- return self
-
- def __exit__(self):
- return self
-
-
@contextlib.contextmanager
-def log_stage(stage):
+def log_stage(stage) -> None:
msg_templ = "Exception during {0}: {1!s}"
msg_templ_no_exc = "During {0}"
@@ -31,6 +19,9 @@
try:
yield
except StopTestError as exc:
- logger.error(msg_templ.format(stage.name, exc))
+ logger.error(msg_templ.format(stage.__name__, exc))
except Exception:
- logger.exception(msg_templ_no_exc.format(stage.name))
+ logger.exception(msg_templ_no_exc.format(stage.__name__))
+
+
+StageType = Callable[[TestRun], None]
diff --git a/wally/start_vms.py b/wally/start_vms.py
index 759d63b..075f348 100644
--- a/wally/start_vms.py
+++ b/wally/start_vms.py
@@ -5,18 +5,15 @@
import urllib
import os.path
import logging
-import collections
-from typing import Dict, Any, Iterable
+from typing import Dict, Any, Iterable, Generator, NamedTuple
from concurrent.futures import ThreadPoolExecutor
from novaclient.exceptions import NotFound
from novaclient.client import Client as n_client
from cinderclient.v1.client import Client as c_client
-import wally
-from wally.discover import Node
-
+from .inode import NodeInfo
__doc__ = """
Module used to reliably spawn set of VM's, evenly distributed across
@@ -44,18 +41,23 @@
return NOVA_CONNECTION is not None
-OSCreds = collections.namedtuple("OSCreds",
- ["name", "passwd",
- "tenant", "auth_url", "insecure"])
+OSCreds = NamedTuple("OSCreds",
+ [("name", str),
+ ("passwd", str),
+ ("tenant", str),
+ ("auth_url", str),
+ ("insecure", bool)])
def ostack_get_creds() -> OSCreds:
if STORED_OPENSTACK_CREDS is None:
+ is_insecure = \
+ os.environ.get('OS_INSECURE', 'False').lower() in ('true', 'yes')
return OSCreds(os.environ.get('OS_USERNAME'),
os.environ.get('OS_PASSWORD'),
os.environ.get('OS_TENANT_NAME'),
os.environ.get('OS_AUTH_URL'),
- os.environ.get('OS_INSECURE', False))
+ is_insecure)
else:
return STORED_OPENSTACK_CREDS
@@ -106,8 +108,8 @@
break
-def pause(ids: Iterable[int]) -> None:
- def pause_vm(conn, vm_id):
+def pause(ids: Iterable[str]) -> None:
+ def pause_vm(conn: n_client, vm_id: str) -> None:
vm = conn.servers.get(vm_id)
if vm.status == 'ACTIVE':
vm.pause()
@@ -120,8 +122,8 @@
future.result()
-def unpause(ids: Iterable[int], max_resume_time=10) -> None:
- def unpause(conn, vm_id):
+def unpause(ids: Iterable[str], max_resume_time=10) -> None:
+ def unpause(conn: n_client, vm_id: str) -> None:
vm = conn.servers.get(vm_id)
if vm.status == 'PAUSED':
vm.unpause()
@@ -424,7 +426,7 @@
return [ip for ip in ip_list if ip.instance_id is None][:amount]
-def launch_vms(nova, params, already_has_count=0):
+def launch_vms(nova, params, already_has_count=0) -> Iterator[NodeInfo]:
"""launch virtual servers
Parameters:
@@ -461,7 +463,7 @@
lst = nova.services.list(binary='nova-compute')
srv_count = len([srv for srv in lst if srv.status == 'enabled'])
- if isinstance(count, basestring):
+ if isinstance(count, str):
if count.startswith("x"):
count = srv_count * int(count[1:])
else:
@@ -474,7 +476,7 @@
logger.debug("Starting new nodes on openstack")
- assert isinstance(count, (int, long))
+ assert isinstance(count, int)
srv_params = "img: {image[name]}, flavor: {flavor[name]}".format(**params)
msg_templ = "Will start {0} servers with next params: {1}"
@@ -500,7 +502,7 @@
for ip, os_node in create_vms_mt(NOVA_CONNECTION, count, **vm_params):
conn_uri = creds.format(ip=ip, private_key_path=private_key_path)
- yield Node(conn_uri, []), os_node.id
+ yield NodeInfo(conn_uri, []), os_node.id
def get_free_server_grpoups(nova, template):
diff --git a/wally/storage.py b/wally/storage.py
new file mode 100644
index 0000000..5212f4a
--- /dev/null
+++ b/wally/storage.py
@@ -0,0 +1,122 @@
+"""
+This module contains interfaces for storage classes
+"""
+
+import abc
+from typing import Any, Iterable, TypeVar, Type, IO
+
+
+class IStorable(metaclass=abc.ABCMeta):
+ """Interface for type, which can be stored"""
+ @abc.abstractmethod
+ def __getstate__(self) -> Any:
+ pass
+
+ @abc.abstractmethod
+ def __setstate__(self, Any):
+ pass
+
+
+# all builtin types can be stored
+IStorable.register(list) # type: ignore
+IStorable.register(dict) # type: ignore
+IStorable.register(tuple) # type: ignore
+IStorable.register(set) # type: ignore
+IStorable.register(None) # type: ignore
+IStorable.register(int) # type: ignore
+IStorable.register(str) # type: ignore
+IStorable.register(bytes) # type: ignore
+IStorable.register(bool) # type: ignore
+
+
+ObjClass = TypeVar('ObjClass')
+
+
+class IStorage(metaclass=abc.ABCMeta):
+ """interface for storage"""
+ @abc.abstractmethod
+ def __init__(self, path: str, existing_storage: bool = False) -> None:
+ pass
+
+ @abc.abstractmethod
+ def __setitem__(self, path: str, value: IStorable) -> None:
+ pass
+
+ @abc.abstractmethod
+ def __getitem__(self, path: str) -> IStorable:
+ pass
+
+ @abc.abstractmethod
+ def __contains__(self, path: str) -> bool:
+ pass
+
+ @abc.abstractmethod
+ def list(self, path: str) -> Iterable[str]:
+ pass
+
+ @abc.abstractmethod
+ def load(self, path: str, obj_class: Type[ObjClass]) -> ObjClass:
+ pass
+
+ @abc.abstractmethod
+ def get_stream(self, path: str) -> IO:
+ pass
+
+
+class ISimpleStorage(metaclass=abc.ABCMeta):
+ """interface for low-level storage, which doesn't support serialization
+ and can operate only on bytes"""
+
+ @abc.abstractmethod
+ def __init__(self, path: str) -> None:
+ pass
+
+ @abc.abstractmethod
+ def __setitem__(self, path: str, value: bytes) -> None:
+ pass
+
+ @abc.abstractmethod
+ def __getitem__(self, path: str) -> bytes:
+ pass
+
+ @abc.abstractmethod
+ def __contains__(self, path: str) -> bool:
+ pass
+
+ @abc.abstractmethod
+ def list(self, path: str) -> Iterable[str]:
+ pass
+
+ @abc.abstractmethod
+ def get_stream(self, path: str) -> IO:
+ pass
+
+
+class ISerializer(metaclass=abc.ABCMeta):
+ """Interface for serialization class"""
+ @abc.abstractmethod
+ def pack(self, value: IStorable) -> bytes:
+ pass
+
+ @abc.abstractmethod
+ def unpack(self, data: bytes) -> IStorable:
+ pass
+
+
+# TODO(koder): this is concrete storage and serializer classes to be implemented
+class FSStorage(IStorage):
+ """Store all data in files on FS"""
+
+ @abc.abstractmethod
+ def __init__(self, root_path: str, serializer: ISerializer, existing: bool = False) -> None:
+ pass
+
+
+class YAMLSerializer(ISerializer):
+ """Serialize data to yaml"""
+ pass
+
+
+def make_storage(url: str, existing: bool = False) -> IStorage:
+ return FSStorage(url, YAMLSerializer(), existing)
+
diff --git a/wally/suits/io/fio.py b/wally/suits/io/fio.py
index 50bb1fd..2360a55 100644
--- a/wally/suits/io/fio.py
+++ b/wally/suits/io/fio.py
@@ -778,7 +778,7 @@
def prepare_data(cls, results) -> List[Dict[str, Any]]:
"""create a table with io performance report for console"""
- def key_func(data) -> Tuple(str, str, str, str, int):
+ def key_func(data: FioRunResult) -> Tuple[str, str, str, str, int]:
tpl = data.summary_tpl()
return (data.name,
tpl.oper,
diff --git a/wally/suits/io/fio_task_parser.py b/wally/suits/io/fio_task_parser.py
index 233f6e2..1b6ba21 100644
--- a/wally/suits/io/fio_task_parser.py
+++ b/wally/suits/io/fio_task_parser.py
@@ -7,7 +7,7 @@
import os.path
import argparse
import itertools
-from typing import Optional, Generator, Union, Dict, Iterable, Any, List, TypeVar, Callable
+from typing import Optional, Iterator, Union, Dict, Iterable, List, TypeVar, Callable, Tuple
from collections import OrderedDict, namedtuple
@@ -32,7 +32,7 @@
def copy(self) -> 'FioJobSection':
return copy.deepcopy(self)
- def required_vars(self) -> Generator[str, Var]:
+ def required_vars(self) -> Iterator[Tuple[str, Var]]:
for name, val in self.vals.items():
if isinstance(val, Var):
yield name, val
@@ -97,7 +97,7 @@
return val
-def fio_config_lexer(fio_cfg: str, fname: str) -> Generator[CfgLine]:
+def fio_config_lexer(fio_cfg: str, fname: str) -> Iterator[CfgLine]:
for lineno, oline in enumerate(fio_cfg.split("\n")):
try:
line = oline.strip()
@@ -130,7 +130,7 @@
raise ParseError(str(exc), fname, lineno, oline)
-def fio_config_parse(lexer_iter: Iterable[CfgLine]) -> Generator[FioJobSection]:
+def fio_config_parse(lexer_iter: Iterable[CfgLine]) -> Iterator[FioJobSection]:
in_globals = False
curr_section = None
glob_vals = OrderedDict()
@@ -204,7 +204,7 @@
yield curr_section
-def process_cycles(sec: FioJobSection) -> Generator[FioJobSection]:
+def process_cycles(sec: FioJobSection) -> Iterator[FioJobSection]:
cycles = OrderedDict()
for name, val in sec.vals.items():
@@ -277,7 +277,7 @@
MAGIC_OFFSET = 0.1885
-def finall_process(sec: FioJobSection, counter: Optional[List[int]] = [0]) -> FioJobSection:
+def finall_process(sec: FioJobSection, counter: List[int] = [0]) -> FioJobSection:
sec = sec.copy()
sec.vals['unified_rw_reporting'] = '1'
@@ -332,7 +332,7 @@
TestSumm = namedtuple("TestSumm", ("oper", "mode", "bsize", "iodepth", "vm_count"))
-def get_test_summary_tuple(sec: FioJobSection, vm_count=None) -> TestSumm:
+def get_test_summary_tuple(sec: FioJobSection, vm_count: int = None) -> TestSumm:
if isinstance(sec, dict):
vals = sec
else:
@@ -355,7 +355,7 @@
vm_count)
-def get_test_summary(sec: FioJobSection, vm_count: int=None, noqd: Optional[bool]=False) -> str:
+def get_test_summary(sec: FioJobSection, vm_count: int = None, noqd: bool = False) -> str:
tpl = get_test_summary_tuple(sec, vm_count)
res = "{0.oper}{0.mode}{0.bsize}".format(tpl)
@@ -372,7 +372,7 @@
return sec.vals.get('ramp_time', 0) + sec.vals.get('runtime', 0)
-def parse_all_in_1(source:str, fname: str=None) -> Generator[FioJobSection]:
+def parse_all_in_1(source:str, fname: str = None) -> Iterator[FioJobSection]:
return fio_config_parse(fio_config_lexer(source, fname))
@@ -381,13 +381,13 @@
def flatmap(func: Callable[[FM_FUNC_INPUT], Iterable[FM_FUNC_RES]],
- inp_iter: Iterable[FM_FUNC_INPUT]) -> Generator[FM_FUNC_RES]:
+ inp_iter: Iterable[FM_FUNC_INPUT]) -> Iterator[FM_FUNC_RES]:
for val in inp_iter:
for res in func(val):
yield res
-def fio_cfg_compile(source: str, fname: str, test_params: FIO_PARAMS) -> Generator[FioJobSection]:
+def fio_cfg_compile(source: str, fname: str, test_params: FIO_PARAMS) -> Iterator[FioJobSection]:
it = parse_all_in_1(source, fname)
it = (apply_params(sec, test_params) for sec in it)
it = flatmap(process_cycles, it)
diff --git a/wally/test_run_class.py b/wally/test_run_class.py
index 9ce3370..e937300 100644
--- a/wally/test_run_class.py
+++ b/wally/test_run_class.py
@@ -1,20 +1,37 @@
+from typing import List, Callable, Any, Dict, Optional
+from concurrent.futures import ThreadPoolExecutor
+
+
+from .timeseries import SensorDatastore
+from . import inode
+from .start_vms import OSCreds
+from .storage import IStorage
+from .config import Config
+
+
class TestRun:
"""Test run information"""
- def __init__(self):
+ def __init__(self, config: Config, storage: IStorage):
# NodesInfo list
- self.nodes_info = []
+ self.nodes_info = [] # type: List[inode.NodeInfo]
# Nodes list
- self.nodes = []
+ self.nodes = [] # type: List[inode.INode]
- self.build_meta = {}
- self.clear_calls_stack = []
+ self.build_meta = {} # type: Dict[str,Any]
+ self.clear_calls_stack = [] # type: List[Callable[['TestRun'], None]]
# created openstack nodes
- self.openstack_nodes_ids = []
+ self.openstack_nodes_ids = [] # type: List[str]
self.sensors_mon_q = None
# openstack credentials
- self.fuel_openstack_creds = None
+ self.fuel_openstack_creds = None # type: Optional[OSCreds]
+ self.storage = storage
+ self.config = config
+ self.sensors_data = SensorDatastore()
+
+ def get_pool(self):
+ return ThreadPoolExecutor(self.config.get('worker_pool_sz', 32))
diff --git a/wally/utils.py b/wally/utils.py
index 32c9056..d2b867e 100644
--- a/wally/utils.py
+++ b/wally/utils.py
@@ -1,7 +1,8 @@
import re
import os
-import io
import sys
+import time
+import uuid
import socket
import logging
import ipaddress
@@ -10,14 +11,20 @@
import subprocess
import collections
-from .inode import INode
-from typing import Any, Tuple, Union, List, Generator, Dict, Callable, Iterable, Optional
+from .interfaces import IRemoteNode
+from typing import Any, Tuple, Union, List, Iterator, Dict, Callable, Iterable, Optional, IO, Sequence
try:
import psutil
except ImportError:
psutil = None
+try:
+ from petname import Generate as pet_generate
+except ImportError:
+ def pet_generate(x: str, y: str) -> str:
+ return str(uuid.uuid4())
+
logger = logging.getLogger("wally")
@@ -35,16 +42,16 @@
class LogError:
- def __init__(self, message: str, exc_logger=None):
+ def __init__(self, message: str, exc_logger: logging.Logger = None) -> None:
self.message = message
self.exc_logger = exc_logger
- def __enter__(self):
+ def __enter__(self) -> 'LogError':
return self
- def __exit__(self, tp: type, value: Exception, traceback: Any):
+ def __exit__(self, tp: type, value: Exception, traceback: Any) -> bool:
if value is None or isinstance(value, StopTestError):
- return
+ return False
if self.exc_logger is None:
exc_logger = sys._getframe(1).f_globals.get('logger', logger)
@@ -52,10 +59,10 @@
exc_logger = self.exc_logger
exc_logger.exception(self.message, exc_info=(tp, value, traceback))
- raise StopTestError(self.message, value)
+ raise StopTestError(self.message) from value
-def log_block(message: str, exc_logger=None) -> LogError:
+def log_block(message: str, exc_logger:logging.Logger = None) -> LogError:
logger.debug("Starts : " + message)
return LogError(message, exc_logger)
@@ -67,7 +74,7 @@
def parse_creds(creds: str) -> Tuple[str, str, str]:
- # parse user:passwd@host
+ """Parse simple credentials format user[:passwd]@host"""
user, passwd_host = creds.split(":", 1)
if '@' not in passwd_host:
@@ -78,12 +85,12 @@
return user, passwd, host
-class TaksFinished(Exception):
+class TaskFinished(Exception):
pass
class Barrier:
- def __init__(self, count: int):
+ def __init__(self, count: int) -> None:
self.count = count
self.curr_count = 0
self.cond = threading.Condition()
@@ -92,7 +99,7 @@
def wait(self, timeout: int=None) -> bool:
with self.cond:
if self.exited:
- raise TaksFinished()
+ raise TaskFinished()
self.curr_count += 1
if self.curr_count == self.count:
@@ -134,6 +141,10 @@
if size < 1024:
return str(size)
+ # make mypy happy
+ scale = 1
+ name = ""
+
for name, scale in RSMAP:
if size < 1024 * scale:
if size % scale == 0:
@@ -154,6 +165,10 @@
if size < 1000:
return str(size)
+ # make mypy happy
+ scale = 1
+ name = ""
+
for name, scale in RSMAP_10:
if size < 1000 * scale:
if size % scale == 0:
@@ -165,16 +180,21 @@
def run_locally(cmd: Union[str, List[str]], input_data: str="", timeout:int =20) -> str:
- shell = isinstance(cmd, str)
+ if isinstance(cmd, str):
+ shell = True
+ cmd_str = cmd
+ else:
+ cmd_str = " ".join(cmd)
+
proc = subprocess.Popen(cmd,
shell=shell,
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
- res = []
+ res = [] # type: List[Tuple[bytes, bytes]]
def thread_func() -> None:
- rr = proc.communicate(input_data)
+ rr = proc.communicate(input_data.encode("utf8"))
res.extend(rr)
thread = threading.Thread(target=thread_func,
@@ -193,12 +213,16 @@
proc.kill()
thread.join()
- raise RuntimeError("Local process timeout: " + str(cmd))
+ raise RuntimeError("Local process timeout: " + cmd_str)
- out, err = res
+ stdout_data, stderr_data = zip(*res) # type: List[bytes], List[bytes]
+
+ out = b"".join(stdout_data).decode("utf8")
+ err = b"".join(stderr_data).decode("utf8")
+
if 0 != proc.returncode:
raise subprocess.CalledProcessError(proc.returncode,
- cmd, out + err)
+ cmd_str, out + err)
return out
@@ -234,7 +258,7 @@
raise OSError("Can't define interface for {0}".format(target_ip))
-def open_for_append_or_create(fname: str) -> io.IO:
+def open_for_append_or_create(fname: str) -> IO:
if not os.path.exists(fname):
return open(fname, "w")
@@ -263,15 +287,15 @@
return data
-CLEANING = []
+CLEANING = [] # type: List[Tuple[Callable[..., Any], List[Any], Dict[str, Any]]]
-def clean_resource(func: Callable[..., Any], *args, **kwargs) -> None:
- CLEANING.append((func, args, kwargs))
+def clean_resource(func: Callable[..., Any], *args: Any, **kwargs: Any) -> None:
+ CLEANING.append((func, list(args), kwargs))
-def iter_clean_func() -> Generator[Callable[..., Any], List[Any], Dict[str, Any]]:
- while CLEANING != []:
+def iter_clean_func() -> Iterator[Tuple[Callable[..., Any], List[Any], Dict[str, Any]]]:
+ while CLEANING:
yield CLEANING.pop()
@@ -285,7 +309,7 @@
return res
-def get_creds_openrc(path: str) -> Tuple[str, str, str, str, str]:
+def get_creds_openrc(path: str) -> Tuple[str, str, str, str, bool]:
fc = open(path).read()
echo = 'echo "$OS_INSECURE:$OS_TENANT_NAME:$OS_USERNAME:$OS_PASSWORD@$OS_AUTH_URL"'
@@ -309,7 +333,9 @@
os_release = collections.namedtuple("Distro", ["distro", "release", "arch"])
-def get_os(node: INode) -> os_release:
+def get_os(node: IRemoteNode) -> os_release:
+ """return os type, release and architecture for node.
+ """
arch = node.run("arch", nolog=True).strip()
try:
@@ -338,7 +364,7 @@
@contextlib.contextmanager
-def empty_ctx(val: Any=None) -> Generator[Any]:
+def empty_ctx(val: Any=None) -> Iterator[Any]:
yield val
@@ -347,9 +373,10 @@
os.makedirs(path)
-def log_nodes_statistic(nodes: Iterable[INode]) -> None:
+def log_nodes_statistic(nodes: Sequence[IRemoteNode]) -> None:
logger.info("Found {0} nodes total".format(len(nodes)))
- per_role = collections.defaultdict(lambda: 0)
+
+ per_role = collections.defaultdict(int) # type: Dict[str, int]
for node in nodes:
for role in node.roles:
per_role[role] += 1
@@ -369,3 +396,31 @@
return exe_file
return None
+
+
+def get_uniq_path_uuid(path: str, max_iter: int = 10) -> Tuple[str, str]:
+ for i in range(max_iter):
+ run_uuid = pet_generate(2, "_")
+ results_dir = os.path.join(path, run_uuid)
+ if not os.path.exists(results_dir):
+ break
+ else:
+ run_uuid = str(uuid.uuid4())
+ results_dir = os.path.join(path, run_uuid)
+
+ return results_dir, run_uuid
+
+
+class Timeout:
+ def __init__(self, timeout: int, message: str = None) -> None:
+ self.etime = time.time() + timeout
+ self.message = message
+
+ def tick(self) -> None:
+ if time.time() > self.etime:
+ if self.message:
+ msg = "Timeout: {}".format(self.message)
+ else:
+ msg = "Timeout"
+
+ raise TimeoutError(msg)
\ No newline at end of file