Add stage base class, refactor discovery, etc
diff --git a/Makefile b/Makefile
index b14cd96..ddc1a48 100644
--- a/Makefile
+++ b/Makefile
@@ -1,8 +1,18 @@
-.PHONY: mypy
+.PHONY: mypy pylint pylint_e
ALL_FILES=$(shell find wally/ -type f -name '*.py')
STUBS="stubs:.env/lib/python3.5/site-packages"
+ACTIVATE=cd ~/workspace/wally; source .env/bin/activate
mypy:
- bash -c "cd ~/workspace/wally; source .env/bin/activate ; MYPYPATH=${STUBS} python3 -m mypy -s ${ALL_FILES}"
+ bash -c "${ACTIVATE}; MYPYPATH=${STUBS} python3 -m mypy -s ${ALL_FILES}"
+
+
+PYLINT_FMT=--msg-template={path}:{line}: [{msg_id}({symbol}), {obj}] {msg}
+
+pylint:
+ bash -c "${ACTIVATE} ; python3 -m pylint '${PYLINT_FMT}' --rcfile=pylint.rc ${ALL_FILES}"
+
+pylint_e:
+ bash -c "${ACTIVATE} ; python3 -m pylint -E '${PYLINT_FMT}' --rcfile=pylint.rc ${ALL_FILES}"
diff --git a/configs-examples/full.yaml b/configs-examples/full.yaml
index 0874f4f..5c336ff 100644
--- a/configs-examples/full.yaml
+++ b/configs-examples/full.yaml
@@ -32,7 +32,7 @@
aa_group_name: wally-aa-{0}
security_group: wally_ssh_to_everyone
-clouds:
+clouds
fuel:
url: http://172.16.44.13:8000/
creds: admin:admin@admin
diff --git a/wally/discover/__init__.py b/configs-examples/local_lxc_ceph.yaml
similarity index 100%
rename from wally/discover/__init__.py
rename to configs-examples/local_lxc_ceph.yaml
diff --git a/configs-examples/v2_default.yaml b/configs-examples/v2_default.yaml
new file mode 100644
index 0000000..931d101
--- /dev/null
+++ b/configs-examples/v2_default.yaml
@@ -0,0 +1,106 @@
+# ------------------------------------ CONFIGS -------------------------------------------------------------------
+fuel:
+ url: http://172.16.44.13:8000/
+ creds: admin:admin@admin
+ ssh_creds: root:r00tme
+ openstack_env: test
+
+openstack:
+ skip_preparation: false
+ openrc: /home/koder/workspace/scale_openrc
+ openrc:
+ user: USER
+ passwd: PASSWD
+ tenant: KEY_FILE
+ auth_url: URL
+ SOME_OTHER_OPTS: OPTIONAL
+ vms:
+ - "USERNAME[:PASSWD]@VM_NAME_PREFIX[::KEY_FILE]"
+
+
+collect_info: true
+var_dir_root: /tmp/perf_tests
+settings_dir: ~/.wally
+
+logging:
+ extra_logs: 1
+ level: DEBUG
+
+vm_configs:
+ keypair_file_private: wally_vm_key_perf3.pem
+ keypair_file_public: wally_vm_key_perf3.pub
+ keypair_name: wally_vm_key
+
+ wally_1024:
+ image:
+ name: wally_ubuntu
+ user: ubuntu
+ url: https://cloud-images.ubuntu.com/trusty/current/trusty-server-cloudimg-amd64-disk1.img
+
+ flavor:
+ name: wally_1024
+ hdd_size: 100
+ ram_size: 1024
+ cpu_count: 2
+
+ vol_sz: 100
+ name_templ: wally-{group}-{id}
+ aa_group_name: wally-aa-{0}
+ security_group: wally_ssh_to_everyone
+
+
+ceph: nodeep-scrub, noscrub
+
+#----------------------------------------- STEPS ------------------------------------------------------------------
+# discover: ...
+# spawn: ...
+# connect: ...
+# sensors: ...
+# test: ...
+
+sensors:
+ online: true
+ roles_mapping:
+ testnode: system-cpu, block-io, net-io
+ ceph-osd: system-cpu, block-io, net-io, ceph
+ compute:
+ system-cpu: *
+ block-io: sd*
+ net-io: *
+
+#---------------------------------- TEST PROFILES --------------------------------------------------------------------
+profiles:
+ spawn:
+ OS_1_to_1:
+ openstack:
+ count: "=1"
+ cfg_name: wally_1024
+ network_zone_name: net04
+ flt_ip_pool: net04_ext
+ skip_preparation: true
+
+ test:
+ ceph_vdb:
+ - io:
+ load: ceph
+ params:
+ FILENAME: /dev/vdb
+ TEST_FILE_SIZE: AUTO
+
+ cinder_iscsi_vdb:
+ - io:
+ load: cinder_iscsi
+ params:
+ FILENAME: /dev/vdb
+ TEST_FILE_SIZE: AUTO
+
+ nova_io:
+ - io:
+ load: hdd
+ params:
+ FILENAME: /dev/vdb
+ TEST_FILE_SIZE: AUTO
+
+ openstack_ceph: OS_1_to_1 + ceph_vdb
+ openstack_cinder: OS_1_to_1 + ceph_iscsi_vdb
+ openstack_nova: OS_1_to_1 + nova_io
diff --git a/configs-examples/v2_user.yaml b/configs-examples/v2_user.yaml
new file mode 100644
index 0000000..32e5a6d
--- /dev/null
+++ b/configs-examples/v2_user.yaml
@@ -0,0 +1,24 @@
+include: v2_default.yaml
+discover: openstack,fuel_openrc_only
+run_sensors: true
+results_storage: /var/wally_results
+
+fuel:
+ url: http://FUEL_MASTER_EXTERNAL_IP:FUEL_MASTER_EXTERNAL_IP_DEFAULT_8000/
+ creds: FUEL_KS_USER:FUEL_KS_PASSWD@FUEL_KS_TENANT
+ ssh_creds: USER:PASSWD
+ openstack_env: ENV_NAME
+
+kubernetes: null
+lxd: null
+docker_swarm: null
+
+openstack:
+ OPENRC: /home/koder/workspace/scale_openrc
+ auth: USER:PASSWD:KEY_FILE
+
+openstack_reuse:
+ VM: ["ubuntu@wally-phytographic-sharla"]
+ test: ["some_testname"]
+
+test_profile: openstack_ceph
diff --git a/pylint.rc b/pylint.rc
new file mode 100644
index 0000000..16028d5
--- /dev/null
+++ b/pylint.rc
@@ -0,0 +1,407 @@
+[MASTER]
+
+# Specify a configuration file.
+#rcfile=
+
+# Python code to execute, usually for sys.path manipulation such as
+# pygtk.require().
+#init-hook=
+
+# Add files or directories to the blacklist. They should be base names, not
+# paths.
+ignore=CVS
+
+# Add files or directories matching the regex patterns to the blacklist. The
+# regex matches against base names, not paths.
+ignore-patterns=
+
+# Pickle collected data for later comparisons.
+persistent=yes
+
+# List of plugins (as comma separated values of python modules names) to load,
+# usually to register additional checkers.
+load-plugins=
+
+# Use multiple processes to speed up Pylint.
+jobs=1
+
+# Allow loading of arbitrary C extensions. Extensions are imported into the
+# active Python interpreter and may run arbitrary code.
+unsafe-load-any-extension=no
+
+# A comma-separated list of package or module names from where C extensions may
+# be loaded. Extensions are loading into the active Python interpreter and may
+# run arbitrary code
+extension-pkg-whitelist=
+
+# Allow optimization of some AST trees. This will activate a peephole AST
+# optimizer, which will apply various small optimizations. For instance, it can
+# be used to obtain the result of joining multiple strings with the addition
+# operator. Joining a lot of strings can lead to a maximum recursion error in
+# Pylint and this flag can prevent that. It has one side effect, the resulting
+# AST will be different than the one from reality. This option is deprecated
+# and it will be removed in Pylint 2.0.
+optimize-ast=no
+
+
+[MESSAGES CONTROL]
+
+# Only show warnings with the listed confidence levels. Leave empty to show
+# all. Valid levels: HIGH, INFERENCE, INFERENCE_FAILURE, UNDEFINED
+confidence=
+
+# Enable the message, report, category or checker with the given id(s). You can
+# either give multiple identifier separated by comma (,) or put this option
+# multiple time (only on the command line, not in the configuration file where
+# it should appear only once). See also the "--disable" option for examples.
+#enable=
+
+# Disable the message, report, category or checker with the given id(s). You
+# can either give multiple identifiers separated by comma (,) or put this
+# option multiple times (only on the command line, not in the configuration
+# file where it should appear only once).You can also use "--disable=all" to
+# disable everything first and then reenable specific checks. For example, if
+# you want to run only the similarities checker, you can use "--disable=all
+# --enable=similarities". If you want to run only the classes checker, but have
+# no Warning level messages displayed, use"--disable=all --enable=classes
+# --disable=W"
+disable=input-builtin,buffer-builtin,map-builtin-not-iterating,no-absolute-import,coerce-builtin,old-raise-syntax,delslice-method,useless-suppression,old-ne-operator,long-builtin,old-division,unicode-builtin,raw_input-builtin,unichr-builtin,oct-method,execfile-builtin,standarderror-builtin,long-suffix,reload-builtin,coerce-method,backtick,old-octal-literal,next-method-called,xrange-builtin,getslice-method,reduce-builtin,dict-iter-method,zip-builtin-not-iterating,suppressed-message,cmp-method,setslice-method,parameter-unpacking,file-builtin,filter-builtin-not-iterating,apply-builtin,dict-view-method,range-builtin-not-iterating,print-statement,metaclass-assignment,nonzero-method,intern-builtin,basestring-builtin,round-builtin,import-star-module-level,raising-string,indexing-exception,unpacking-in-except,cmp-builtin,hex-method,using-cmp-argument
+
+
+[REPORTS]
+
+# Set the output format. Available formats are text, parseable, colorized, msvs
+# (visual studio) and html. You can also give a reporter class, eg
+# mypackage.mymodule.MyReporterClass.
+output-format=text
+
+# Put messages in a separate file for each module / package specified on the
+# command line instead of printing them on stdout. Reports (if any) will be
+# written in a file name "pylint_global.[txt|html]". This option is deprecated
+# and it will be removed in Pylint 2.0.
+files-output=no
+
+# Tells whether to display a full report or only the messages
+reports=yes
+
+# Python expression which should return a note less than 10 (10 is the highest
+# note). You have access to the variables errors warning, statement which
+# respectively contain the number of errors / warnings messages and the total
+# number of statements analyzed. This is used by the global evaluation report
+# (RP0004).
+evaluation=10.0 - ((float(5 * error + warning + refactor + convention) / statement) * 10)
+
+# Template used to display messages. This is a python new-style format string
+# used to format the message information. See doc for all details
+#msg-template=
+
+
+[BASIC]
+
+# Good variable names which should always be accepted, separated by a comma
+good-names=i,j,k,ex,Run,_
+
+# Bad variable names which should always be refused, separated by a comma
+bad-names=foo,bar,baz,toto,tutu,tata
+
+# Colon-delimited sets of names that determine each other's naming style when
+# the name regexes allow several styles.
+name-group=
+
+# Include a hint for the correct naming format with invalid-name
+include-naming-hint=no
+
+# List of decorators that produce properties, such as abc.abstractproperty. Add
+# to this list to register other decorators that produce valid properties.
+property-classes=abc.abstractproperty
+
+# Regular expression matching correct class attribute names
+class-attribute-rgx=([A-Za-z_][A-Za-z0-9_]{2,30}|(__.*__))$
+
+# Naming hint for class attribute names
+class-attribute-name-hint=([A-Za-z_][A-Za-z0-9_]{2,30}|(__.*__))$
+
+# Regular expression matching correct inline iteration names
+inlinevar-rgx=[A-Za-z_][A-Za-z0-9_]*$
+
+# Naming hint for inline iteration names
+inlinevar-name-hint=[A-Za-z_][A-Za-z0-9_]*$
+
+# Regular expression matching correct attribute names
+attr-rgx=[a-z_][a-z0-9_]{2,30}$
+
+# Naming hint for attribute names
+attr-name-hint=[a-z_][a-z0-9_]{2,30}$
+
+# Regular expression matching correct class names
+class-rgx=[A-Z_][a-zA-Z0-9]+$
+
+# Naming hint for class names
+class-name-hint=[A-Z_][a-zA-Z0-9]+$
+
+# Regular expression matching correct constant names
+const-rgx=(([A-Z_][A-Z0-9_]*)|(__.*__))$
+
+# Naming hint for constant names
+const-name-hint=(([A-Z_][A-Z0-9_]*)|(__.*__))$
+
+# Regular expression matching correct variable names
+variable-rgx=[a-z_][a-z0-9_]{2,30}$
+
+# Naming hint for variable names
+variable-name-hint=[a-z_][a-z0-9_]{2,30}$
+
+# Regular expression matching correct argument names
+argument-rgx=[a-z_][a-z0-9_]{2,30}$
+
+# Naming hint for argument names
+argument-name-hint=[a-z_][a-z0-9_]{2,30}$
+
+# Regular expression matching correct module names
+module-rgx=(([a-z_][a-z0-9_]*)|([A-Z][a-zA-Z0-9]+))$
+
+# Naming hint for module names
+module-name-hint=(([a-z_][a-z0-9_]*)|([A-Z][a-zA-Z0-9]+))$
+
+# Regular expression matching correct method names
+method-rgx=[a-z_][a-z0-9_]{2,30}$
+
+# Naming hint for method names
+method-name-hint=[a-z_][a-z0-9_]{2,30}$
+
+# Regular expression matching correct function names
+function-rgx=[a-z_][a-z0-9_]{2,30}$
+
+# Naming hint for function names
+function-name-hint=[a-z_][a-z0-9_]{2,30}$
+
+# Regular expression which should only match function or class names that do
+# not require a docstring.
+no-docstring-rgx=^_
+
+# Minimum line length for functions/classes that require docstrings, shorter
+# ones are exempt.
+docstring-min-length=-1
+
+
+[ELIF]
+
+# Maximum number of nested blocks for function / method body
+max-nested-blocks=5
+
+
+[FORMAT]
+
+# Maximum number of characters on a single line.
+max-line-length=100
+
+# Regexp for a line that is allowed to be longer than the limit.
+ignore-long-lines=^\s*(# )?<?https?://\S+>?$
+
+# Allow the body of an if to be on the same line as the test if there is no
+# else.
+single-line-if-stmt=no
+
+# List of optional constructs for which whitespace checking is disabled. `dict-
+# separator` is used to allow tabulation in dicts, etc.: {1 : 1,\n222: 2}.
+# `trailing-comma` allows a space between comma and closing bracket: (a, ).
+# `empty-line` allows space-only lines.
+no-space-check=trailing-comma,dict-separator
+
+# Maximum number of lines in a module
+max-module-lines=1000
+
+# String used as indentation unit. This is usually " " (4 spaces) or "\t" (1
+# tab).
+indent-string=' '
+
+# Number of spaces of indent required inside a hanging or continued line.
+indent-after-paren=4
+
+# Expected format of line ending, e.g. empty (any line ending), LF or CRLF.
+expected-line-ending-format=
+
+
+[LOGGING]
+
+# Logging modules to check that the string format arguments are in logging
+# function parameter format
+logging-modules=logging
+
+
+[MISCELLANEOUS]
+
+# List of note tags to take in consideration, separated by a comma.
+notes=FIXME,XXX,TODO
+
+
+[SIMILARITIES]
+
+# Minimum lines number of a similarity.
+min-similarity-lines=4
+
+# Ignore comments when computing similarities.
+ignore-comments=yes
+
+# Ignore docstrings when computing similarities.
+ignore-docstrings=yes
+
+# Ignore imports when computing similarities.
+ignore-imports=no
+
+
+[SPELLING]
+
+# Spelling dictionary name. Available dictionaries: none. To make it working
+# install python-enchant package.
+spelling-dict=
+
+# List of comma separated words that should not be checked.
+spelling-ignore-words=
+
+# A path to a file that contains private dictionary; one word per line.
+spelling-private-dict-file=
+
+# Tells whether to store unknown words to indicated private dictionary in
+# --spelling-private-dict-file option instead of raising a message.
+spelling-store-unknown-words=no
+
+
+[TYPECHECK]
+
+# Tells whether missing members accessed in mixin class should be ignored. A
+# mixin class is detected if its name ends with "mixin" (case insensitive).
+ignore-mixin-members=yes
+
+# List of module names for which member attributes should not be checked
+# (useful for modules/projects where namespaces are manipulated during runtime
+# and thus existing member attributes cannot be deduced by static analysis. It
+# supports qualified module names, as well as Unix pattern matching.
+ignored-modules=
+
+# List of class names for which member attributes should not be checked (useful
+# for classes with dynamically set attributes). This supports the use of
+# qualified names.
+ignored-classes=optparse.Values,thread._local,_thread._local
+
+# List of members which are set dynamically and missed by pylint inference
+# system, and so shouldn't trigger E1101 when accessed. Python regular
+# expressions are accepted.
+generated-members=
+
+# List of decorators that produce context managers, such as
+# contextlib.contextmanager. Add to this list to register other decorators that
+# produce valid context managers.
+contextmanager-decorators=contextlib.contextmanager
+
+
+[VARIABLES]
+
+# Tells whether we should check for unused import in __init__ files.
+init-import=no
+
+# A regular expression matching the name of dummy variables (i.e. expectedly
+# not used).
+dummy-variables-rgx=(_+[a-zA-Z0-9]*?$)|dummy
+
+# List of additional names supposed to be defined in builtins. Remember that
+# you should avoid to define new builtins when possible.
+additional-builtins=
+
+# List of strings which can identify a callback function by name. A callback
+# name must start or end with one of those strings.
+callbacks=cb_,_cb
+
+# List of qualified module names which can have objects that can redefine
+# builtins.
+redefining-builtins-modules=six.moves,future.builtins
+
+
+[CLASSES]
+
+# List of method names used to declare (i.e. assign) instance attributes.
+defining-attr-methods=__init__,__new__,setUp
+
+# List of valid names for the first argument in a class method.
+valid-classmethod-first-arg=cls
+
+# List of valid names for the first argument in a metaclass class method.
+valid-metaclass-classmethod-first-arg=mcs
+
+# List of member names, which should be excluded from the protected access
+# warning.
+exclude-protected=_asdict,_fields,_replace,_source,_make
+
+
+[DESIGN]
+
+# Maximum number of arguments for function / method
+max-args=5
+
+# Argument names that match this expression will be ignored. Default to name
+# with leading underscore
+ignored-argument-names=_.*
+
+# Maximum number of locals for function / method body
+max-locals=15
+
+# Maximum number of return / yield for function / method body
+max-returns=6
+
+# Maximum number of branch for function / method body
+max-branches=12
+
+# Maximum number of statements in function / method body
+max-statements=50
+
+# Maximum number of parents for a class (see R0901).
+max-parents=7
+
+# Maximum number of attributes for a class (see R0902).
+max-attributes=7
+
+# Minimum number of public methods for a class (see R0903).
+min-public-methods=2
+
+# Maximum number of public methods for a class (see R0904).
+max-public-methods=20
+
+# Maximum number of boolean expressions in a if statement
+max-bool-expr=5
+
+
+[IMPORTS]
+
+# Deprecated modules which should not be used, separated by a comma
+deprecated-modules=optparse
+
+# Create a graph of every (i.e. internal and external) dependencies in the
+# given file (report RP0402 must not be disabled)
+import-graph=
+
+# Create a graph of external dependencies in the given file (report RP0402 must
+# not be disabled)
+ext-import-graph=
+
+# Create a graph of internal dependencies in the given file (report RP0402 must
+# not be disabled)
+int-import-graph=
+
+# Force import order to recognize a module as part of the standard
+# compatibility libraries.
+known-standard-library=
+
+# Force import order to recognize a module as part of a third party library.
+known-third-party=enchant
+
+# Analyse import fallback blocks. This can be used to support both Python 2 and
+# 3 compatible code, which means that the block might have code that exists
+# only in one or another interpreter, leading to false positives when analysed.
+analyse-fallback-blocks=no
+
+
+[EXCEPTIONS]
+
+# Exceptions that will emit a warning when being caught. Defaults to
+# "Exception"
+overgeneral-exceptions=Exception
diff --git a/stubs/yaml.pyi b/stubs/yaml.pyi
index d8d6516..4f231f5 100644
--- a/stubs/yaml.pyi
+++ b/stubs/yaml.pyi
@@ -4,6 +4,11 @@
Basic = Union[List, Dict[str, Any]]
-def load(stream: IO, loader: Any) -> Any: ...
+class Loader: ...
+class Dumper: ...
class CLoader: ...
+class CDumper: ...
+# def load(stream: IO, loader: Any) -> Any: ...
+def load(data: bytes, Loader: Any = None, encoding: str = 'utf8') -> Any: ...
+def dump(data: str, Dumper: Any = None, encoding: str = 'utf8') -> bytes: ...
diff --git a/tests/test_agent.py b/tests/test_fio_parser.py
similarity index 83%
rename from tests/test_agent.py
rename to tests/test_fio_parser.py
index f9fa389..b496190 100644
--- a/tests/test_agent.py
+++ b/tests/test_fio_parser.py
@@ -5,7 +5,7 @@
from oktest import ok, main, test
-from wally.suits.io import agent
+from wally.suits.io import fio_task_parser
code_test_defaults = """
[defaults]
@@ -94,21 +94,18 @@
"""
-P = agent.parse_all_in_1
-
-
class AgentTest(unittest.TestCase):
@test("test_parse_value")
def test_parse_value(self):
x = "asdfasd adsd d"
- ok(agent.parse_value(x)) == x
- ok(agent.parse_value("10 2")) == "10 2"
- ok(agent.parse_value(None)).is_(None)
- ok(agent.parse_value("10")) == 10
- ok(agent.parse_value("20")) == 20
- ok(agent.parse_value("10.1") - 10.1) < 1E-7
- ok(agent.parse_value("{% 10, 20 %}")) == [10, 20]
- ok(agent.parse_value("{% 10,20 %}")) == [10, 20]
+ ok(fio_task_parser.parse_value(x)) == x
+ ok(fio_task_parser.parse_value("10 2")) == "10 2"
+ ok(fio_task_parser.parse_value("None")).is_(None)
+ ok(fio_task_parser.parse_value("10")) == 10
+ ok(fio_task_parser.parse_value("20")) == 20
+ ok(fio_task_parser.parse_value("10.1") - 10.1) < 1E-7
+ ok(fio_task_parser.parse_value("{% 10, 20 %}")) == [10, 20]
+ ok(fio_task_parser.parse_value("{% 10,20 %}")) == [10, 20]
code_test_compile_simplest = defaults + """
[sec1]
@@ -117,7 +114,7 @@
@test("test_compile_simplest")
def test_compile_simplest(self):
- sections = P(self.code_test_compile_simplest, {})
+ sections = fio_task_parser.parse_all_in_1(self.code_test_compile_simplest, {})
sections = list(sections)
ok(len(sections)) == 1
@@ -138,7 +135,7 @@
@test("test_compile_defaults")
def test_compile_defaults(self):
- sections = P(self.code_test_params_in_defaults, {})
+ sections = fio_task_parser.parse_all_in_1(self.code_test_params_in_defaults, {})
sections = list(sections)
ok(len(sections)) == 1
@@ -151,7 +148,7 @@
@test("test_defaults")
def test_defaults(self):
- sections = P(code_test_defaults, {})
+ sections = fio_task_parser.parse_all_in_1(code_test_defaults, {})
sections = list(sections)
ok(len(sections)) == 2
@@ -177,11 +174,10 @@
@test("test_external_params")
def test_external_params(self):
with self.assertRaises(KeyError):
- sections = P(self.code_test_ext_params, {})
+ sections = fio_task_parser.parse_all_in_1(self.code_test_ext_params, {})
list(sections)
- sections = P(self.code_test_ext_params,
- {'RUNTIME': 20})
+ sections = fio_task_parser.parse_all_in_1(self.code_test_ext_params, {'RUNTIME': 20})
sections = list(sections)
code_test_cycle = defaults + """
@@ -192,8 +188,7 @@
@test("test_cycle")
def test_cycle(self):
- sections = P(self.code_test_cycle,
- {'RUNTIME': 20})
+ sections = fio_task_parser.parse_all_in_1(self.code_test_cycle, {'RUNTIME': 20})
sections = list(sections)
ok(len(sections)) == 2
ok(sections[0].vals['ramp_time']) == 20
@@ -208,8 +203,7 @@
@test("test_cycles")
def test_cycles(self):
- sections = P(self.code_test_cycles,
- {'RUNTIME': 20})
+ sections = fio_task_parser.parse_all_in_1(self.code_test_cycles, {'RUNTIME': 20})
sections = list(sections)
ok(len(sections)) == 4
@@ -224,13 +218,12 @@
@test("test_time_estimate")
def test_time_estimate(self):
- sections = P(self.code_test_cycles,
- {'RUNTIME': 20})
+ sections = fio_task_parser.parse_all_in_1(self.code_test_cycles, {'RUNTIME': 20})
sections = list(sections)
- etime = agent.calculate_execution_time(sections)
+ etime = fio_task_parser.calculate_execution_time(sections)
ok(etime) == 20 * 4 + 20 * 2 + 40 * 2
- ok(agent.sec_to_str(etime)) == "0:03:20"
+ ok(fio_task_parser.sec_to_str(etime)) == "0:03:20"
code_test_cycles2 = defaults + """
[sec1 * 7]
@@ -241,14 +234,13 @@
@test("test_time_estimate")
def test_time_estimate_large(self):
- sections = P(self.code_test_cycles2,
- {'RUNTIME': 30})
+ sections = fio_task_parser.parse_all_in_1(self.code_test_cycles2, {'RUNTIME': 30})
sections = list(sections)
ok(sections[0].name) == 'sec1'
ok(len(sections)) == 7 * 4
- etime = agent.calculate_execution_time(sections)
+ etime = fio_task_parser.calculate_execution_time(sections)
# ramptime optimization
expected_time = (20 + 30 + 30 * 6) * 2
expected_time += (40 + 30 + 30 * 6) * 2
@@ -268,14 +260,14 @@
@test("test_time_estimate2")
def test_time_estimate_large2(self):
- sections = P(self.code_test_cycles3, {'RUNTIME': 30})
+ sections = fio_task_parser.parse_all_in_1(self.code_test_cycles3, {'RUNTIME': 30})
sections = list(sections)
ok(sections[0].name) == 'sec1'
ok(sections[1].name) == 'sec1'
ok(len(sections)) == 7 * 4 * 2
- etime = agent.calculate_execution_time(sections)
+ etime = fio_task_parser.calculate_execution_time(sections)
# ramptime optimization
expected_time = (20 + 30 + 30 * 6) * 2
expected_time += (40 + 30 + 30 * 6) * 2
@@ -290,7 +282,7 @@
@test("test_repeat")
def test_repeat(self):
- sections = P(self.code_test_repeats, {})
+ sections = fio_task_parser.parse_all_in_1(self.code_test_repeats, {})
sections = list(sections)
ok(len(sections)) == 2 + 3
ok(sections[0].name) == 'sec1'
@@ -301,7 +293,7 @@
@test("test_real_tasks")
def test_real_tasks(self):
- tasks_dir = os.path.dirname(agent.__file__)
+ tasks_dir = os.path.dirname(fio_task_parser.__file__)
fname = os.path.join(tasks_dir, 'io_scenario_ceph.cfg')
fc = open(fname).read()
@@ -310,7 +302,7 @@
ok(len(sections)) == 7 * 9 * 4 + 7
- etime = agent.calculate_execution_time(sections)
+ etime = fio_task_parser.calculate_execution_time(sections)
# ramptime optimization
expected_time = (60 * 7 + 30) * 9 * 4 + (60 * 7 + 30)
ok(etime) == expected_time
diff --git a/tests/test_ssh.py b/tests/test_ssh.py
new file mode 100644
index 0000000..efc5f09
--- /dev/null
+++ b/tests/test_ssh.py
@@ -0,0 +1,33 @@
+import getpass
+
+from oktest import ok
+
+from wally import ssh_utils, ssh
+
+
+creds = "root@osd-0"
+
+
+def test_ssh_url_parser():
+ curr_user = getpass.getuser()
+ creds = {
+ "test": ssh_utils.ConnCreds("test", curr_user, port=23),
+ "test:13": ssh_utils.ConnCreds("test", curr_user, port=13),
+ "test::xxx.key": ssh_utils.ConnCreds("test", curr_user, port=23, key_file="xxx.key"),
+ "test:123:xxx.key": ssh_utils.ConnCreds("test", curr_user, port=123, key_file="xxx.key"),
+ "user@test": ssh_utils.ConnCreds("test", "user", port=23),
+ "user@test:13": ssh_utils.ConnCreds("test", "user", port=13),
+ "user@test::xxx.key": ssh_utils.ConnCreds("test", "user", port=23, key_file="xxx.key"),
+ "user@test:123:xxx.key": ssh_utils.ConnCreds("test", "user", port=123, key_file="xxx.key"),
+ "user:passwd:@test": ssh_utils.ConnCreds("test", curr_user, port=23, passwd="passwd:"),
+ "user:passwd:@test:123": ssh_utils.ConnCreds("test", curr_user, port=123, passwd="passwd:"),
+ }
+
+ for uri, expected in creds.items():
+ parsed = ssh_utils.parse_ssh_uri(uri)
+ ok(parsed.user) == expected.user
+ ok(parsed.addr.port) == expected.addr.port
+ ok(parsed.addr.host) == expected.addr.host
+ ok(parsed.key_file) == expected.key_file
+ ok(parsed.passwd) == expected.passwd
+
diff --git a/tests/test_storage.py b/tests/test_storage.py
index 46f38e6..8fd8f4d 100644
--- a/tests/test_storage.py
+++ b/tests/test_storage.py
@@ -1,9 +1,11 @@
+import array
import shutil
import tempfile
import contextlib
-from oktest import ok, main, test
+import pytest
+from oktest import ok
from wally.storage import make_storage
@@ -35,19 +37,162 @@
for path, val in values.items():
storage[path] = val
+ with make_storage(root, existing=True) as storage:
+ for path, val in values.items():
+ ok(storage[path]) == val
+ ok(storage.get(path)) == val
+
+
+def test_path_list():
+ values = {
+ "int": 1,
+ "str/1": "test",
+ "bytes/2": b"test",
+ "none/s/1": None,
+ "bool/xx/1/2/1": None,
+ "float/s/1": 1.234,
+ "list": [1, 2, "3"],
+ "dict": {1: 3, "2": "4", "1.2": 1.3}
+ }
+
+ with in_temp_dir() as root:
+ with make_storage(root, existing=False) as storage:
+ for path, val in values.items():
+ storage[path.split('/')] = val
with make_storage(root, existing=True) as storage:
for path, val in values.items():
- ok(storage[path]) == val
+ ok(storage[path.split('/')]) == val
+ ok(storage.get(path.split('/'))) == val
+
+ with in_temp_dir() as root:
+ with make_storage(root, existing=False) as storage:
+ for path, val in values.items():
+ storage[path] = val
+
+ with make_storage(root, existing=True) as storage:
+ for path, val in values.items():
+ ok(storage[path.split('/')]) == val
+ ok(storage.get(path.split('/'))) == val
-def test_large_arrays():
- pass
+def test_list():
+ with in_temp_dir() as root:
+ with make_storage(root, existing=False) as storage:
+ storage["x/some_path1"] = "1"
+ storage["x/some_path2"] = [1, 2, 3]
+ storage["x/some_path3"] = [1, 2, 3, 4]
+
+ storage["x/y/some_path11"] = "1"
+ storage["x/y/some_path22"] = [1, 2, 3]
+
+ with make_storage(root, existing=True) as storage:
+ assert 'x' in storage
+ assert 'x/y' in storage
+
+ assert {(False, 'x')} == set(storage.list())
+
+ assert {(True, 'some_path1'),
+ (True, 'some_path2'),
+ (True, 'some_path3'),
+ (False, "y")} == set(storage.list("x"))
+
+ assert {(True, 'some_path11'), (True, 'some_path22')} == set(storage.list("x/y"))
-def test_array_append():
- pass
+def test_overwrite():
+ with in_temp_dir() as root:
+ with make_storage(root, existing=False) as storage:
+ storage["some_path"] = "1"
+ storage["some_path"] = [1, 2, 3]
+
+ with make_storage(root, existing=True) as storage:
+ assert storage["some_path"] == [1, 2, 3]
-def test_performance():
- pass
+def test_multy_level():
+ with in_temp_dir() as root:
+ values = {
+ "dict1": {1: {3: 4, 6: [12, {123, 3}, {4: 3}]}, "2": "4", "1.2": 1.3}
+ }
+
+ with make_storage(root, existing=False) as storage:
+ for path, val in values.items():
+ storage[path] = val
+
+ with make_storage(root, existing=True) as storage:
+ for path, val in values.items():
+ ok(storage[path]) == val
+
+
+def test_arrays():
+ with in_temp_dir() as root:
+ val_l = list(range(10000)) * 10
+ val_i = array.array("i", val_l)
+ val_f = array.array("f", map(float, val_l))
+ val_2f = val_f + val_f
+
+ with make_storage(root, existing=False) as storage:
+ storage.set_array(val_i, "array_i")
+ storage.set_array(val_f, "array_f")
+ storage.set_array(val_f, "array_x2")
+ storage.append(val_f, "array_x2")
+
+ with make_storage(root, existing=True) as storage:
+ ok(val_i) == storage.get_array("i", "array_i")
+ ok(val_f) == storage.get_array("f", "array_f")
+ ok(val_2f) == storage.get_array("f", "array_x2")
+
+
+class LoadMe:
+ def __init__(self, **vals):
+ self.__dict__.update(vals)
+
+
+def test_load_user_obj():
+ obj = LoadMe(x=1, y=12, z=[1,2,3], t="asdad", gg={"a": 1, "g": [["x"]]})
+
+ with in_temp_dir() as root:
+ with make_storage(root, existing=False) as storage:
+ storage["obj"] = obj
+
+ with make_storage(root, existing=True) as storage:
+ obj2 = storage.load(LoadMe, "obj")
+ assert isinstance(obj2, LoadMe)
+ ok(obj2.__dict__) == obj.__dict__
+
+
+def test_path_not_exists():
+ with in_temp_dir() as root:
+ pass
+
+ with pytest.raises(IOError):
+ with make_storage(root, existing=True) as storage:
+ pass
+
+ with in_temp_dir() as root:
+ pass
+
+ with make_storage(root, existing=False) as storage:
+ with pytest.raises(IOError):
+ storage["x"]
+
+
+def test_incorrect_user_object():
+ obj = LoadMe(x=1, y=LoadMe(t=12))
+
+ with in_temp_dir() as root:
+ with make_storage(root, existing=False) as storage:
+ with pytest.raises(ValueError):
+ storage["obj"] = obj
+
+
+def test_substorage():
+ with in_temp_dir() as root:
+ with make_storage(root, existing=False) as storage:
+ storage["x/y"] = "data"
+ storage.sub_storage("t")["r"] = "sub_data"
+
+ with make_storage(root, existing=True) as storage:
+ ok(storage["t/r"]) == "sub_data"
+ ok(storage.sub_storage("x")["y"]) == "data"
diff --git a/v2_plans.md b/v2_plans.md
index 95ffc92..edce540 100644
--- a/v2_plans.md
+++ b/v2_plans.md
@@ -10,16 +10,23 @@
* Simplify settings
* Unit-tests
* 'perf' sensor
- * ftrace, https://github.com/iovisor/bcc, etc
- * Config validation
+ * ftrace, [bcc](https://github.com/iovisor/bcc), etc
+ * Config revised:
+ * Full config is a set of independent sections, each related to one plugin or 'step'
+ * Simple user config get compiled into "full" config with variable substitution
+ * Result config then validated
+ * Each plugin defines config sections tructure and validation
* Add sync 4k write with small set of thcount
+ * White-box event logs for UT
+ * Result-to-yaml for UT
* Infra:
* Add script to download fio from git and build it
* Docker/lxd public container as default distribution way
+ * Update setup.py to provide CLI entry points
* Statistical result check and report:
- * Comprehensive report with results histograms and other
+ * Comprehensive report with results histograms and other, [Q-Q plot](https://en.wikipedia.org/wiki/Q%E2%80%93Q_plot)
* Check results distribution
* Warn for non-normal results
* Check that distribution of different parts is close. Average
@@ -31,9 +38,9 @@
depending on selected visualization type
* Offload simple report table to cvs/yaml/json/test/ascii_table
* fio load reporters (visualizers), ceph report tool
- https://github.com/cronburg/ceph-viz/tree/master/histogram
+ [ceph-viz-histo](https://github.com/cronburg/ceph-viz/tree/master/histogram)
* evaluate bokeh for visualization
- * flamegraph for 'perf' output - https://www.youtube.com/watch?v=nZfNehCzGdw
+ * [flamegraph](https://www.youtube.com/watch?v=nZfNehCzGdw) for 'perf' output
* detect internal pattern:
- FFT
- http://mabrek.github.io/
@@ -50,7 +57,23 @@
- http://www.lognormal.com/features/
- http://blog.simiacryptus.com/2015/10/modeling-network-latency.html
-* Intelectual postprocessing:
+* Report structure
+ * Overall report
+ * Extended engineering report
+ * Cluster information
+ * Loads. For each load:
+ - IOPS distribution, stat analisys
+ - LAT heatmap/histo, stat analisys
+ - Bottleneck analisys
+ * Changes for load groups - show how IOPS/LAT histo is chages with thread count
+ * Report help page, link for explanations
+
+* Report pictures:
+ * checkboxes for show/hide part of image
+ * pop-up help for part of picture
+ * pop-up text values for bars/lines
+
+* Intellectual postprocessing:
* Difference calculation
* Resource usage calculator/visualizer, bottleneck hunter
* correct comparison between different systems
@@ -62,3 +85,4 @@
* Add aio rpc client
* Add integration tests with nbd
* fix existing folder detection
+ * Simple REST API for external in-browser UI
\ No newline at end of file
diff --git a/wally/ceph.py b/wally/ceph.py
new file mode 100644
index 0000000..e23343e
--- /dev/null
+++ b/wally/ceph.py
@@ -0,0 +1,112 @@
+""" Collect data about ceph nodes"""
+import json
+import logging
+from typing import Set, Dict, cast
+
+
+from .node_interfaces import NodeInfo, IRPCNode
+from .ssh_utils import ConnCreds
+from .common_types import IP
+from .stage import Stage, StepOrder
+from .test_run_class import TestRun
+from .ssh_utils import parse_ssh_uri
+from .node import connect, setup_rpc
+
+
+logger = logging.getLogger("wally.discover")
+
+
+def get_osds_ips(node: IRPCNode, conf: str, key: str) -> Set[IP]:
+ """Get set of osd's ip"""
+
+ data = node.run("ceph -c {} -k {} --format json osd dump".format(conf, key))
+ jdata = json.loads(data)
+ ips = set() # type: Set[IP]
+ first_error = True
+ for osd_data in jdata["osds"]:
+ if "public_addr" not in osd_data:
+ if first_error:
+ osd_id = osd_data.get("osd", "<OSD_ID_MISSED>")
+ logger.warning("No 'public_addr' field in 'ceph osd dump' output for osd %s" +
+ "(all subsequent errors omitted)", osd_id)
+ first_error = False
+ else:
+ ip_port = osd_data["public_addr"]
+ if '/' in ip_port:
+ ip_port = ip_port.split("/", 1)[0]
+ ips.add(IP(ip_port.split(":")[0]))
+ return ips
+
+
+def get_mons_ips(node: IRPCNode, conf: str, key: str) -> Set[IP]:
+ """Return mon ip set"""
+
+ data = node.run("ceph -c {} -k {} --format json mon_status".format(conf, key))
+ jdata = json.loads(data)
+ ips = set() # type: Set[IP]
+ first_error = True
+ for mon_data in jdata["monmap"]["mons"]:
+ if "addr" not in mon_data:
+ if first_error:
+ mon_name = mon_data.get("name", "<MON_NAME_MISSED>")
+ logger.warning("No 'addr' field in 'ceph mon_status' output for mon %s" +
+ "(all subsequent errors omitted)", mon_name)
+ first_error = False
+ else:
+ ip_port = mon_data["addr"]
+ if '/' in ip_port:
+ ip_port = ip_port.split("/", 1)[0]
+ ips.add(IP(ip_port.split(":")[0]))
+
+ return ips
+
+
+class DiscoverCephStage(Stage):
+ config_block = 'ceph'
+ priority = StepOrder.DISCOVER
+
+ def run(self, ctx: TestRun) -> None:
+ """Return list of ceph's nodes NodeInfo"""
+
+ if 'ceph_nodes' in ctx.storage:
+ ctx.nodes_info.extend(ctx.storage.load_list(NodeInfo, 'ceph_nodes'))
+ else:
+ ceph = ctx.config.ceph
+ root_node_uri = cast(str, ceph.root_node)
+ cluster = ceph.get("cluster", "ceph")
+ conf = ceph.get("conf")
+ key = ceph.get("key")
+ info = NodeInfo(parse_ssh_uri(root_node_uri), set())
+ ceph_nodes = {} # type: Dict[IP, NodeInfo]
+
+ if conf is None:
+ conf = "/etc/ceph/{}.conf".format(cluster)
+
+ if key is None:
+ key = "/etc/ceph/{}.client.admin.keyring".format(cluster)
+
+ with setup_rpc(connect(info), ctx.rpc_code) as node:
+
+ # new_nodes.extend(ceph.discover_ceph_nodes(ceph_root_conn, cluster=cluster, conf=conf, key=key))
+ ssh_key = node.get_file_content("~/.ssh/id_rsa")
+
+ try:
+ for ip in get_osds_ips(node, conf, key):
+ if ip in ceph_nodes:
+ ceph_nodes[ip].roles.add("ceph-osd")
+ else:
+ ceph_nodes[ip] = NodeInfo(ConnCreds(cast(str, ip), user="root", key=ssh_key), {"ceph-osd"})
+ except Exception as exc:
+ logger.error("OSD discovery failed: %s", exc)
+
+ try:
+ for ip in get_mons_ips(node, conf, key):
+ if ip in ceph_nodes:
+ ceph_nodes[ip].roles.add("ceph-mon")
+ else:
+ ceph_nodes[ip] = NodeInfo(ConnCreds(cast(str, ip), user="root", key=ssh_key), {"ceph-mon"})
+ except Exception as exc:
+ logger.error("MON discovery failed: %s", exc)
+
+ ctx.nodes_info.extend(ceph_nodes.values())
+ ctx.storage['ceph-nodes'] = list(ceph_nodes.values())
diff --git a/wally/config.py b/wally/config.py
index cac4acf..e96a03e 100644
--- a/wally/config.py
+++ b/wally/config.py
@@ -10,13 +10,14 @@
storage_url = None # type: str
comment = None # type: str
keep_vm = None # type: bool
- no_tests = None # type: bool
dont_discover_nodes = None # type: bool
build_id = None # type: str
build_description = None # type: str
build_type = None # type: str
default_test_local_folder = None # type: str
settings_dir = None # type: str
+ connect_timeout = 30 # type: int
+ no_tests = False # type: bool
def __init__(self, dct: ConfigBlock) -> None:
self.__dict__['_dct'] = dct
diff --git a/wally/discover/ceph.py b/wally/discover/ceph.py
deleted file mode 100644
index 4a72bfb..0000000
--- a/wally/discover/ceph.py
+++ /dev/null
@@ -1,91 +0,0 @@
-""" Collect data about ceph nodes"""
-import json
-import logging
-from typing import List, Set, Dict
-
-
-from ..node_interfaces import NodeInfo, IRPCNode
-from ..ssh_utils import ConnCreds
-from ..common_types import IP
-
-logger = logging.getLogger("wally.discover")
-
-
-def discover_ceph_nodes(node: IRPCNode,
- cluster: str = "ceph",
- conf: str = None,
- key: str = None) -> List[NodeInfo]:
- """Return list of ceph's nodes NodeInfo"""
-
- if conf is None:
- conf = "/etc/ceph/{}.conf".format(cluster)
-
- if key is None:
- key = "/etc/ceph/{}.client.admin.keyring".format(cluster)
-
- try:
- osd_ips = get_osds_ips(node, conf, key)
- except Exception as exc:
- logger.error("OSD discovery failed: %s", exc)
- osd_ips = set()
-
- try:
- mon_ips = get_mons_ips(node, conf, key)
- except Exception as exc:
- logger.error("MON discovery failed: %s", exc)
- mon_ips = set()
-
- ips = {} # type: Dict[str, List[str]]
- for ip in osd_ips:
- ips.setdefault(ip, []).append("ceph-osd")
-
- for ip in mon_ips:
- ips.setdefault(ip, []).append("ceph-mon")
-
- ssh_key = node.get_file_content("~/.ssh/id_rsa")
- return [NodeInfo(ConnCreds(host=ip, user="root", key=ssh_key), set(roles)) for ip, roles in ips.items()]
-
-
-def get_osds_ips(node: IRPCNode, conf: str, key: str) -> Set[IP]:
- """Get set of osd's ip"""
-
- data = node.run("ceph -c {} -k {} --format json osd dump".format(conf, key))
- jdata = json.loads(data)
- ips = set() # type: Set[IP]
- first_error = True
- for osd_data in jdata["osds"]:
- if "public_addr" not in osd_data:
- if first_error:
- osd_id = osd_data.get("osd", "<OSD_ID_MISSED>")
- logger.warning("No 'public_addr' field in 'ceph osd dump' output for osd %s" +
- "(all subsequent errors omitted)", osd_id)
- first_error = False
- else:
- ip_port = osd_data["public_addr"]
- if '/' in ip_port:
- ip_port = ip_port.split("/", 1)[0]
- ips.add(IP(ip_port.split(":")[0]))
- return ips
-
-
-def get_mons_ips(node: IRPCNode, conf: str, key: str) -> Set[IP]:
- """Return mon ip set"""
-
- data = node.run("ceph -c {} -k {} --format json mon_status".format(conf, key))
- jdata = json.loads(data)
- ips = set() # type: Set[IP]
- first_error = True
- for mon_data in jdata["monmap"]["mons"]:
- if "addr" not in mon_data:
- if first_error:
- mon_name = mon_data.get("name", "<MON_NAME_MISSED>")
- logger.warning("No 'addr' field in 'ceph mon_status' output for mon %s" +
- "(all subsequent errors omitted)", mon_name)
- first_error = False
- else:
- ip_port = mon_data["addr"]
- if '/' in ip_port:
- ip_port = ip_port.split("/", 1)[0]
- ips.add(IP(ip_port.split(":")[0]))
-
- return ips
diff --git a/wally/discover/discover.py b/wally/discover/discover.py
deleted file mode 100644
index 35ddc62..0000000
--- a/wally/discover/discover.py
+++ /dev/null
@@ -1,108 +0,0 @@
-import os.path
-import logging
-from typing import Dict, NamedTuple, List, Optional, cast
-
-from paramiko.ssh_exception import AuthenticationException
-
-from . import ceph
-from . import fuel
-from . import openstack
-from ..utils import parse_creds, StopTestError
-from ..config import ConfigBlock
-from ..start_vms import OSCreds
-from ..node_interfaces import NodeInfo
-from ..node import connect, setup_rpc
-from ..ssh_utils import parse_ssh_uri
-from ..test_run_class import TestRun
-
-
-logger = logging.getLogger("wally.discover")
-
-
-openrc_templ = """#!/bin/sh
-export LC_ALL=C
-export OS_NO_CACHE='true'
-export OS_TENANT_NAME='{tenant}'
-export OS_USERNAME='{name}'
-export OS_PASSWORD='{passwd}'
-export OS_AUTH_URL='{auth_url}'
-export OS_INSECURE={insecure}
-export OS_AUTH_STRATEGY='keystone'
-export OS_REGION_NAME='RegionOne'
-export CINDER_ENDPOINT_TYPE='publicURL'
-export GLANCE_ENDPOINT_TYPE='publicURL'
-export KEYSTONE_ENDPOINT_TYPE='publicURL'
-export NOVA_ENDPOINT_TYPE='publicURL'
-export NEUTRON_ENDPOINT_TYPE='publicURL'
-"""
-
-
-DiscoveryResult = NamedTuple("DiscoveryResult", [("os_creds", Optional[OSCreds]), ("nodes", List[NodeInfo])])
-
-
-def discover(ctx: TestRun,
- discover_list: List[str],
- clusters_info: ConfigBlock,
- discover_nodes: bool = True) -> DiscoveryResult:
- """Discover nodes in clusters"""
-
- new_nodes = [] # type: List[NodeInfo]
- os_creds = None # type: Optional[OSCreds]
-
- for cluster in discover_list:
- if cluster == "openstack":
- if not discover_nodes:
- logger.warning("Skip openstack cluster discovery")
- continue
-
- cluster_info = clusters_info["openstack"] # type: ConfigBlock
- new_nodes.extend(openstack.discover_openstack_nodes(ctx.os_connection, cluster_info))
-
- elif cluster == "fuel" or cluster == "fuel_openrc_only":
- if cluster == "fuel_openrc_only":
- discover_nodes = False
-
- fuel_node_info = NodeInfo(parse_ssh_uri(clusters_info['fuel']['ssh_creds']), {'fuel_master'})
- try:
- fuel_rpc_conn = setup_rpc(connect(fuel_node_info), ctx.rpc_code)
- except AuthenticationException:
- raise StopTestError("Wrong fuel credentials")
- except Exception:
- logger.exception("While connection to FUEL")
- raise StopTestError("Failed to connect to FUEL")
-
- # TODO(koder): keep FUEL rpc in context? Maybe open this connection on upper stack level?
- with fuel_rpc_conn:
- nodes, fuel_info = fuel.discover_fuel_nodes(
- fuel_rpc_conn, ctx.fuel_conn, clusters_info['fuel'], discover_nodes)
- new_nodes.extend(nodes)
-
- if fuel_info.openrc:
- auth_url = cast(str, fuel_info.openrc['os_auth_url'])
- if fuel_info.version >= [8, 0] and auth_url.startswith("https://"):
- logger.warning("Fixing FUEL 8.0 AUTH url - replace https://->http://")
- auth_url = auth_url.replace("https", "http", 1)
-
- os_creds = OSCreds(name=cast(str, fuel_info.openrc['username']),
- passwd=cast(str, fuel_info.openrc['password']),
- tenant=cast(str, fuel_info.openrc['tenant_name']),
- auth_url=cast(str, auth_url),
- insecure=cast(bool, fuel_info.openrc['insecure']))
-
- elif cluster == "ceph":
- if discover_nodes:
- cluster_info = clusters_info["ceph"]
- root_node_uri = cast(str, cluster_info["root_node"])
- cluster = clusters_info["ceph"].get("cluster", "ceph")
- conf = clusters_info["ceph"].get("conf")
- key = clusters_info["ceph"].get("key")
- info = NodeInfo(parse_ssh_uri(root_node_uri), set())
- with setup_rpc(connect(info), ctx.rpc_code) as ceph_root_conn:
- new_nodes.extend(ceph.discover_ceph_nodes(ceph_root_conn, cluster=cluster, conf=conf, key=key))
- else:
- logger.warning("Skip ceph cluster discovery")
- else:
- msg_templ = "Unknown cluster type in 'discover' parameter: {!r}"
- raise ValueError(msg_templ.format(cluster))
-
- return DiscoveryResult(os_creds, new_nodes)
diff --git a/wally/discover/fuel.py b/wally/discover/fuel.py
deleted file mode 100644
index 7b4f90f..0000000
--- a/wally/discover/fuel.py
+++ /dev/null
@@ -1,58 +0,0 @@
-import logging
-import socket
-from typing import Dict, Any, Tuple, List, NamedTuple, Union, cast
-from urllib.parse import urlparse
-
-from .. import fuel_rest_api
-from ..node_interfaces import NodeInfo, IRPCNode
-from ..ssh_utils import ConnCreds
-from ..utils import check_input_param
-
-logger = logging.getLogger("wally.discover")
-
-
-FuelNodeInfo = NamedTuple("FuelNodeInfo",
- [("version", List[int]),
- ("fuel_ext_iface", str),
- ("openrc", Dict[str, Union[str, bool]])])
-
-
-def discover_fuel_nodes(fuel_master_node: IRPCNode,
- fuel_conn: fuel_rest_api.Connection,
- fuel_data: Dict[str, Any],
- discover_nodes: bool = True) -> Tuple[List[NodeInfo], FuelNodeInfo]:
- """Discover nodes in fuel cluster, get openrc for selected cluster"""
-
- msg = "openstack_env should be provided in fuel config"
- check_input_param('openstack_env' in fuel_data, msg)
-
- # get cluster information from REST API
- cluster_id = fuel_rest_api.get_cluster_id(fuel_conn, fuel_data['openstack_env'])
- cluster = fuel_rest_api.reflect_cluster(fuel_conn, cluster_id)
- version = fuel_rest_api.FuelInfo(fuel_conn).get_version()
-
- if not discover_nodes:
- logger.warning("Skip fuel cluster discovery")
- return [], FuelNodeInfo(version, None, cluster.get_openrc()) # type: ignore
-
- logger.info("Found fuel {0}".format(".".join(map(str, version))))
-
- # get FUEL master key to connect to cluster nodes via ssh
- logger.debug("Downloading fuel master key")
- fuel_key = fuel_master_node.get_file_content('/root/.ssh/id_rsa')
-
- network = 'fuelweb_admin' if version >= [6, 0] else 'admin'
- fuel_ip = socket.gethostbyname(fuel_conn.host)
- fuel_ext_iface = fuel_master_node.get_interface(fuel_ip)
-
- nodes = []
- for fuel_node in list(cluster.get_nodes()):
- ip = str(fuel_node.get_ip(network))
- creds = ConnCreds(ip, "root", key=fuel_key)
- nodes.append(NodeInfo(creds, roles=set(fuel_node.get_roles())))
-
- logger.debug("Found {} fuel nodes for env {}".format(len(nodes), fuel_data['openstack_env']))
-
- return nodes, FuelNodeInfo(version, fuel_ext_iface,
- cast(Dict[str, Union[str, bool]], cluster.get_openrc()))
-
diff --git a/wally/discover/openstack.py b/wally/discover/openstack.py
deleted file mode 100644
index f590359..0000000
--- a/wally/discover/openstack.py
+++ /dev/null
@@ -1,66 +0,0 @@
-import socket
-import logging
-from typing import Dict, Any, List, Optional, cast
-
-from ..node_interfaces import NodeInfo
-from ..config import ConfigBlock
-from ..ssh_utils import ConnCreds
-from ..start_vms import OSConnection, NovaClient
-
-
-logger = logging.getLogger("wally.discover")
-
-
-def get_floating_ip(vm: Any) -> str:
- """Get VM floating IP address"""
-
- for net_name, ifaces in vm.addresses.items():
- for iface in ifaces:
- if iface.get('OS-EXT-IPS:type') == "floating":
- return iface['addr']
-
- raise ValueError("VM {} has no floating ip".format(vm))
-
-
-def discover_vms(client: NovaClient, search_data: str) -> List[NodeInfo]:
- """Discover virtual machines"""
- name, user, key_file = search_data.split(",")
- servers = client.servers.list(search_opts={"name": name})
- logger.debug("Found %s openstack vms" % len(servers))
-
- nodes = [] # type: List[NodeInfo]
- for server in servers:
- ip = get_floating_ip(server)
- creds = ConnCreds(host=ip, user=user, key_file=key_file)
- nodes.append(NodeInfo(creds, roles={"test_vm"}))
-
- return nodes
-
-
-def discover_openstack_nodes(conn: OSConnection, conf: ConfigBlock) -> List[NodeInfo]:
- """Discover openstack services for given cluster"""
- os_nodes_auth = conf['auth'] # type: str
-
- if os_nodes_auth.count(":") == 2:
- user, password, key_file = os_nodes_auth.split(":") # type: str, Optional[str], Optional[str]
- if not password:
- password = None
- else:
- user, password = os_nodes_auth.split(":")
- key_file = None
-
- services = conn.nova.services.list() # type: List[Any]
- host_services_mapping = {} # type: Dict[str, List[str]]
-
- for service in services:
- ip = cast(str, socket.gethostbyname(service.host))
- host_services_mapping.get(ip, []).append(service.binary)
-
- logger.debug("Found %s openstack service nodes" % len(host_services_mapping))
-
- nodes = [] # type: List[NodeInfo]
- for host, services in host_services_mapping.items():
- creds = ConnCreds(host=host, user=user, passwd=password, key_file=key_file)
- nodes.append(NodeInfo(creds, set(services)))
-
- return nodes
diff --git a/wally/fuel.py b/wally/fuel.py
new file mode 100644
index 0000000..040dcf4
--- /dev/null
+++ b/wally/fuel.py
@@ -0,0 +1,105 @@
+import logging
+from typing import Dict, List, NamedTuple, Union, cast
+
+from paramiko.ssh_exception import AuthenticationException
+
+from .fuel_rest_api import get_cluster_id, reflect_cluster, FuelInfo, KeystoneAuth
+from .node_interfaces import NodeInfo
+from .ssh_utils import ConnCreds, parse_ssh_uri
+from .utils import check_input_param, StopTestError, parse_creds
+from .stage import Stage, StepOrder
+from .test_run_class import TestRun
+from .node import connect, setup_rpc
+from .config import ConfigBlock
+from .openstack_api import OSCreds
+
+
+logger = logging.getLogger("wally.discover")
+
+
+FuelNodeInfo = NamedTuple("FuelNodeInfo",
+ [("version", List[int]),
+ ("fuel_ext_iface", str),
+ ("openrc", Dict[str, Union[str, bool]])])
+
+
+
+class DiscoverFuelStage(Stage):
+ """"Fuel nodes discovery, also can get openstack openrc"""
+
+ priority = StepOrder.DISCOVER
+ config_block = 'fuel'
+
+ @classmethod
+ def validate(cls, cfg: ConfigBlock) -> None:
+ # msg = "openstack_env should be provided in fuel config"
+ # check_input_param('openstack_env' in fuel_data, msg)
+ # fuel.openstack_env
+ pass
+
+ def run(self, ctx: TestRun) -> None:
+ if 'fuel' in ctx.storage:
+ ctx.nodes_info.extend(ctx.storage.load_list(NodeInfo, 'fuel/nodes'))
+ ctx.fuel_openstack_creds = ctx.storage['fuel/os_creds'] # type: ignore
+ ctx.fuel_version = ctx.storage['fuel/version'] # type: ignore
+ else:
+ fuel = ctx.config.fuel
+ discover_nodes = (fuel.discover != "fuel_openrc_only")
+ fuel_node_info = NodeInfo(parse_ssh_uri(fuel.ssh_creds), {'fuel_master'})
+ fuel_nodes = [fuel_node_info]
+
+ creds = dict(zip(("user", "passwd", "tenant"), parse_creds(fuel.creds)))
+ fuel_conn = KeystoneAuth(fuel.url, creds)
+
+ # get cluster information from REST API
+ cluster_id = get_cluster_id(fuel_conn, fuel.openstack_env)
+ cluster = reflect_cluster(fuel_conn, cluster_id)
+ ctx.fuel_version = FuelInfo(fuel_conn).get_version()
+ logger.info("Found fuel {0}".format(".".join(map(str, ctx.fuel_version))))
+ openrc = cluster.get_openrc()
+
+ if openrc:
+ auth_url = cast(str, openrc['os_auth_url'])
+ if ctx.fuel_version >= [8, 0] and auth_url.startswith("https://"):
+ logger.warning("Fixing FUEL 8.0 AUTH url - replace https://->http://")
+ auth_url = auth_url.replace("https", "http", 1)
+
+ os_creds = OSCreds(name=cast(str, openrc['username']),
+ passwd=cast(str, openrc['password']),
+ tenant=cast(str, openrc['tenant_name']),
+ auth_url=cast(str, auth_url),
+ insecure=cast(bool, openrc['insecure']))
+
+ ctx.fuel_openstack_creds = os_creds
+ else:
+ ctx.fuel_openstack_creds = None
+
+ if discover_nodes:
+
+ try:
+ fuel_rpc = setup_rpc(connect(fuel_node_info), ctx.rpc_code)
+ except AuthenticationException:
+ raise StopTestError("Wrong fuel credentials")
+ except Exception:
+ logger.exception("While connection to FUEL")
+ raise StopTestError("Failed to connect to FUEL")
+
+ logger.debug("Downloading FUEL node ssh master key")
+ fuel_key = fuel_rpc.get_file_content('/root/.ssh/id_rsa')
+ network = 'fuelweb_admin' if ctx.fuel_version >= [6, 0] else 'admin'
+
+ for fuel_node in list(cluster.get_nodes()):
+ ip = str(fuel_node.get_ip(network))
+ fuel_nodes.append(NodeInfo(ConnCreds(ip, "root", key=fuel_key),
+ roles=set(fuel_node.get_roles())))
+
+ ctx.storage['fuel_nodes'] = fuel_nodes
+ ctx.nodes_info.extend(fuel_nodes)
+ ctx.nodes_info.append(fuel_node_info)
+ logger.debug("Found {} FUEL nodes for env {}".format(len(fuel_nodes) - 1, fuel.openstack_env))
+ else:
+ logger.debug("Skip FUEL nodes discovery, as 'fuel_openrc_only' is set to fuel.discover option")
+
+ ctx.storage["fuel/nodes"] = fuel_nodes
+ ctx.storage["fuel/os_creds"] = ctx.fuel_openstack_creds
+ ctx.storage["fuel/version"] = ctx.fuel_version
diff --git a/wally/main.py b/wally/main.py
index 3e1fcb3..14da140 100644
--- a/wally/main.py
+++ b/wally/main.py
@@ -5,7 +5,8 @@
import logging
import argparse
import functools
-from typing import List, Tuple, Any, Callable, IO, cast, Optional
+import contextlib
+from typing import List, Tuple, Any, Callable, IO, cast, Optional, Iterator
from yaml import load as _yaml_load
@@ -32,13 +33,33 @@
from .storage import make_storage, Storage
from .config import Config
from .logger import setup_loggers
-from .stage import log_stage, StageType
+from .stage import Stage
from .test_run_class import TestRun
+# stages
+from .ceph import DiscoverCephStage
+from .openstack import DiscoverOSStage
+from .fuel import DiscoverFuelStage
+from .run_test import CollectInfoStage, ExplicitNodesStage, SaveNodesStage, RunTestsStage
+from .report import ConsoleReportStage, HtmlReportStage
+from .sensors import StartSensorsStage, CollectSensorsStage
+
+
logger = logging.getLogger("wally")
+@contextlib.contextmanager
+def log_stage(stage: Stage) -> Iterator[None]:
+ logger.info("Start " + stage.name())
+ try:
+ yield
+ except utils.StopTestError as exc:
+ logger.error("Exception during %s: %r", stage.name(), exc)
+ except Exception:
+ logger.exception("During %s", stage.name())
+
+
def list_results(path: str) -> List[Tuple[str, str, str, str]]:
results = [] # type: List[Tuple[float, str, str, str, str]]
@@ -97,6 +118,7 @@
test_parser.add_argument('--build-description', type=str, default="Build info")
test_parser.add_argument('--build-id', type=str, default="id")
test_parser.add_argument('--build-type', type=str, default="GA")
+ test_parser.add_argument('--dont-collect', action='store_true', help="Don't collect cluster info")
test_parser.add_argument('-n', '--no-tests', action='store_true', help="Don't run tests")
test_parser.add_argument('--load-report', action='store_true')
test_parser.add_argument("-k", '--keep-vm', action='store_true', help="Don't remove test vm's")
@@ -131,8 +153,7 @@
opts = parse_args(argv)
- stages = [] # type: List[StageType]
- report_stages = [] # type: List[StageType]
+ stages = [] # type: List[Stage]
# stop mypy from telling that config & storage might be undeclared
config = None # type: Config
@@ -141,7 +162,7 @@
if opts.subparser_name == 'test':
if opts.resume:
storage = make_storage(opts.resume, existing=True)
- config = storage.load('config', Config)
+ config = storage.load(Config, 'config')
else:
file_name = os.path.abspath(opts.config_file)
with open(file_name) as fd:
@@ -161,20 +182,18 @@
storage['config'] = config # type: ignore
- stages.extend([
- run_test.clouds_connect_stage,
- run_test.discover_stage,
- run_test.reuse_vms_stage,
- log_nodes_statistic_stage,
- run_test.save_nodes_stage,
- run_test.connect_stage])
- if config.get("collect_info", True):
- stages.append(run_test.collect_info_stage)
+ stages.append(DiscoverCephStage) # type: ignore
+ stages.append(DiscoverOSStage) # type: ignore
+ stages.append(DiscoverFuelStage) # type: ignore
+ stages.append(ExplicitNodesStage) # type: ignore
+ stages.append(SaveNodesStage) # type: ignore
+ stages.append(StartSensorsStage) # type: ignore
+ stages.append(RunTestsStage) # type: ignore
+ stages.append(CollectSensorsStage) # type: ignore
- stages.extend([
- run_test.run_tests_stage,
- ])
+ if not opts.dont_collect:
+ stages.append(CollectInfoStage) # type: ignore
elif opts.subparser_name == 'ls':
tab = texttable.Texttable(max_width=200)
@@ -196,9 +215,10 @@
# [x['io'][0], y['io'][0]]))
return 0
+ report_stages = [] # type: List[Stage]
if not getattr(opts, "no_report", False):
- report_stages.append(run_test.console_report_stage)
- report_stages.append(run_test.html_report_stage)
+ report_stages.append(ConsoleReportStage) # type: ignore
+ report_stages.append(HtmlReportStage) # type: ignore
# log level is not a part of config
if opts.log_level is not None:
@@ -206,39 +226,44 @@
else:
str_level = config.get('logging/log_level', 'INFO')
- setup_loggers(getattr(logging, str_level), log_fd=storage.get_stream('log'))
+ setup_loggers(getattr(logging, str_level), log_fd=storage.get_stream('log', "w"))
logger.info("All info would be stored into %r", config.storage_url)
ctx = TestRun(config, storage)
+ stages.sort(key=lambda x: x.priority)
+
+ # TODO: run only stages, which have configs
+ failed = False
+ cleanup_stages = []
for stage in stages:
- ok = False
- with log_stage(stage):
- stage(ctx)
- ok = True
- if not ok:
+ try:
+ cleanup_stages.append(stage)
+ with log_stage(stage):
+ stage.run(ctx)
+ except:
+ failed = True
break
- exc, cls, tb = sys.exc_info()
- for stage in ctx.clear_calls_stack[::-1]:
- with log_stage(stage):
- stage(ctx)
+ logger.debug("Start cleanup")
+ cleanup_failed = False
+ for stage in cleanup_stages[::-1]:
+ try:
+ with log_stage(stage):
+ stage.cleanup(ctx)
+ except:
+ cleanup_failed = True
- logger.debug("Start utils.cleanup")
- for clean_func, args, kwargs in utils.iter_clean_func():
- with log_stage(clean_func):
- clean_func(*args, **kwargs)
-
- if exc is None:
+ if not failed:
for report_stage in report_stages:
with log_stage(report_stage):
- report_stage(ctx)
+ report_stage.run(ctx)
logger.info("All info is stored into %r", config.storage_url)
- if exc is None:
- logger.info("Tests finished successfully")
- return 0
- else:
+ if failed or cleanup_failed:
logger.error("Tests are failed. See error details in log above")
return 1
+ else:
+ logger.info("Tests finished successfully")
+ return 0
diff --git a/wally/node.py b/wally/node.py
index 2b58571..fae7879 100644
--- a/wally/node.py
+++ b/wally/node.py
@@ -133,22 +133,22 @@
raise NotImplementedError()
def run(self, cmd: str, timeout: int = 60, nolog: bool = False) -> str:
- raise NotImplemented()
+ raise NotImplementedError()
def copy_file(self, local_path: str, remote_path: str = None) -> str:
- raise NotImplemented()
+ raise NotImplementedError()
def put_to_file(self, path: str, content: bytes) -> None:
- raise NotImplemented()
+ raise NotImplementedError()
def get_interface(self, ip: str) -> str:
- raise NotImplemented()
+ raise NotImplementedError()
def stat_file(self, path: str) -> Any:
- raise NotImplemented()
+ raise NotImplementedError()
def disconnect(self) -> str:
- raise NotImplemented()
+ raise NotImplementedError()
def setup_rpc(node: ISSHHost, rpc_server_code: bytes, port: int = 0) -> IRPCNode:
diff --git a/wally/openstack.py b/wally/openstack.py
new file mode 100644
index 0000000..cff6150
--- /dev/null
+++ b/wally/openstack.py
@@ -0,0 +1,256 @@
+import os.path
+import socket
+import logging
+from typing import Dict, Any, List, Tuple, cast, Optional
+
+from .node_interfaces import NodeInfo
+from .config import ConfigBlock, Config
+from .ssh_utils import ConnCreds
+from .openstack_api import (os_connect, find_vms,
+ OSCreds, get_openstack_credentials, prepare_os, launch_vms, clear_nodes)
+from .test_run_class import TestRun
+from .stage import Stage, StepOrder
+from .utils import LogError, StopTestError, get_creds_openrc
+
+
+logger = logging.getLogger("wally.discover")
+
+
+def get_floating_ip(vm: Any) -> str:
+ """Get VM floating IP address"""
+
+ for net_name, ifaces in vm.addresses.items():
+ for iface in ifaces:
+ if iface.get('OS-EXT-IPS:type') == "floating":
+ return iface['addr']
+
+ raise ValueError("VM {} has no floating ip".format(vm))
+
+
+def ensure_connected_to_openstack(ctx: TestRun) -> None:
+ if not ctx.os_connection is None:
+ if ctx.os_creds is None:
+ ctx.os_creds = get_OS_credentials(ctx)
+ ctx.os_connection = os_connect(ctx.os_creds)
+
+
+def get_OS_credentials(ctx: TestRun) -> OSCreds:
+ if "openstack_openrc" in ctx.storage:
+ return ctx.storage.load(OSCreds, "openstack_openrc")
+
+ creds = None
+ os_creds = None
+ force_insecure = False
+ cfg = ctx.config
+
+ if 'openstack' in cfg.clouds:
+ os_cfg = cfg.clouds['openstack']
+ if 'OPENRC' in os_cfg:
+ logger.info("Using OS credentials from " + os_cfg['OPENRC'])
+ creds_tuple = get_creds_openrc(os_cfg['OPENRC'])
+ os_creds = OSCreds(*creds_tuple)
+ elif 'ENV' in os_cfg:
+ logger.info("Using OS credentials from shell environment")
+ os_creds = get_openstack_credentials()
+ elif 'OS_TENANT_NAME' in os_cfg:
+ logger.info("Using predefined credentials")
+ os_creds = OSCreds(os_cfg['OS_USERNAME'].strip(),
+ os_cfg['OS_PASSWORD'].strip(),
+ os_cfg['OS_TENANT_NAME'].strip(),
+ os_cfg['OS_AUTH_URL'].strip(),
+ os_cfg.get('OS_INSECURE', False))
+
+ elif 'OS_INSECURE' in os_cfg:
+ force_insecure = os_cfg.get('OS_INSECURE', False)
+
+ if os_creds is None and 'fuel' in cfg.clouds and 'openstack_env' in cfg.clouds['fuel'] and \
+ ctx.fuel_openstack_creds is not None:
+ logger.info("Using fuel creds")
+ creds = ctx.fuel_openstack_creds
+ elif os_creds is None:
+ logger.error("Can't found OS credentials")
+ raise StopTestError("Can't found OS credentials", None)
+
+ if creds is None:
+ creds = os_creds
+
+ if force_insecure and not creds.insecure:
+ creds = OSCreds(creds.name, creds.passwd, creds.tenant, creds.auth_url, True)
+
+ logger.debug(("OS_CREDS: user={0.name} tenant={0.tenant} " +
+ "auth_url={0.auth_url} insecure={0.insecure}").format(creds))
+
+ ctx.storage["openstack_openrc"] = creds # type: ignore
+ return creds
+
+
+def get_vm_keypair_path(cfg: Config) -> Tuple[str, str]:
+ key_name = cfg.vm_configs['keypair_name']
+ private_path = os.path.join(cfg.settings_dir, key_name + "_private.pem")
+ public_path = os.path.join(cfg.settings_dir, key_name + "_public.pub")
+ return (private_path, public_path)
+
+
+class DiscoverOSStage(Stage):
+ """Discover openstack nodes and VMS"""
+
+ config_block = 'openstack'
+
+ # discover FUEL cluster first
+ priority = StepOrder.DISCOVER + 1
+
+ @classmethod
+ def validate(cls, conf: ConfigBlock) -> None:
+ pass
+
+ def run(self, ctx: TestRun) -> None:
+ cfg = ctx.config.openstack
+ os_nodes_auth = cfg.auth # type: str
+
+ if os_nodes_auth.count(":") == 2:
+ user, password, key_file = os_nodes_auth.split(":") # type: str, Optional[str], Optional[str]
+ if not password:
+ password = None
+ else:
+ user, password = os_nodes_auth.split(":")
+ key_file = None
+
+ ensure_connected_to_openstack(ctx)
+
+ if 'openstack_nodes' in ctx.storage:
+ ctx.nodes_info.extend(ctx.storage.load_list(NodeInfo, "openstack_nodes"))
+ else:
+ openstack_nodes = [] # type: List[NodeInfo]
+ services = ctx.os_connection.nova.services.list() # type: List[Any]
+ host_services_mapping = {} # type: Dict[str, List[str]]
+
+ for service in services:
+ ip = cast(str, socket.gethostbyname(service.host))
+ host_services_mapping.get(ip, []).append(service.binary)
+
+ logger.debug("Found %s openstack service nodes" % len(host_services_mapping))
+
+ for host, services in host_services_mapping.items():
+ creds = ConnCreds(host=host, user=user, passwd=password, key_file=key_file)
+ openstack_nodes.append(NodeInfo(creds, set(services)))
+
+ ctx.nodes_info.extend(openstack_nodes)
+ ctx.storage['openstack_nodes'] = openstack_nodes # type: ignore
+
+ if "reused_os_nodes" in ctx.storage:
+ ctx.nodes_info.extend(ctx.storage.load_list(NodeInfo, "reused_nodes"))
+ else:
+ reused_nodes = [] # type: List[NodeInfo]
+ private_key_path = get_vm_keypair_path(ctx.config)[0]
+
+ vm_creds = None # type: str
+ for vm_creds in cfg.get("vms", []):
+ user_name, vm_name_pattern = vm_creds.split("@", 1)
+ msg = "Vm like {} lookup failed".format(vm_name_pattern)
+
+ with LogError(msg):
+ msg = "Looking for vm with name like {0}".format(vm_name_pattern)
+ logger.debug(msg)
+
+ ensure_connected_to_openstack(ctx)
+
+ for ip, vm_id in find_vms(ctx.os_connection, vm_name_pattern):
+ creds = ConnCreds(host=ip, user=user_name, key_file=private_key_path)
+ node_info = NodeInfo(creds, {'testnode'})
+ node_info.os_vm_id = vm_id
+ reused_nodes.append(node_info)
+
+ ctx.nodes_info.extend(reused_nodes)
+ ctx.storage["reused_os_nodes"] = reused_nodes # type: ignore
+
+
+class CreateOSVMSStage(Stage):
+ "Spawn new VM's in Openstack cluster"
+
+ priority = StepOrder.SPAWN # type: int
+ config_block = 'spawn_os_vms' # type: str
+
+ def run(self, ctx: TestRun) -> None:
+ vm_spawn_config = ctx.config.spawn_os_vms
+ vm_image_config = ctx.config.vm_configs[vm_spawn_config.cfg_name]
+
+ if 'spawned_os_nodes' in ctx.storage:
+ ctx.nodes_info.extend(ctx.storage.load_list(NodeInfo, "spawned_os_nodes"))
+ else:
+ ensure_connected_to_openstack(ctx)
+ params = vm_image_config.copy()
+ params.update(vm_spawn_config)
+ params.update(get_vm_keypair_path(ctx.config))
+ params['group_name'] = ctx.config.run_uuid
+ params['keypair_name'] = ctx.config.vm_configs['keypair_name']
+
+ if not ctx.config.openstack.get("skip_preparation", False):
+ logger.info("Preparing openstack")
+ prepare_os(ctx.os_connection, params)
+
+ new_nodes = []
+ ctx.os_spawned_nodes_ids = []
+ with ctx.get_pool() as pool:
+ for node_info in launch_vms(ctx.os_connection, params, pool):
+ node_info.roles.add('testnode')
+ ctx.os_spawned_nodes_ids.append(node_info.os_vm_id)
+ new_nodes.append(node_info)
+
+ ctx.storage['spawned_os_nodes'] = new_nodes # type: ignore
+
+ def cleanup(self, ctx: TestRun) -> None:
+ # keep nodes in case of error for future test restart
+ if not ctx.config.keep_vm and ctx.os_spawned_nodes_ids:
+ logger.info("Removing nodes")
+
+ clear_nodes(ctx.os_connection, ctx.os_spawned_nodes_ids)
+ del ctx.storage['spawned_os_nodes']
+
+ logger.info("Nodes has been removed")
+
+
+
+# @contextlib.contextmanager
+# def suspend_vm_nodes_ctx(ctx: TestRun, unused_nodes: List[IRPCNode]) -> Iterator[List[int]]:
+#
+# pausable_nodes_ids = [cast(int, node.info.os_vm_id)
+# for node in unused_nodes
+# if node.info.os_vm_id is not None]
+#
+# non_pausable = len(unused_nodes) - len(pausable_nodes_ids)
+#
+# if non_pausable:
+# logger.warning("Can't pause {} nodes".format(non_pausable))
+#
+# if pausable_nodes_ids:
+# logger.debug("Try to pause {} unused nodes".format(len(pausable_nodes_ids)))
+# with ctx.get_pool() as pool:
+# openstack_api.pause(ctx.os_connection, pausable_nodes_ids, pool)
+#
+# try:
+# yield pausable_nodes_ids
+# finally:
+# if pausable_nodes_ids:
+# logger.debug("Unpausing {} nodes".format(len(pausable_nodes_ids)))
+# with ctx.get_pool() as pool:
+# openstack_api.unpause(ctx.os_connection, pausable_nodes_ids, pool)
+# def clouds_connect_stage(ctx: TestRun) -> None:
+ # TODO(koder): need to use this to connect to openstack in upper code
+ # conn = ctx.config['clouds/openstack']
+ # user, passwd, tenant = parse_creds(conn['creds'])
+ # auth_data = dict(auth_url=conn['auth_url'],
+ # username=user,
+ # api_key=passwd,
+ # project_id=tenant) # type: Dict[str, str]
+ # logger.debug("Discovering openstack nodes with connection details: %r", conn)
+ # connect to openstack, fuel
+
+ # # parse FUEL REST credentials
+ # username, tenant_name, password = parse_creds(fuel_data['creds'])
+ # creds = {"username": username,
+ # "tenant_name": tenant_name,
+ # "password": password}
+ #
+ # # connect to FUEL
+ # conn = fuel_rest_api.KeystoneAuth(fuel_data['url'], creds, headers=None)
+ # pass
\ No newline at end of file
diff --git a/wally/start_vms.py b/wally/openstack_api.py
similarity index 97%
rename from wally/start_vms.py
rename to wally/openstack_api.py
index a55fdbf..2e9ab63 100644
--- a/wally/start_vms.py
+++ b/wally/openstack_api.py
@@ -7,7 +7,7 @@
import tempfile
import subprocess
import urllib.request
-from typing import Dict, Any, Iterable, Iterator, NamedTuple, Optional, List, Tuple
+from typing import Dict, Any, Iterable, Iterator, NamedTuple, Optional, List, Tuple, Set
from concurrent.futures import ThreadPoolExecutor
from keystoneauth1 import loading, session
@@ -19,6 +19,7 @@
from .utils import Timeout
from .node_interfaces import NodeInfo
from .storage import IStorable
+from .ssh_utils import ConnCreds
__doc__ = """
@@ -81,7 +82,6 @@
def find_vms(conn: OSConnection, name_prefix: str) -> Iterable[Tuple[str, int]]:
for srv in conn.nova.servers.list():
if srv.name.startswith(name_prefix):
-
# need to exit after found server first external IP
# so have to rollout two cycles to avoid using exceptions
all_ip = [] # type: List[Any]
@@ -451,11 +451,10 @@
# precache all errors before start creating vms
private_key_path = params['keypair_file_private']
- creds = params['image']['creds']
+ user = params['image']['user']
for ip, os_node in create_vms_mt(conn, count, executor, **vm_params):
- conn_uri = creds.format(ip=ip, private_key_path=private_key_path)
- info = NodeInfo(conn_uri, set())
+ info = NodeInfo(ConnCreds(ip, user, key_file=private_key_path), set())
info.os_vm_id = os_node.id
yield info
@@ -574,19 +573,18 @@
scheduler_hints=scheduler_hints, security_groups=security_groups)
if not wait_for_server_active(conn, srv):
- msg = "Server {0} fails to start. Kill it and try again"
- logger.debug(msg.format(srv))
+ logger.debug("Server {} fails to start. Kill it and try again".format(srv))
conn.nova.servers.delete(srv)
try:
- for _ in Timeout(delete_timeout, "Server {0} delete timeout".format(srv.id)):
+ for _ in Timeout(delete_timeout, "Server {} delete timeout".format(srv.id)):
srv = conn.nova.servers.get(srv.id)
except NotFound:
pass
else:
break
else:
- raise RuntimeError("Failed to start server".format(srv.id))
+ raise RuntimeError("Failed to start server {}".format(srv.id))
if vol_sz is not None:
vol = create_volume(conn, vol_sz, name)
@@ -598,6 +596,7 @@
if flt_ip is not None:
srv.add_floating_ip(flt_ip)
+ # pylint: disable=E1101
return flt_ip.ip, conn.nova.servers.get(srv.id)
diff --git a/wally/ops_log.py b/wally/ops_log.py
new file mode 100644
index 0000000..48f2b61
--- /dev/null
+++ b/wally/ops_log.py
@@ -0,0 +1,9 @@
+from typing import Any
+
+
+log = []
+
+
+def log_op(name: str, *params: Any) -> None:
+ log.append([name] + list(params))
+
diff --git a/wally/report.py b/wally/report.py
index 88c97b7..ecf3ba7 100644
--- a/wally/report.py
+++ b/wally/report.py
@@ -20,6 +20,8 @@
from .utils import ssize2b
from .statistic import round_3_digit
from .storage import Storage
+from .stage import Stage, StepOrder
+from .test_run_class import TestRun
from .result_classes import TestInfo, FullTestResult, SensorInfo
from .suits.io.fio_task_parser import (get_test_sync_mode,
get_test_summary,
@@ -33,29 +35,46 @@
def load_test_results(storage: Storage) -> Iterator[FullTestResult]:
sensors_data = {} # type: Dict[Tuple[str, str, str], SensorInfo]
- for _, node_id in storage.list("metric"):
- for _, dev_name in storage.list("metric", node_id):
- for _, sensor_name in storage.list("metric", node_id, dev_name):
+ mstorage = storage.sub_storage("metric")
+ for _, node_id in mstorage.list():
+ for _, dev_name in mstorage.list(node_id):
+ for _, sensor_name in mstorage.list(node_id, dev_name):
key = (node_id, dev_name, sensor_name)
si = SensorInfo(*key)
- si.begin_time, si.end_time, si.data = storage["metric/{}/{}/{}".format(*key)] # type: ignore
+ si.begin_time, si.end_time, si.data = storage[node_id, dev_name, sensor_name] # type: ignore
sensors_data[key] = si
- for _, run_id in storage.list("result"):
- path = "result/" + run_id
+ rstorage = storage.sub_storage("result")
+ for _, run_id in rstorage.list():
ftr = FullTestResult()
- ftr.info = storage.load(TestInfo, path, "info")
+ ftr.test_info = rstorage.load(TestInfo, run_id, "info")
ftr.performance_data = {}
- p1 = "result/{}/measurement".format(run_id)
- for _, node_id in storage.list(p1):
- for _, measurement_name in storage.list(p1, node_id):
+ p1 = "{}/measurement".format(run_id)
+ for _, node_id in rstorage.list(p1):
+ for _, measurement_name in rstorage.list(p1, node_id):
perf_key = (node_id, measurement_name)
- ftr.performance_data[perf_key] = storage["{}/{}/{}".format(p1, *perf_key)] # type: ignore
+ ftr.performance_data[perf_key] = rstorage["{}/{}/{}".format(p1, *perf_key)] # type: ignore
yield ftr
+class ConsoleReportStage(Stage):
+
+ priority = StepOrder.REPORT
+
+ def run(self, ctx: TestRun) -> None:
+ # TODO(koder): load data from storage
+ raise NotImplementedError("...")
+
+class HtmlReportStage(Stage):
+
+ priority = StepOrder.REPORT
+
+ def run(self, ctx: TestRun) -> None:
+ # TODO(koder): load data from storage
+ raise NotImplementedError("...")
+
# class StoragePerfInfo:
# def __init__(self, name: str, summary: Any, params, testnodes_count) -> None:
# self.direct_iops_r_max = 0 # type: int
diff --git a/wally/run_test.py b/wally/run_test.py
index 9ae2c9e..1a645b6 100755
--- a/wally/run_test.py
+++ b/wally/run_test.py
@@ -1,21 +1,18 @@
-import os
import logging
-import contextlib
-from typing import List, Dict, Iterable, Iterator, Tuple, Optional, Union, cast
-from concurrent.futures import ThreadPoolExecutor, Future
+from concurrent.futures import Future
+from typing import List, Dict, Tuple, Optional, Union, cast
-from .node_interfaces import NodeInfo, IRPCNode
-from .test_run_class import TestRun
-from .discover import discover
-from . import pretty_yaml, utils, report, ssh_utils, start_vms, hw_info
+from . import utils, ssh_utils, hw_info
+from .config import ConfigBlock
from .node import setup_rpc, connect
-from .config import ConfigBlock, Config
-
-from .suits.mysql import MysqlTest
-from .suits.itest import TestInputConfig
+from .node_interfaces import NodeInfo, IRPCNode
+from .stage import Stage, StepOrder
from .suits.io.fio import IOPerfTest
-from .suits.postgres import PgBenchTest
+from .suits.itest import TestInputConfig
+from .suits.mysql import MysqlTest
from .suits.omgbench import OmgTest
+from .suits.postgres import PgBenchTest
+from .test_run_class import TestRun
TOOL_TYPE_MAPPER = {
@@ -29,431 +26,149 @@
logger = logging.getLogger("wally")
-def connect_all(nodes_info: List[NodeInfo], pool: ThreadPoolExecutor, conn_timeout: int = 30) -> List[IRPCNode]:
- """Connect to all nodes, log errors"""
+class ConnectStage(Stage):
+ """Connect to nodes stage"""
- logger.info("Connecting to %s nodes", len(nodes_info))
+ priority = StepOrder.CONNECT
- def connect_ext(node_info: NodeInfo) -> Tuple[bool, Union[IRPCNode, NodeInfo]]:
- try:
- ssh_node = connect(node_info, conn_timeout=conn_timeout)
- # TODO(koder): need to pass all required rpc bytes to this call
- return True, setup_rpc(ssh_node, b"")
- except Exception as exc:
- logger.error("During connect to {}: {!s}".format(node, exc))
- return False, node_info
-
- failed_testnodes = [] # type: List[NodeInfo]
- failed_nodes = [] # type: List[NodeInfo]
- ready = [] # type: List[IRPCNode]
-
- for ok, node in pool.map(connect_ext, nodes_info):
- if not ok:
- node = cast(NodeInfo, node)
- if 'testnode' in node.roles:
- failed_testnodes.append(node)
- else:
- failed_nodes.append(node)
- else:
- ready.append(cast(IRPCNode, node))
-
- if failed_nodes:
- msg = "Node(s) {} would be excluded - can't connect"
- logger.warning(msg.format(",".join(map(str, failed_nodes))))
-
- if failed_testnodes:
- msg = "Can't connect to testnode(s) " + \
- ",".join(map(str, failed_testnodes))
- logger.error(msg)
- raise utils.StopTestError(msg)
-
- if not failed_nodes:
- logger.info("All nodes connected successfully")
-
- return ready
-
-
-def collect_info_stage(ctx: TestRun) -> None:
- futures = {} # type: Dict[str, Future]
-
- with ctx.get_pool() as pool:
- for node in ctx.nodes:
- hw_info_path = "hw_info/{}".format(node.info.node_id())
- if hw_info_path not in ctx.storage:
- futures[hw_info_path] = pool.submit(hw_info.get_hw_info, node), node
-
- sw_info_path = "sw_info/{}".format(node.info.node_id())
- if sw_info_path not in ctx.storage:
- futures[sw_info_path] = pool.submit(hw_info.get_sw_info, node)
-
- for path, future in futures.items():
- ctx.storage[path] = future.result()
-
-
-@contextlib.contextmanager
-def suspend_vm_nodes_ctx(ctx: TestRun, unused_nodes: List[IRPCNode]) -> Iterator[List[int]]:
-
- pausable_nodes_ids = [cast(int, node.info.os_vm_id)
- for node in unused_nodes
- if node.info.os_vm_id is not None]
-
- non_pausable = len(unused_nodes) - len(pausable_nodes_ids)
-
- if non_pausable:
- logger.warning("Can't pause {} nodes".format(non_pausable))
-
- if pausable_nodes_ids:
- logger.debug("Try to pause {} unused nodes".format(len(pausable_nodes_ids)))
+ def run(self, ctx: TestRun) -> None:
with ctx.get_pool() as pool:
- start_vms.pause(ctx.os_connection, pausable_nodes_ids, pool)
+ logger.info("Connecting to %s nodes", len(ctx.nodes_info))
- try:
- yield pausable_nodes_ids
- finally:
- if pausable_nodes_ids:
- logger.debug("Unpausing {} nodes".format(len(pausable_nodes_ids)))
- with ctx.get_pool() as pool:
- start_vms.unpause(ctx.os_connection, pausable_nodes_ids, pool)
+ def connect_ext(node_info: NodeInfo) -> Tuple[bool, Union[IRPCNode, NodeInfo]]:
+ try:
+ ssh_node = connect(node_info, conn_timeout=ctx.config.connect_timeout)
+ # TODO(koder): need to pass all required rpc bytes to this call
+ return True, setup_rpc(ssh_node, b"")
+ except Exception as exc:
+ logger.error("During connect to {}: {!s}".format(node, exc))
+ return False, node_info
+ failed_testnodes = [] # type: List[NodeInfo]
+ failed_nodes = [] # type: List[NodeInfo]
+ ctx.nodes = []
-def run_tests(ctx: TestRun, test_block: ConfigBlock, nodes: List[IRPCNode]) -> None:
- """Run test from test block"""
+ for ok, node in pool.map(connect_ext, ctx.nodes_info):
+ if not ok:
+ node = cast(NodeInfo, node)
+ if 'testnode' in node.roles:
+ failed_testnodes.append(node)
+ else:
+ failed_nodes.append(node)
+ else:
+ ctx.nodes.append(cast(IRPCNode, node))
- test_nodes = [node for node in nodes if 'testnode' in node.info.roles]
+ if failed_nodes:
+ msg = "Node(s) {} would be excluded - can't connect"
+ logger.warning(msg.format(",".join(map(str, failed_nodes))))
- if not test_nodes:
- logger.error("No test nodes found")
- return
-
- for name, params in test_block.items():
- vm_count = params.get('node_limit', None) # type: Optional[int]
-
- # select test nodes
- if vm_count is None:
- curr_test_nodes = test_nodes
- unused_nodes = [] # type: List[IRPCNode]
- else:
- curr_test_nodes = test_nodes[:vm_count]
- unused_nodes = test_nodes[vm_count:]
-
- if not curr_test_nodes:
- logger.error("No nodes found for test, skipping it.")
- continue
-
- # results_path = generate_result_dir_name(cfg.results_storage, name, params)
- # utils.mkdirs_if_unxists(results_path)
-
- # suspend all unused virtual nodes
- if ctx.config.get('suspend_unused_vms', True):
- suspend_ctx = suspend_vm_nodes_ctx(ctx, unused_nodes)
- else:
- suspend_ctx = utils.empty_ctx()
-
- resumable_nodes_ids = [cast(int, node.info.os_vm_id)
- for node in curr_test_nodes
- if node.info.os_vm_id is not None]
-
- if resumable_nodes_ids:
- logger.debug("Check and unpause {} nodes".format(len(resumable_nodes_ids)))
-
- with ctx.get_pool() as pool:
- start_vms.unpause(ctx.os_connection, resumable_nodes_ids, pool)
-
- with suspend_ctx:
- test_cls = TOOL_TYPE_MAPPER[name]
- remote_dir = ctx.config.default_test_local_folder.format(name=name, uuid=ctx.config.run_uuid)
- test_cfg = TestInputConfig(test_cls.__name__,
- params=params,
- run_uuid=ctx.config.run_uuid,
- nodes=test_nodes,
- storage=ctx.storage,
- remote_dir=remote_dir)
-
- test_cls(test_cfg).run()
-
-
-def connect_stage(ctx: TestRun) -> None:
- ctx.clear_calls_stack.append(disconnect_stage)
-
- with ctx.get_pool() as pool:
- ctx.nodes = connect_all(ctx.nodes_info, pool)
-
-
-def discover_stage(ctx: TestRun) -> None:
- """discover clusters and nodes stage"""
-
- # TODO(koder): Properly store discovery info and check if it available to skip phase
-
- discover_info = ctx.config.get('discover')
- if discover_info:
- if "discovered_nodes" in ctx.storage:
- nodes = ctx.storage.load_list("discovered_nodes", NodeInfo)
- ctx.fuel_openstack_creds = ctx.storage.load("fuel_openstack_creds", start_vms.OSCreds)
- else:
- discover_objs = [i.strip() for i in discover_info.strip().split(",")]
-
- ctx.fuel_openstack_creds, nodes = discover.discover(ctx,
- discover_objs,
- ctx.config.clouds,
- not ctx.config.dont_discover_nodes)
-
- ctx.storage["fuel_openstack_creds"] = ctx.fuel_openstack_creds # type: ignore
- ctx.storage["discovered_nodes"] = nodes # type: ignore
- ctx.nodes_info.extend(nodes)
-
- for url, roles in ctx.config.get('explicit_nodes', {}).items():
- creds = ssh_utils.parse_ssh_uri(url)
- roles = set(roles.split(","))
- ctx.nodes_info.append(NodeInfo(creds, roles))
-
-
-def save_nodes_stage(ctx: TestRun) -> None:
- """Save nodes list to file"""
- ctx.storage['nodes'] = ctx.nodes_info # type: ignore
-
-
-def ensure_connected_to_openstack(ctx: TestRun) -> None:
- if not ctx.os_connection is None:
- if ctx.os_creds is None:
- ctx.os_creds = get_OS_credentials(ctx)
- ctx.os_connection = start_vms.os_connect(ctx.os_creds)
-
-
-def reuse_vms_stage(ctx: TestRun) -> None:
- if "reused_nodes" in ctx.storage:
- ctx.nodes_info.extend(ctx.storage.load_list("reused_nodes", NodeInfo))
- else:
- reused_nodes = []
- vms_patterns = ctx.config.get('clouds/openstack/vms', [])
- private_key_path = get_vm_keypair_path(ctx.config)[0]
-
- for creds in vms_patterns:
- user_name, vm_name_pattern = creds.split("@", 1)
- msg = "Vm like {} lookup failed".format(vm_name_pattern)
-
- with utils.LogError(msg):
- msg = "Looking for vm with name like {0}".format(vm_name_pattern)
- logger.debug(msg)
-
- ensure_connected_to_openstack(ctx)
-
- for ip, vm_id in start_vms.find_vms(ctx.os_connection, vm_name_pattern):
- creds = ssh_utils.ConnCreds(host=ip, user=user_name, key_file=private_key_path)
- node_info = NodeInfo(creds, {'testnode'})
- node_info.os_vm_id = vm_id
- reused_nodes.append(node_info)
- ctx.nodes_info.append(node_info)
-
- ctx.storage["reused_nodes"] = reused_nodes # type: ignore
-
-
-def get_OS_credentials(ctx: TestRun) -> start_vms.OSCreds:
-
- if "openstack_openrc" in ctx.storage:
- return ctx.storage.load("openstack_openrc", start_vms.OSCreds)
-
- creds = None
- os_creds = None
- force_insecure = False
- cfg = ctx.config
-
- if 'openstack' in cfg.clouds:
- os_cfg = cfg.clouds['openstack']
- if 'OPENRC' in os_cfg:
- logger.info("Using OS credentials from " + os_cfg['OPENRC'])
- creds_tuple = utils.get_creds_openrc(os_cfg['OPENRC'])
- os_creds = start_vms.OSCreds(*creds_tuple)
- elif 'ENV' in os_cfg:
- logger.info("Using OS credentials from shell environment")
- os_creds = start_vms.get_openstack_credentials()
- elif 'OS_TENANT_NAME' in os_cfg:
- logger.info("Using predefined credentials")
- os_creds = start_vms.OSCreds(os_cfg['OS_USERNAME'].strip(),
- os_cfg['OS_PASSWORD'].strip(),
- os_cfg['OS_TENANT_NAME'].strip(),
- os_cfg['OS_AUTH_URL'].strip(),
- os_cfg.get('OS_INSECURE', False))
-
- elif 'OS_INSECURE' in os_cfg:
- force_insecure = os_cfg.get('OS_INSECURE', False)
-
- if os_creds is None and 'fuel' in cfg.clouds and 'openstack_env' in cfg.clouds['fuel'] and \
- ctx.fuel_openstack_creds is not None:
- logger.info("Using fuel creds")
- creds = ctx.fuel_openstack_creds
- elif os_creds is None:
- logger.error("Can't found OS credentials")
- raise utils.StopTestError("Can't found OS credentials", None)
-
- if creds is None:
- creds = os_creds
-
- if force_insecure and not creds.insecure:
- creds = start_vms.OSCreds(creds.name,
- creds.passwd,
- creds.tenant,
- creds.auth_url,
- True)
-
- logger.debug(("OS_CREDS: user={0.name} tenant={0.tenant} " +
- "auth_url={0.auth_url} insecure={0.insecure}").format(creds))
-
- ctx.storage["openstack_openrc"] = creds # type: ignore
- return creds
-
-
-def get_vm_keypair_path(cfg: Config) -> Tuple[str, str]:
- key_name = cfg.vm_configs['keypair_name']
- private_path = os.path.join(cfg.settings_dir, key_name + "_private.pem")
- public_path = os.path.join(cfg.settings_dir, key_name + "_public.pub")
- return (private_path, public_path)
-
-
-@contextlib.contextmanager
-def create_vms_ctx(ctx: TestRun, vm_config: ConfigBlock, already_has_count: int = 0) -> Iterator[List[NodeInfo]]:
-
- if 'spawned_vm_ids' in ctx.storage:
- os_nodes_ids = ctx.storage.get('spawned_vm_ids', []) # type: List[int]
- new_nodes = [] # type: List[NodeInfo]
-
- # TODO(koder): reconnect to old VM's
- raise NotImplementedError("Reconnect to old vms is not implemented")
- else:
- os_nodes_ids = []
- new_nodes = []
- no_spawn = False
- if vm_config['count'].startswith('='):
- count = int(vm_config['count'][1:])
- if count <= already_has_count:
- logger.debug("Not need new vms")
- no_spawn = True
-
- if not no_spawn:
- ensure_connected_to_openstack(ctx)
- params = ctx.config.vm_configs[vm_config['cfg_name']].copy()
- params.update(vm_config)
- params.update(get_vm_keypair_path(ctx.config))
- params['group_name'] = ctx.config.run_uuid
- params['keypair_name'] = ctx.config.vm_configs['keypair_name']
-
- if not vm_config.get('skip_preparation', False):
- logger.info("Preparing openstack")
- start_vms.prepare_os(ctx.os_connection, params)
-
- with ctx.get_pool() as pool:
- for node_info in start_vms.launch_vms(ctx.os_connection, params, pool, already_has_count):
- node_info.roles.add('testnode')
- os_nodes_ids.append(node_info.os_vm_id)
- new_nodes.append(node_info)
-
- ctx.storage['spawned_vm_ids'] = os_nodes_ids # type: ignore
- yield new_nodes
-
- # keep nodes in case of error for future test restart
- if not ctx.config.keep_vm:
- shut_down_vms_stage(ctx, os_nodes_ids)
-
- del ctx.storage['spawned_vm_ids']
-
-
-@contextlib.contextmanager
-def sensor_monitoring(ctx: TestRun, cfg: ConfigBlock, nodes: List[IRPCNode]) -> Iterator[None]:
- yield
-
-
-def run_tests_stage(ctx: TestRun) -> None:
- for group in ctx.config.get('tests', []):
- gitems = list(group.items())
- if len(gitems) != 1:
- msg = "Items in tests section should have len == 1"
- logger.error(msg)
- raise utils.StopTestError(msg)
-
- key, config = gitems[0]
-
- if 'start_test_nodes' == key:
- if 'openstack' not in config:
- msg = "No openstack block in config - can't spawn vm's"
+ if failed_testnodes:
+ msg = "Can't connect to testnode(s) " + \
+ ",".join(map(str, failed_testnodes))
logger.error(msg)
raise utils.StopTestError(msg)
- num_test_nodes = len([node for node in ctx.nodes if 'testnode' in node.info.roles])
- vm_ctx = create_vms_ctx(ctx, config['openstack'], num_test_nodes)
- tests = config.get('tests', [])
- else:
- vm_ctx = utils.empty_ctx([])
- tests = [group]
+ if not failed_nodes:
+ logger.info("All nodes connected successfully")
- # make mypy happy
- new_nodes = [] # type: List[NodeInfo]
+ def cleanup(self, ctx: TestRun) -> None:
+ # TODO(koder): what next line was for?
+ # ssh_utils.close_all_sessions()
- with vm_ctx as new_nodes:
- if new_nodes:
- with ctx.get_pool() as pool:
- new_rpc_nodes = connect_all(new_nodes, pool)
+ for node in ctx.nodes:
+ node.disconnect()
- test_nodes = ctx.nodes + new_rpc_nodes
- if ctx.config.get('sensors'):
- sensor_ctx = sensor_monitoring(ctx, ctx.config.get('sensors'), test_nodes)
- else:
- sensor_ctx = utils.empty_ctx([])
+class CollectInfoStage(Stage):
+ """Collect node info"""
+ priority = StepOrder.START_SENSORS - 1
+ config_block = 'collect_info'
+
+ def run(self, ctx: TestRun) -> None:
+ if not ctx.config.collect_info:
+ return
+
+ futures = {} # type: Dict[str, Future]
+
+ with ctx.get_pool() as pool:
+ for node in ctx.nodes:
+ hw_info_path = "hw_info/{}".format(node.info.node_id())
+ if hw_info_path not in ctx.storage:
+ futures[hw_info_path] = pool.submit(hw_info.get_hw_info, node), node
+
+ sw_info_path = "sw_info/{}".format(node.info.node_id())
+ if sw_info_path not in ctx.storage:
+ futures[sw_info_path] = pool.submit(hw_info.get_sw_info, node)
+
+ for path, future in futures.items():
+ ctx.storage[path] = future.result()
+
+
+class ExplicitNodesStage(Stage):
+ """add explicit nodes"""
+
+ priority = StepOrder.DISCOVER
+ config_block = 'nodes'
+
+ def run(self, ctx: TestRun) -> None:
+ explicit_nodes = []
+ for url, roles in ctx.config.get('explicit_nodes', {}).items():
+ creds = ssh_utils.parse_ssh_uri(url)
+ roles = set(roles.split(","))
+ explicit_nodes.append(NodeInfo(creds, roles))
+
+ ctx.nodes_info.extend(explicit_nodes)
+ ctx.storage['explicit_nodes'] = explicit_nodes # type: ignore
+
+
+class SaveNodesStage(Stage):
+ """Save nodes list to file"""
+
+ priority = StepOrder.CONNECT
+
+ def run(self, ctx: TestRun) -> None:
+ ctx.storage['all_nodes'] = ctx.nodes_info # type: ignore
+
+
+class RunTestsStage(Stage):
+
+ priority = StepOrder.TEST
+ config_block = 'tests'
+
+ def run(self, ctx: TestRun) -> None:
+ for test_group in ctx.config.get('tests', []):
if not ctx.config.no_tests:
- for test_group in tests:
- with sensor_ctx:
- run_tests(ctx, test_group, test_nodes)
+ test_nodes = [node for node in ctx.nodes if 'testnode' in node.info.roles]
- for node in new_rpc_nodes:
- node.disconnect()
+ if not test_nodes:
+ logger.error("No test nodes found")
+ return
+ for name, params in test_group.items():
+ vm_count = params.get('node_limit', None) # type: Optional[int]
-def clouds_connect_stage(ctx: TestRun) -> None:
- # TODO(koder): need to use this to connect to openstack in upper code
- # conn = ctx.config['clouds/openstack']
- # user, passwd, tenant = parse_creds(conn['creds'])
- # auth_data = dict(auth_url=conn['auth_url'],
- # username=user,
- # api_key=passwd,
- # project_id=tenant) # type: Dict[str, str]
- # logger.debug("Discovering openstack nodes with connection details: %r", conn)
- # connect to openstack, fuel
+ # select test nodes
+ if vm_count is None:
+ curr_test_nodes = test_nodes
+ else:
+ curr_test_nodes = test_nodes[:vm_count]
- # # parse FUEL REST credentials
- # username, tenant_name, password = parse_creds(fuel_data['creds'])
- # creds = {"username": username,
- # "tenant_name": tenant_name,
- # "password": password}
- #
- # # connect to FUEL
- # conn = fuel_rest_api.KeystoneAuth(fuel_data['url'], creds, headers=None)
- pass
+ if not curr_test_nodes:
+ logger.error("No nodes found for test, skipping it.")
+ continue
+ test_cls = TOOL_TYPE_MAPPER[name]
+ remote_dir = ctx.config.default_test_local_folder.format(name=name, uuid=ctx.config.run_uuid)
+ test_cfg = TestInputConfig(test_cls.__name__,
+ params=params,
+ run_uuid=ctx.config.run_uuid,
+ nodes=test_nodes,
+ storage=ctx.storage,
+ remote_dir=remote_dir)
-def shut_down_vms_stage(ctx: TestRun, nodes_ids: List[int]) -> None:
- if nodes_ids:
- logger.info("Removing nodes")
- start_vms.clear_nodes(ctx.os_connection, nodes_ids)
- logger.info("Nodes has been removed")
+ test_cls(test_cfg).run()
-
-def clear_enviroment(ctx: TestRun) -> None:
- shut_down_vms_stage(ctx, ctx.storage.get('spawned_vm_ids', []))
- ctx.storage['spawned_vm_ids'] = [] # type: ignore
-
-
-def disconnect_stage(ctx: TestRun) -> None:
- # TODO(koder): what next line was for?
- # ssh_utils.close_all_sessions()
-
- for node in ctx.nodes:
- node.disconnect()
-
-
-def console_report_stage(ctx: TestRun) -> None:
- # TODO(koder): load data from storage
- raise NotImplementedError("...")
-
-def html_report_stage(ctx: TestRun) -> None:
- # TODO(koder): load data from storage
- raise NotImplementedError("...")
+ @classmethod
+ def validate_config(cls, cfg: ConfigBlock) -> None:
+ pass
diff --git a/wally/sensors.py b/wally/sensors.py
index b579f3f..c86aeb4 100644
--- a/wally/sensors.py
+++ b/wally/sensors.py
@@ -1,62 +1,69 @@
-from typing import List, Dict, Tuple
+from typing import List, Dict, Tuple, Any
+
from .test_run_class import TestRun
from . import sensors_rpc_plugin
-
+from .stage import Stage, StepOrder
plugin_fname = sensors_rpc_plugin.__file__.rsplit(".", 1)[0] + ".py"
SENSORS_PLUGIN_CODE = open(plugin_fname).read()
# TODO(koder): in case if node has more than one role sensor settigns might be incorrect
-def start_sensors_stage(ctx: TestRun) -> None:
- if 'sensors' not in ctx.config:
- return
- per_role_config = {}
- for name, val in ctx.config['sensors'].copy():
- if isinstance(val, str):
- val = {vl.strip(): ".*" for vl in val.split(",")}
- elif isinstance(val, list):
- val = {vl: ".*" for vl in val}
- per_role_config[name] = val
+class StartSensorsStage(Stage):
+ priority = StepOrder.START_SENSORS
+ config_block = 'sensors'
- if 'all' in per_role_config:
- all_vl = per_role_config.pop('all')
- all_roles = set(per_role_config)
+ def run(self, ctx: TestRun) -> None:
+ if 'sensors' not in ctx.config:
+ return
+
+ per_role_config = {} # type: Dict[str, Dict[str, str]]
+ for name, val in ctx.config['sensors'].copy():
+ if isinstance(val, str):
+ val = {vl.strip(): ".*" for vl in val.split(",")}
+ elif isinstance(val, list):
+ val = {vl: ".*" for vl in val}
+ per_role_config[name] = val
+
+ if 'all' in per_role_config:
+ all_vl = per_role_config.pop('all')
+ all_roles = set(per_role_config)
+
+ for node in ctx.nodes:
+ all_roles.update(node.info.roles)
+
+ for name, vals in list(per_role_config.items()):
+ new_vals = all_vl.copy()
+ new_vals.update(vals)
+ per_role_config[name] = new_vals
for node in ctx.nodes:
- all_roles.update(node.info.roles)
+ node_cfg = {} # type: Dict[str, str]
+ for role in node.info.roles:
+ node_cfg.update(per_role_config.get(role, {}))
- for name, vals in list(per_role_config.items()):
- new_vals = all_roles.copy()
- new_vals.update(vals)
- per_role_config[name] = new_vals
-
- for node in ctx.nodes:
- node_cfg = {}
- for role in node.info.roles:
- node_cfg.update(per_role_config.get(role, {}))
-
- if node_cfg:
- node.conn.upload_plugin(SENSORS_PLUGIN_CODE)
- ctx.sensors_run_on.add(node.info.node_id())
- node.conn.sensors.start()
+ if node_cfg:
+ node.conn.upload_plugin(SENSORS_PLUGIN_CODE)
+ ctx.sensors_run_on.add(node.info.node_id())
+ node.conn.sensors.start()
-def collect_sensors_stage(ctx: TestRun, stop: bool = True) -> None:
- for node in ctx.nodes:
- node_id = node.info.node_id()
- if node_id in ctx.sensors_run_on:
+class CollectSensorsStage(Stage):
+ priority = StepOrder.COLLECT_SENSORS
+ config_block = 'sensors'
- if stop:
+ def run(self, ctx: TestRun) -> None:
+ for node in ctx.nodes:
+ node_id = node.info.node_id()
+ if node_id in ctx.sensors_run_on:
+
data, collected_at = node.conn.sensors.stop() # type: Dict[Tuple[str, str], List[int]], List[float]
- else:
- data, collected_at = node.conn.sensors.get_updates()
- for (source_name, sensor_name), values in data.items():
- path = "metric/{}/{}/{}".format(node_id, source_name, sensor_name)
- ctx.storage.append(path, values)
- ctx.storage.append("metric/{}/collected_at".format(node_id), collected_at)
+ mstore = ctx.storage.sub_storage("metric", node_id)
+ for (source_name, sensor_name), values in data.items():
+ mstore[source_name, sensor_name] = values
+ mstore["collected_at"] = collected_at
# def delta(func, only_upd=True):
diff --git a/wally/stage.py b/wally/stage.py
index 0451d18..95542f3 100644
--- a/wally/stage.py
+++ b/wally/stage.py
@@ -1,27 +1,36 @@
-import logging
-import contextlib
-from typing import Callable, Iterator
+import abc
+from typing import Optional
-from .utils import StopTestError
from .test_run_class import TestRun
+from .config import ConfigBlock
-logger = logging.getLogger("wally")
+class StepOrder:
+ DISCOVER = 0
+ SPAWN = 10
+ CONNECT = 20
+ START_SENSORS = 30
+ TEST = 40
+ COLLECT_SENSORS = 50
+ REPORT = 60
-@contextlib.contextmanager
-def log_stage(stage) -> Iterator[None]:
- msg_templ = "Exception during {0}: {1!s}"
- msg_templ_no_exc = "During {0}"
+class Stage(metaclass=abc.ABCMeta):
+ priority = None # type: int
+ config_block = None # type: Optional[str]
- logger.info("Start " + stage.name)
+ @classmethod
+ def name(cls) -> str:
+ return cls.__name__
- try:
- yield
- except StopTestError as exc:
- logger.error(msg_templ.format(stage.__name__, exc))
- except Exception:
- logger.exception(msg_templ_no_exc.format(stage.__name__))
+ @classmethod
+ def validate_config(cls, cfg: ConfigBlock) -> None:
+ pass
+ @abc.abstractmethod
+ def run(self, ctx: TestRun) -> None:
+ pass
-StageType = Callable[[TestRun], None]
+ def cleanup(self, ctx: TestRun) -> None:
+ pass
+
diff --git a/wally/storage.py b/wally/storage.py
index 05e4259..540da88 100644
--- a/wally/storage.py
+++ b/wally/storage.py
@@ -4,30 +4,24 @@
import os
import abc
-from typing import Any, Iterable, TypeVar, Type, IO, Tuple, cast, List
+import array
+from typing import Any, Iterator, TypeVar, Type, IO, Tuple, cast, List, Dict, Union, Iterable
+
+
+import yaml
+try:
+ from yaml import CLoader as Loader, CDumper as Dumper # type: ignore
+except ImportError:
+ from yaml import Loader, Dumper # type: ignore
class IStorable(metaclass=abc.ABCMeta):
"""Interface for type, which can be stored"""
- @abc.abstractmethod
- def __getstate__(self) -> Any:
- pass
- @abc.abstractmethod
- def __setstate__(self, Any):
- pass
-
-
-# all builtin types can be stored
-IStorable.register(list) # type: ignore
-IStorable.register(dict) # type: ignore
-IStorable.register(tuple) # type: ignore
-IStorable.register(set) # type: ignore
-IStorable.register(None) # type: ignore
-IStorable.register(int) # type: ignore
-IStorable.register(str) # type: ignore
-IStorable.register(bytes) # type: ignore
-IStorable.register(bool) # type: ignore
+basic_types = {list, dict, tuple, set, type(None), int, str, bytes, bool, float}
+for btype in basic_types:
+ # pylint: disable=E1101
+ IStorable.register(btype) # type: ignore
ObjClass = TypeVar('ObjClass')
@@ -54,11 +48,15 @@
pass
@abc.abstractmethod
- def list(self, path: str) -> Iterable[Tuple[bool, str]]:
+ def list(self, path: str) -> Iterator[Tuple[bool, str]]:
pass
@abc.abstractmethod
- def get_stream(self, path: str) -> IO:
+ def get_stream(self, path: str, mode: str = "rb+") -> IO:
+ pass
+
+ @abc.abstractmethod
+ def sub_storage(self, path: str) -> 'ISimpleStorage':
pass
@@ -78,14 +76,18 @@
def __init__(self, root_path: str, existing: bool) -> None:
self.root_path = root_path
+ self.existing = existing
if existing:
if not os.path.isdir(self.root_path):
- raise ValueError("No storage found at {!r}".format(root_path))
+ raise IOError("No storage found at {!r}".format(root_path))
+
+ def j(self, path: str) -> str:
+ return os.path.join(self.root_path, path)
def __setitem__(self, path: str, value: bytes) -> None:
- path = os.path.join(self.root_path, path)
- os.makedirs(os.path.dirname(path), exist_ok=True)
- with open(path, "wb") as fd:
+ jpath = self.j(path)
+ os.makedirs(os.path.dirname(jpath), exist_ok=True)
+ with open(jpath, "wb") as fd:
fd.write(value)
def __delitem__(self, path: str) -> None:
@@ -95,32 +97,53 @@
pass
def __getitem__(self, path: str) -> bytes:
- path = os.path.join(self.root_path, path)
- with open(path, "rb") as fd:
+ with open(self.j(path), "rb") as fd:
return fd.read()
def __contains__(self, path: str) -> bool:
- path = os.path.join(self.root_path, path)
- return os.path.exists(path)
+ return os.path.exists(self.j(path))
- def list(self, path: str) -> Iterable[Tuple[bool, str]]:
- path = os.path.join(self.root_path, path)
- for entry in os.scandir(path):
+ def list(self, path: str = "") -> Iterator[Tuple[bool, str]]:
+ jpath = self.j(path)
+ for entry in os.scandir(jpath):
if not entry.name in ('..', '.'):
yield entry.is_file(), entry.name
- def get_stream(self, path: str, mode: str = "rb") -> IO:
- path = os.path.join(self.root_path, path)
- return open(path, mode)
+ def get_stream(self, path: str, mode: str = "rb+") -> IO[bytes]:
+ jpath = self.j(path)
+
+ if "cb" == mode:
+ create_on_fail = True
+ mode = "rb+"
+ else:
+ create_on_fail = False
+
+ try:
+ fd = open(jpath, mode)
+ except IOError:
+ if not create_on_fail:
+ raise
+ fd = open(jpath, "wb")
+
+ return cast(IO[bytes], fd)
+
+ def sub_storage(self, path: str) -> 'FSStorage':
+ return self.__class__(self.j(path), self.existing)
class YAMLSerializer(ISerializer):
"""Serialize data to yaml"""
- def pack(self, value: IStorable) -> bytes:
- raise NotImplementedError()
+ def pack(self, value: Any) -> bytes:
+ if type(value) not in basic_types:
+ for name, val in value.__dict__.items():
+ if type(val) not in basic_types:
+ raise ValueError(("Can't pack {!r}. Attribute {} has value {!r} (type: {}), but only" +
+ " basic types accepted as attributes").format(value, name, val, type(val)))
+ value = value.__dict__
+ return yaml.dump(value, Dumper=Dumper, encoding="utf8")
def unpack(self, data: bytes) -> IStorable:
- raise NotImplementedError()
+ return yaml.load(data, Loader=Loader)
class Storage:
@@ -129,32 +152,65 @@
self.storage = storage
self.serializer = serializer
- def __setitem__(self, path: str, value: IStorable) -> None:
- self.storage[path] = self.serializer.pack(value)
+ def sub_storage(self, *path: str) -> 'Storage':
+ return self.__class__(self.storage.sub_storage("/".join(path)), self.serializer)
- def __getitem__(self, path: str) -> IStorable:
+ def __setitem__(self, path: Union[str, Iterable[str]], value: Any) -> None:
+ if not isinstance(path, str):
+ path = "/".join(path)
+
+ self.storage[path] = self.serializer.pack(cast(IStorable, value))
+
+ def __getitem__(self, path: Union[str, Iterable[str]]) -> IStorable:
+ if not isinstance(path, str):
+ path = "/".join(path)
+
return self.serializer.unpack(self.storage[path])
- def __delitem__(self, path: str) -> None:
+ def __delitem__(self, path: Union[str, Iterable[str]]) -> None:
+ if not isinstance(path, str):
+ path = "/".join(path)
+
del self.storage[path]
- def __contains__(self, path: str) -> bool:
+ def __contains__(self, path: Union[str, Iterable[str]]) -> bool:
+ if not isinstance(path, str):
+ path = "/".join(path)
return path in self.storage
- def list(self, *path: str) -> Iterable[Tuple[bool, str]]:
+ def list(self, *path: str) -> Iterator[Tuple[bool, str]]:
return self.storage.list("/".join(path))
- def construct(self, path: str, raw_val: IStorable, obj_class: Type[ObjClass]) -> ObjClass:
+ def set_array(self, value: array.array, *path: str) -> None:
+ with self.get_stream("/".join(path), "wb") as fd:
+ value.tofile(fd) # type: ignore
+
+ def get_array(self, typecode: str, *path: str) -> array.array:
+ res = array.array(typecode)
+ path_s = "/".join(path)
+ with self.get_stream(path_s, "rb") as fd:
+ fd.seek(0, os.SEEK_END)
+ size = fd.tell()
+ fd.seek(0, os.SEEK_SET)
+ assert size % res.itemsize == 0, "Storage object at path {} contains no array of {} or corrupted."\
+ .format(path_s, typecode)
+ res.fromfile(fd, size // res.itemsize) # type: ignore
+ return res
+
+ def append(self, value: array.array, *path: str) -> None:
+ with self.get_stream("/".join(path), "cb") as fd:
+ fd.seek(0, os.SEEK_END)
+ value.tofile(fd) # type: ignore
+
+ def construct(self, path: str, raw_val: Dict, obj_class: Type[ObjClass]) -> ObjClass:
+ "Internal function, used to construct user type from raw unpacked value"
if obj_class in (int, str, dict, list, None):
- if not isinstance(raw_val, obj_class):
- raise ValueError("Can't load path {!r} into type {}. Real type is {}"
- .format(path, obj_class, type(raw_val)))
- return cast(ObjClass, raw_val)
+ raise ValueError("Can't load into build-in value - {!r} into type {}")
if not isinstance(raw_val, dict):
raise ValueError("Can't load path {!r} into python type. Raw value not dict".format(path))
- if not all(isinstance(str, key) for key in raw_val.keys):
+ if not all(isinstance(key, str) for key in raw_val.keys()):
raise ValueError("Can't load path {!r} into python type.".format(path) +
"Raw not all keys in raw value is strings")
@@ -170,19 +226,25 @@
def load(self, obj_class: Type[ObjClass], *path: str) -> ObjClass:
path_s = "/".join(path)
- return self.construct(path_s, self[path_s], obj_class)
+ return self.construct(path_s, cast(Dict, self[path_s]), obj_class)
- def get_stream(self, *path: str) -> IO:
- return self.storage.get_stream("/".join(path))
+ def get_stream(self, path: str, mode: str = "r") -> IO:
+ return self.storage.get_stream(path, mode)
- def get(self, path: str, default: Any = None) -> Any:
+ def get(self, path: Union[str, Iterable[str]], default: Any = None) -> Any:
+ if not isinstance(path, str):
+ path = "/".join(path)
+
try:
return self[path]
- except KeyError:
+ except Exception:
return default
- def append(self, path: str, data: List):
- raise NotImplemented()
+ def __enter__(self) -> 'Storage':
+ return self
+
+ def __exit__(self, x: Any, y: Any, z: Any) -> None:
+ return
def make_storage(url: str, existing: bool = False) -> Storage:
diff --git a/wally/storage_structure.txt b/wally/storage_structure.txt
index 583a4a0..5715046 100644
--- a/wally/storage_structure.txt
+++ b/wally/storage_structure.txt
@@ -1,10 +1,19 @@
config: Config - full configuration
-nodes: List[NodeInfo] - all nodes
-fuel_openstack_creds: OSCreds - openstack creds, discovered from fuel (or None)
+all_nodes: List[NodeInfo] - all nodes
+
+fuel:
+ version: List[int] - FUEL master node version
+ os_creds: OSCreds - openstack creds, discovered from fuel (or None)
+ nodes: List[NodeInfo] - FUEL cluster nodes
+
openstack_openrc: OSCreds - openrc used for openstack cluster
-discovered_nodes: List[NodeInfo] - list of discovered nodes
-reused_nodes: List[NodeInfo] - list of reused nodes from cluster
-spawned_vm_ids: List[int] - list of openstack VM id's, spawned for test
+
+openstack_nodes: List[NodeInfo] - list of openstack nodes
+reused_os_nodes: List[NodeInfo] - list of openstack VM, reused in test
+spawned_os_nodes: List[NodeInfo] - list of openstack VM, spawned for test
+ceph_nodes: List[NodeInfo] - list of ceph nodes
+explicit_nodes: List[NodeInfo] - list of explicit nodes
+
info/comment : str - run comment
info/run_uuid : str - run uuid
info/run_time : float - run unix time
diff --git a/wally/suits/io/fio.py b/wally/suits/io/fio.py
index 0f4ebde..1b5f38e 100644
--- a/wally/suits/io/fio.py
+++ b/wally/suits/io/fio.py
@@ -83,7 +83,7 @@
node.conn.rmdir(self.config.remote_dir, recursive=True, ignore_missing=True)
node.conn.mkdir(self.config.remote_dir)
except Exception as exc:
- msg = "Failed to create folder {} on remote {}.".format(self.config.remote_dir, node, exc)
+ msg = "Failed to create folder {} on remote {}.".format(self.config.remote_dir, node)
logger.exception(msg)
raise StopTestError(msg) from exc
diff --git a/wally/suits/itest.py b/wally/suits/itest.py
index 6d1eeee..ef69b05 100644
--- a/wally/suits/itest.py
+++ b/wally/suits/itest.py
@@ -74,7 +74,7 @@
pass
@abc.abstractmethod
- def format_for_console(cls, data: Any) -> str:
+ def format_for_console(self, data: Any) -> str:
pass
@@ -122,9 +122,9 @@
}
assert info == expected_config, \
- "Test info at path {} is not equal to expected config." + \
- "Maybe configuration was changed before test was restarted. " + \
- "Current cfg is {!r}, expected cfg is {!r}".format(info_path, info, expected_config)
+ ("Test info at path {} is not equal to expected config." +
+ "Maybe configuration was changed before test was restarted. " +
+ "Current cfg is {!r}, expected cfg is {!r}").format(info_path, info, expected_config)
logger.info("Test iteration {} found in storage and will be skipped".format(iter_name))
else:
@@ -181,10 +181,10 @@
start_times = [] # type: List[int]
stop_times = [] # type: List[int]
+ mstorage = storage.sub_storage("result", str(run_id), "measurement")
for (result, (t_start, t_stop)), node in zip(results, self.config.nodes):
for metrics_name, data in result.items():
- path = "result/{}/measurement/{}/{}".format(run_id, node.info.node_id(), metrics_name)
- storage[path] = data # type: ignore
+ mstorage[node.info.node_id(), metrics_name] = data # type: ignore
start_times.append(t_start)
stop_times.append(t_stop)
@@ -214,7 +214,7 @@
'end_time': max_stop_time
}
- storage["result/{}/info".format(run_id)] = test_config # type: ignore
+ storage["result", str(run_id), "info"] = test_config # type: ignore
@abc.abstractmethod
def config_node(self, node: IRPCNode) -> None:
diff --git a/wally/test_run_class.py b/wally/test_run_class.py
index db41a46..30c46e7 100644
--- a/wally/test_run_class.py
+++ b/wally/test_run_class.py
@@ -4,7 +4,7 @@
from .timeseries import SensorDatastore
from .node_interfaces import NodeInfo, IRPCNode
-from .start_vms import OSCreds, OSConnection
+from .openstack_api import OSCreds, OSConnection
from .storage import Storage
from .config import Config
from .fuel_rest_api import Connection
@@ -24,6 +24,7 @@
# openstack credentials
self.fuel_openstack_creds = None # type: Optional[OSCreds]
+ self.fuel_version = None # type: Optional[List[int]]
self.os_creds = None # type: Optional[OSCreds]
self.os_connection = None # type: Optional[OSConnection]
self.fuel_conn = None # type: Optional[Connection]
@@ -33,6 +34,7 @@
self.config = config
self.sensors_data = SensorDatastore()
self.sensors_run_on = set() # type: Set[str]
+ self.os_spawned_nodes_ids = None # type: List[int]
def get_pool(self):
return ThreadPoolExecutor(self.config.get('worker_pool_sz', 32))
diff --git a/wally/utils.py b/wally/utils.py
index b27b1ba..45b67b4 100644
--- a/wally/utils.py
+++ b/wally/utils.py
@@ -260,7 +260,7 @@
raise OSError("Can't define interface for {0}".format(target_ip))
-def open_for_append_or_create(fname: str) -> IO:
+def open_for_append_or_create(fname: str) -> IO[str]:
if not os.path.exists(fname):
return open(fname, "w")
@@ -289,18 +289,6 @@
return data
-CLEANING = [] # type: List[Tuple[Callable[..., Any], List[Any], Dict[str, Any]]]
-
-
-def clean_resource(func: Callable[..., Any], *args: Any, **kwargs: Any) -> None:
- CLEANING.append((func, list(args), kwargs))
-
-
-def iter_clean_func() -> Iterator[Tuple[Callable[..., Any], List[Any], Dict[str, Any]]]:
- while CLEANING:
- yield CLEANING.pop()
-
-
def flatten(data: Iterable[Any]) -> List[Any]:
res = []
for i in data: