| # Copyright 2015 Mirantis Inc. |
| # All Rights Reserved. |
| # |
| # Licensed under the Apache License, Version 2.0 (the "License"); you may |
| # not use this file except in compliance with the License. You may obtain |
| # a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT |
| # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the |
| # License for the specific language governing permissions and limitations |
| # under the License. |
| |
| from collections import OrderedDict |
| import json |
| import random |
| import re |
| |
| from netaddr import ip |
| from tempest import config |
| import testtools |
| |
# Module-level shortcut to the tempest configuration object.
CONF = config.CONF

# Header dict handed out by get_extra_headers() for requests whose
# microversion is lower than the version in which a feature graduated.
EXPERIMENTAL = {"X-OpenStack-Manila-API-Experimental": "True"}

# Microversions probed by share_network_subnets_are_supported() and
# share_replica_quotas_are_supported() respectively.
SHARE_NETWORK_SUBNETS_MICROVERSION = "2.51"
SHARE_REPLICA_QUOTAS_MICROVERSION = "2.53"
| |
| |
def deduplicate(items):
    """Return the unique elements of ``items`` in first-seen order.

    Handy for lists handed to ddt.data, where the same value may show up
    more than once because it was listed through different constants.
    """
    first_seen = OrderedDict()
    for item in items:
        # setdefault() leaves an already present key at its position.
        first_seen.setdefault(item, None)
    return list(first_seen)
| |
| |
def get_microversion_as_tuple(microversion_str):
    """Convert an 'x.y' microversion string into a tuple of two integers.

    Tuples of integers compare correctly with the usual operators, e.g.
    (2, 9) < (2, 51), which a plain string comparison would get wrong.

    :param microversion_str: microversion such as '2.51'. Both parts must
        be decimal numbers without leading zeros and the major part must
        not be zero.
    :return: tuple ``(major, minor)`` of integers.
    :raises ValueError: if the string does not fit the 'x.y' template.
    """
    # fullmatch() is used rather than match() with a trailing '$': the
    # '$' anchor also matches right before a final newline, which let
    # values with a trailing line break slip through the validation.
    match = re.fullmatch(r"([1-9]\d*)\.([1-9]\d*|0)", microversion_str)
    if not match:
        raise ValueError(
            "Microversion does not fit template 'x.y' - %s" % microversion_str)
    major, minor = match.groups()
    return int(major), int(minor)
| |
| |
def is_microversion_gt(left, right):
    """Return True when microversion ``left`` is greater than ``right``."""
    left_version = get_microversion_as_tuple(left)
    right_version = get_microversion_as_tuple(right)
    return left_version > right_version
| |
| |
def is_microversion_ge(left, right):
    """Return True when ``left`` is greater than or equal to ``right``."""
    left_version = get_microversion_as_tuple(left)
    right_version = get_microversion_as_tuple(right)
    return left_version >= right_version
| |
| |
def is_microversion_eq(left, right):
    """Return True when microversion ``left`` is equal to ``right``."""
    left_version = get_microversion_as_tuple(left)
    right_version = get_microversion_as_tuple(right)
    return left_version == right_version
| |
| |
def is_microversion_ne(left, right):
    """Return True when microversion ``left`` is not equal to ``right``."""
    left_version = get_microversion_as_tuple(left)
    right_version = get_microversion_as_tuple(right)
    return left_version != right_version
| |
| |
def is_microversion_le(left, right):
    """Return True when ``left`` is less than or equal to ``right``."""
    left_version = get_microversion_as_tuple(left)
    right_version = get_microversion_as_tuple(right)
    return left_version <= right_version
| |
| |
def is_microversion_lt(left, right):
    """Return True when microversion ``left`` is less than ``right``."""
    left_version = get_microversion_as_tuple(left)
    right_version = get_microversion_as_tuple(right)
    return left_version < right_version
| |
| |
def is_microversion_supported(microversion):
    """Tell whether ``microversion`` lies within the configured API range.

    The range is inclusive on both ends and is taken from
    CONF.share.min_api_microversion and CONF.share.max_api_microversion.
    """
    lowest = get_microversion_as_tuple(CONF.share.min_api_microversion)
    requested = get_microversion_as_tuple(microversion)
    highest = get_microversion_as_tuple(CONF.share.max_api_microversion)
    if requested < lowest:
        return False
    return requested <= highest
| |
| |
def skip_if_microversion_not_supported(microversion):
    """Decorator that skips a test requiring an unsupported microversion.

    Returns testtools.skip(...) when ``microversion`` is not supported,
    otherwise a pass-through decorator that hands the decorated function
    back unchanged.
    """
    if is_microversion_supported(microversion):
        return lambda f: f
    reason = ("Skipped. Test requires microversion '%s'." % microversion)
    return testtools.skip(reason)
| |
| |
def skip_if_is_microversion_ge(left, right):
    """Decorator that skips a test when ``left`` is >= ``right``.

    Returns testtools.skip(...) in that case, otherwise a pass-through
    decorator that hands the decorated function back unchanged.
    """
    if not is_microversion_ge(left, right):
        return lambda f: f
    reason = "Skipped. Test requires microversion < than '%s'." % right
    return testtools.skip(reason)
| |
| |
def check_skip_if_microversion_not_supported(microversion):
    """Callable (non-decorator) skip check for microversion-specific tests.

    Raises testtools.TestCase.skipException when ``microversion`` is not
    supported; returns None otherwise.
    """
    if is_microversion_supported(microversion):
        return
    reason = ("Skipped. Test requires microversion '%s'." % microversion)
    raise testtools.TestCase.skipException(reason)
| |
| |
def rand_ip(network=False):
    """Return a random IPv4 address from the TEST-NET-3 range.

    That range is reserved for documentation and example source code, so
    using it should avoid conflicts in real-world testing.

    :param network: when true, return a 'network/prefix-length' string
        with a random prefix length between 24 and 32 instead of a bare
        address.
    """
    test_net_3 = '203.0.113.'
    host_address = test_net_3 + str(random.randint(0, 255))
    if not network:
        return host_address

    mask_bits = str(random.randint(24, 32))
    subnet = ip.IPNetwork('/'.join((host_address, mask_bits)))
    # Report the network address rather than the random host address.
    return '/'.join((str(subnet.network), mask_bits))
| |
| |
def rand_ipv6_ip(network=False):
    """Return a random IPv6 address from the 2001:DB8::/32 doc range.

    :param network: when true, return a 'network/prefix-length' string
        with a random prefix length between 32 and 128 instead of a bare
        address.
    """
    groups = []
    for _ in range(6):
        groups.append("%x" % random.randrange(0, 16 ** 4))
    host_address = "2001:0DB8:" + ":".join(groups)
    if not network:
        return host_address

    mask_bits = str(random.randint(32, 128))
    subnet = ip.IPNetwork('/'.join((host_address, mask_bits)))
    # Report the network address rather than the random host address.
    return '/'.join((str(subnet.network), mask_bits))
| |
| |
def choose_matching_backend(share, pools, share_type):
    """Pick the first pool, other than the share's host, matching a type.

    :param share: dict with a 'host' key; a pool with that name is never
        selected.
    :param pools: iterable of dicts with 'name' and 'capabilities' keys.
    :param share_type: dict whose 'extra_specs' must all be present, with
        equal values, in the capabilities of the selected pool.
    :return: the first matching pool, or None if there is none.
    """
    # Extra specs may carry booleans as strings ('True'/'false'/...), so
    # normalise those to real booleans before comparing.
    text_to_bool = {'true': True, 'false': False}
    wanted_specs = {
        key: text_to_bool.get(str(value).lower(), value)
        for key, value in share_type['extra_specs'].items()
    }

    for pool in pools:
        if pool['name'] == share['host']:
            continue
        if all(spec in pool['capabilities'].items()
               for spec in wanted_specs.items()):
            return pool
    return None
| |
| |
def get_configured_extra_specs(variation=None):
    """Build the essential extra specs from the tempest configuration.

    :param variation: selects how the configured values are used:
        None (or any other value) - exactly as configured in tempest;
        'opposite_driver_modes' - as configured, but with the driver mode
        inverted; 'invalid' - driver mode and snapshot support both
        inverted, ideal for negative tests.
    :return: dict containing essential extra specs. The
        'create_share_from_snapshot_support' key is only present for the
        as-configured variation.
    """
    specs = {'storage_protocol': CONF.share.capability_storage_protocol}

    # Both named variations invert the driver mode; only 'invalid'
    # additionally inverts snapshot support.
    invert_snapshot_support = variation == 'invalid'
    invert_driver_mode = (
        invert_snapshot_support or variation == 'opposite_driver_modes')

    driver_mode = CONF.share.multitenancy_enabled
    specs['driver_handles_share_servers'] = (
        not driver_mode if invert_driver_mode else driver_mode)

    snapshot_support = CONF.share.capability_snapshot_support
    specs['snapshot_support'] = (
        not snapshot_support if invert_snapshot_support
        else snapshot_support)

    if not invert_driver_mode:
        specs['create_share_from_snapshot_support'] = (
            CONF.share.capability_create_share_from_snapshot_support)

    return specs
| |
| |
def replication_with_multitenancy_support():
    """Tell whether replication can be tested with multitenancy.

    Truthy only when share network subnets are supported and
    CONF.share.multitenancy_enabled is set.
    """
    subnets_supported = share_network_subnets_are_supported()
    if not subnets_supported:
        return subnets_supported
    return CONF.share.multitenancy_enabled
| |
| |
def skip_if_manage_not_supported_for_version(
        version=CONF.share.max_api_microversion):
    """Skip share-manage tests that cannot run with multitenancy.

    Raises testtools.TestCase.skipException when ``version`` is lower
    than 2.49 and CONF.share.multitenancy_enabled is set.

    :param version: microversion to check. NOTE: the default is read from
        CONF.share.max_api_microversion once, when this function is
        defined, not on every call.
    """
    if not is_microversion_lt(version, "2.49"):
        return
    if not CONF.share.multitenancy_enabled:
        return
    raise testtools.TestCase.skipException(
        "Share manage tests with multitenancy are disabled for "
        "microversion < 2.49")
| |
| |
def share_network_subnets_are_supported():
    """Tell whether the share network subnets microversion is supported."""
    required_version = SHARE_NETWORK_SUBNETS_MICROVERSION
    return is_microversion_supported(required_version)
| |
| |
def share_replica_quotas_are_supported():
    """Tell whether the share replica quotas microversion is supported."""
    required_version = SHARE_REPLICA_QUOTAS_MICROVERSION
    return is_microversion_supported(required_version)
| |
| |
def share_network_get_default_subnet(share_network):
    """Return the first subnet whose availability zone is None.

    :param share_network: dict that may hold a 'share_network_subnets'
        list of subnet dicts.
    :return: the matching subnet dict, or None when there is no such
        subnet (or no subnets at all).
    """
    for candidate in share_network.get('share_network_subnets', []):
        if candidate['availability_zone'] is None:
            return candidate
    return None
| |
| |
def get_extra_headers(request_version, graduation_version):
    """Work out whether a request needs the experimental API header.

    :return: tuple ``(headers, extra_headers)``: the EXPERIMENTAL header
        dict and True when ``request_version`` is lower than
        ``graduation_version``, otherwise ``(None, False)``.
    """
    if is_microversion_lt(request_version, graduation_version):
        return EXPERIMENTAL, True
    return None, False
| |
| |
| def _parse_resp(body, verify_top_key=None): |
| try: |
| body = json.loads(body) |
| except ValueError: |
| return body |
| |
| # We assume, that if the first value of the deserialized body's |
| # item set is a dict or a list, that we just return the first value |
| # of deserialized body. |
| # Essentially "cutting out" the first placeholder element in a body |
| # that looks like this: |
| # |
| # { |
| # "users": [ |
| # ... |
| # ] |
| # } |
| try: |
| # Ensure there are not more than one top-level keys |
| # NOTE(freerunner): Ensure, that JSON is not nullable to |
| # to prevent StopIteration Exception |
| if not hasattr(body, "keys") or len(body.keys()) != 1: |
| return body |
| # Just return the "wrapped" element |
| first_key, first_item = tuple(body.items())[0] |
| if isinstance(first_item, (dict, list)): |
| if verify_top_key is not None: |
| assert_msg = ( |
| "The expected top level key is '%(top_key)s' but we " |
| "found '%(actual_key)s'." % { |
| 'top_key': verify_top_key, |
| 'actual_key': first_key |
| }) |
| assert verify_top_key == first_key, assert_msg |
| return first_item |
| except (ValueError, IndexError): |
| pass |
| return body |