Merge "setup.cfg: Replace dashes with underscores"
diff --git a/.zuul.yaml b/.zuul.yaml
index 87f89f0..0b120b2 100644
--- a/.zuul.yaml
+++ b/.zuul.yaml
@@ -5,15 +5,20 @@
check:
jobs:
- cinder-tempest-plugin-lvm-lio-barbican
- - cinder-tempest-plugin-lvm-lio-barbican-centos-8:
+ - cinder-tempest-plugin-lvm-lio-barbican-centos-8-stream:
voting: false
- cinder-tempest-plugin-lvm-tgt-barbican
- nova-ceph-multistore:
voting: false
- cinder-tempest-plugin-cbak-ceph
+ - cinder-tempest-plugin-cbak-s3
+ - cinder-tempest-plugin-basic-xena
+ - cinder-tempest-plugin-basic-wallaby
- cinder-tempest-plugin-basic-victoria
- cinder-tempest-plugin-basic-ussuri
- - cinder-tempest-plugin-basic-train
+ # Set this job to voting once we have some actual tests to run
+ - cinder-tempest-plugin-protection-functional:
+ voting: false
gate:
jobs:
- cinder-tempest-plugin-lvm-lio-barbican
@@ -21,9 +26,30 @@
- cinder-tempest-plugin-cbak-ceph
experimental:
jobs:
+ - cinder-tempest-plugin-cbak-ceph-xena
+ - cinder-tempest-plugin-cbak-ceph-wallaby
- cinder-tempest-plugin-cbak-ceph-victoria
- cinder-tempest-plugin-cbak-ceph-ussuri
- - cinder-tempest-plugin-cbak-ceph-train
+
+- job:
+ name: cinder-tempest-plugin-protection-functional
+ parent: devstack-tempest
+ required-projects:
+ - opendev.org/openstack/cinder-tempest-plugin
+ - opendev.org/openstack/cinder
+ vars:
+ tox_envlist: all
+ tempest_test_regex: 'cinder_tempest_plugin.rbac'
+ devstack_local_conf:
+ test-config:
+ $CINDER_CONF:
+ oslo_policy:
+ enforce_new_defaults: True
+ $TEMPEST_CONFIG:
+ enforce_scope:
+ cinder: True
+ tempest_plugins:
+ - cinder-tempest-plugin
- job:
name: cinder-tempest-plugin-lvm-barbican-base-abstract
@@ -43,7 +69,6 @@
barbican: https://opendev.org/openstack/barbican
vars:
tempest_test_regex: '(^tempest\.(api|scenario)|(^cinder_tempest_plugin))'
- tempest_test_exclude_list: '{{ ansible_user_dir }}/{{ zuul.projects["opendev.org/openstack/tempest"].src_dir }}/tools/tempest-integrated-gate-storage-exclude-list.txt'
tox_envlist: all
devstack_localrc:
CINDER_LVM_TYPE: thin
@@ -71,7 +96,7 @@
description: |
This is a base job for lvm with lio & tgt targets
with cinderlib tests.
- branches: ^(?!stable/(ocata|pike|queens|rocky|stein)).*$
+ branches: ^(?!stable/(ocata|pike|queens|rocky|stein|train)).*$
parent: cinder-tempest-plugin-lvm-barbican-base-abstract
roles:
- zuul: opendev.org/openstack/cinderlib
@@ -83,6 +108,28 @@
vars:
fetch_subunit_output_additional_dirs:
- "{{ ansible_user_dir }}/{{ zuul.projects['opendev.org/openstack/cinderlib'].src_dir }}"
+ tempest_test_exclude_list: '{{ ansible_user_dir }}/{{ zuul.projects["opendev.org/openstack/tempest"].src_dir }}/tools/tempest-integrated-gate-storage-exclude-list.txt'
+
+- job:
+ name: cinder-tempest-plugin-lvm-barbican-base
+ description: |
+ This is a base job for lvm with lio & tgt targets
+ with cinderlib tests to run on stable/train testing.
+ branches: stable/train
+ parent: cinder-tempest-plugin-lvm-barbican-base-abstract
+ roles:
+ - zuul: opendev.org/openstack/cinderlib
+ required-projects:
+ - opendev.org/openstack/cinderlib
+ - name: opendev.org/openstack/cinder-tempest-plugin
+ override-checkout: 1.3.0
+ run: playbooks/tempest-and-cinderlib-run.yaml
+ # Required to collect the tox-based logs of the cinderlib functional tests
+ post-run: playbooks/post-cinderlib.yaml
+ vars:
+ fetch_subunit_output_additional_dirs:
+ - "{{ ansible_user_dir }}/{{ zuul.projects['opendev.org/openstack/cinderlib'].src_dir }}"
+ tempest_test_exclude_list: '{{ ansible_user_dir }}/{{ zuul.projects["opendev.org/openstack/tempest"].src_dir }}/tools/tempest-integrated-gate-storage-exclude-list.txt'
- job:
name: cinder-tempest-plugin-lvm-barbican-base
@@ -90,6 +137,11 @@
This is a base job for lvm with lio & tgt targets
branches: ^(?=stable/(ocata|pike|queens|rocky|stein)).*$
parent: cinder-tempest-plugin-lvm-barbican-base-abstract
+ required-projects:
+ - name: opendev.org/openstack/cinder-tempest-plugin
+ override-checkout: stein-last
+ vars:
+ tempest_test_blacklist: '{{ ansible_user_dir }}/{{ zuul.projects["opendev.org/openstack/tempest"].src_dir }}/tools/tempest-integrated-gate-storage-blacklist.txt'
- job:
name: cinder-tempest-plugin-cbak-ceph
@@ -107,6 +159,18 @@
c-bak: true
- job:
+ name: cinder-tempest-plugin-cbak-ceph-xena
+ parent: cinder-tempest-plugin-cbak-ceph
+ nodeset: openstack-single-node-focal
+ override-checkout: stable/xena
+
+- job:
+ name: cinder-tempest-plugin-cbak-ceph-wallaby
+ parent: cinder-tempest-plugin-cbak-ceph
+ nodeset: openstack-single-node-focal
+ override-checkout: stable/wallaby
+
+- job:
name: cinder-tempest-plugin-cbak-ceph-victoria
parent: cinder-tempest-plugin-cbak-ceph
nodeset: openstack-single-node-focal
@@ -118,12 +182,6 @@
nodeset: openstack-single-node-bionic
override-checkout: stable/ussuri
-- job:
- name: cinder-tempest-plugin-cbak-ceph-train
- parent: cinder-tempest-plugin-cbak-ceph
- nodeset: openstack-single-node-bionic
- override-checkout: stable/train
-
# variant for pre-Ussuri branches (no volume revert for Ceph),
# should this job be used on those branches
- job:
@@ -147,9 +205,9 @@
CINDER_ISCSI_HELPER: lioadm
- job:
- name: cinder-tempest-plugin-lvm-lio-barbican-centos-8
+ name: cinder-tempest-plugin-lvm-lio-barbican-centos-8-stream
parent: cinder-tempest-plugin-lvm-lio-barbican
- nodeset: devstack-single-node-centos-8
+ nodeset: devstack-single-node-centos-8-stream
description: |
This jobs configures Cinder with LVM, LIO, barbican and
runs tempest tests and cinderlib tests on CentOS 8.
@@ -165,6 +223,21 @@
CINDER_ISCSI_HELPER: tgtadm
- job:
+ name: cinder-tempest-plugin-cbak-s3
+ parent: cinder-tempest-plugin-basic
+ description: |
+ Integration tests that run with the s3 backup driver with
+ Swift S3 API.
+ vars:
+ devstack_localrc:
+ CINDER_BACKUP_DRIVER: 's3_swift'
+ devstack_services:
+ c-bak: true
+ s3api: true
+ # Workaround: TLS proxy seems to cause S3 signature mismatch.
+ tls-proxy: false
+ tempest_test_regex: '(test_volume_backup|test_volumes_backup|test_snapshot_backup)'
+- job:
name: cinder-tempest-plugin-basic
parent: devstack-tempest
description: |
@@ -185,6 +258,18 @@
- ^releasenotes/.*$
- job:
+ name: cinder-tempest-plugin-basic-xena
+ parent: cinder-tempest-plugin-basic
+ nodeset: openstack-single-node-focal
+ override-checkout: stable/xena
+
+- job:
+ name: cinder-tempest-plugin-basic-wallaby
+ parent: cinder-tempest-plugin-basic
+ nodeset: openstack-single-node-focal
+ override-checkout: stable/wallaby
+
+- job:
name: cinder-tempest-plugin-basic-victoria
parent: cinder-tempest-plugin-basic
nodeset: openstack-single-node-focal
@@ -195,12 +280,3 @@
parent: cinder-tempest-plugin-basic
nodeset: openstack-single-node-bionic
override-checkout: stable/ussuri
-
-- job:
- name: cinder-tempest-plugin-basic-train
- parent: cinder-tempest-plugin-basic
- nodeset: openstack-single-node-bionic
- override-checkout: stable/train
- vars:
- devstack_localrc:
- USE_PYTHON3: True
diff --git a/cinder_tempest_plugin/api/volume/admin/test_volume_backup.py b/cinder_tempest_plugin/api/volume/admin/test_volume_backup.py
index d1fa730..e5ded52 100644
--- a/cinder_tempest_plugin/api/volume/admin/test_volume_backup.py
+++ b/cinder_tempest_plugin/api/volume/admin/test_volume_backup.py
@@ -13,7 +13,6 @@
# License for the specific language governing permissions and limitations
# under the License.
-from tempest.common import waiters
from tempest import config
from tempest.lib import decorators
from tempest.lib import exceptions
@@ -41,19 +40,10 @@
def test_backup_crossproject_admin_negative(self):
# create vol as user
- volume = self.volumes_client.create_volume(
- size=CONF.volume.volume_size)['volume']
- waiters.wait_for_volume_resource_status(
- self.volumes_client,
- volume['id'], 'available')
+ volume = self.create_volume(size=CONF.volume.volume_size)
# create backup as user
- backup = self.backups_client.create_backup(
- volume_id=volume['id'])['backup']
- waiters.wait_for_volume_resource_status(
- self.backups_client,
- backup['id'], 'available')
-
+ self.create_backup(volume_id=volume['id'])
# try to create incremental backup as admin
self.assertRaises(
exceptions.BadRequest, self.admin_backups_client.create_backup,
@@ -63,18 +53,12 @@
def test_backup_crossproject_user_negative(self):
# create vol as user
- volume = self.volumes_client.create_volume(
- size=CONF.volume.volume_size)['volume']
- waiters.wait_for_volume_resource_status(
- self.volumes_client,
- volume['id'], 'available')
+ volume = self.create_volume(size=CONF.volume.volume_size)
# create backup as admin
- backup = self.admin_backups_client.create_backup(
- volume_id=volume['id'])['backup']
- waiters.wait_for_volume_resource_status(
- self.admin_backups_client,
- backup['id'], 'available')
+
+ self.create_backup(volume_id=volume['id'],
+ backup_client=self.admin_backups_client)
# try to create incremental backup as user
self.assertRaises(
@@ -85,25 +69,14 @@
def test_incremental_backup_respective_parents(self):
# create vol as user
- volume = self.volumes_client.create_volume(
- size=CONF.volume.volume_size)['volume']
- waiters.wait_for_volume_resource_status(
- self.volumes_client,
- volume['id'], 'available')
+ volume = self.create_volume(size=CONF.volume.volume_size)
# create backup as admin
- backup_adm = self.admin_backups_client.create_backup(
- volume_id=volume['id'])['backup']
- waiters.wait_for_volume_resource_status(
- self.admin_backups_client,
- backup_adm['id'], 'available')
+ backup_adm = self.create_backup(
+ volume_id=volume['id'], backup_client=self.admin_backups_client)
# create backup as user
- backup_usr = self.backups_client.create_backup(
- volume_id=volume['id'])['backup']
- waiters.wait_for_volume_resource_status(
- self.backups_client,
- backup_usr['id'], 'available')
+ backup_usr = self.create_backup(volume_id=volume['id'])
# refresh admin backup and assert no child backups
backup_adm = self.admin_backups_client.show_backup(
@@ -111,11 +84,8 @@
self.assertFalse(backup_adm['has_dependent_backups'])
# create incremental backup as admin
- backup_adm_inc = self.admin_backups_client.create_backup(
- volume_id=volume['id'], incremental=True)['backup']
- waiters.wait_for_volume_resource_status(
- self.admin_backups_client,
- backup_adm_inc['id'], 'available')
+ self.create_backup(volume_id=volume['id'], incremental=True,
+ backup_client=self.admin_backups_client)
# refresh user backup and assert no child backups
backup_usr = self.backups_client.show_backup(
@@ -128,11 +98,8 @@
self.assertTrue(backup_adm['has_dependent_backups'])
# create incremental backup as user
- backup_usr_inc = self.backups_client.create_backup(
- volume_id=volume['id'], incremental=True)['backup']
- waiters.wait_for_volume_resource_status(
- self.backups_client,
- backup_usr_inc['id'], 'available')
+ self.create_backup(volume_id=volume['id'],
+ incremental=True)
# refresh user backup and assert it has childs
backup_usr = self.backups_client.show_backup(
diff --git a/cinder_tempest_plugin/api/volume/test_volume_revert.py b/cinder_tempest_plugin/api/volume/test_volume_revert.py
index 7c5eed1..bf3d806 100644
--- a/cinder_tempest_plugin/api/volume/test_volume_revert.py
+++ b/cinder_tempest_plugin/api/volume/test_volume_revert.py
@@ -80,4 +80,4 @@
# Destination volume smaller than source, API should block that
self.assertRaises(exceptions.BadRequest,
self.volume_revert_client.revert_to_snapshot,
- self.volume, self.snapshot)
+ self.volume, self.snapshot['id'])
diff --git a/cinder_tempest_plugin/api/volume/test_volume_unicode.py b/cinder_tempest_plugin/api/volume/test_volume_unicode.py
index ff6473a..026271b 100644
--- a/cinder_tempest_plugin/api/volume/test_volume_unicode.py
+++ b/cinder_tempest_plugin/api/volume/test_volume_unicode.py
@@ -72,13 +72,13 @@
@decorators.idempotent_id('332be44d-5418-4fb3-a8f0-a3587de6929f')
def test_snapshot_create_volume_description_non_ascii_code(self):
# Create a volume with non-ascii description
- description = u'\u05e7\u05d9\u05d9\u05e4\u05e9'
+ description = '\u05e7\u05d9\u05d9\u05e4\u05e9'
volume = self.create_volume(description=description)
vol_info = self.volumes_client.show_volume(volume['id'])['volume']
self.assertEqual(description, vol_info['description'])
# Create a snapshot with different non-ascii description
- description = u'\u4e2d\u56fd\u793e\u533a'
+ description = '\u4e2d\u56fd\u793e\u533a'
snapshot = self.create_snapshot(volume['id'], description=description)
snapshot_info = self.snapshots_client.show_snapshot(
snapshot['id'])['snapshot']
diff --git a/cinder_tempest_plugin/rbac/__init__.py b/cinder_tempest_plugin/rbac/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/cinder_tempest_plugin/rbac/__init__.py
diff --git a/cinder_tempest_plugin/rbac/v3/__init__.py b/cinder_tempest_plugin/rbac/v3/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/cinder_tempest_plugin/rbac/v3/__init__.py
diff --git a/cinder_tempest_plugin/rbac/v3/base.py b/cinder_tempest_plugin/rbac/v3/base.py
new file mode 100644
index 0000000..d1a11e5
--- /dev/null
+++ b/cinder_tempest_plugin/rbac/v3/base.py
@@ -0,0 +1,42 @@
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+from tempest import config
+
+CONF = config.CONF
+
+
+class VolumeV3RbacBaseTests(object):
+
+ identity_version = 'v3'
+
+ @classmethod
+ def skip_checks(cls):
+ super(VolumeV3RbacBaseTests, cls).skip_checks()
+ if not CONF.enforce_scope.cinder:
+ raise cls.skipException(
+ "Tempest is not configured to enforce_scope for cinder, "
+ "skipping RBAC tests. To enable these tests set "
+ "`tempest.conf [enforce_scope] cinder=True`."
+ )
+
+ def do_request(self, method, expected_status=200, client=None, **payload):
+ if not client:
+ client = self.client
+ if isinstance(expected_status, type(Exception)):
+ self.assertRaises(expected_status,
+ getattr(client, method),
+ **payload)
+ else:
+ response = getattr(client, method)(**payload)
+ self.assertEqual(response.response.status, expected_status)
+ return response
diff --git a/cinder_tempest_plugin/rbac/v3/test_capabilities.py b/cinder_tempest_plugin/rbac/v3/test_capabilities.py
new file mode 100644
index 0000000..1fa542d
--- /dev/null
+++ b/cinder_tempest_plugin/rbac/v3/test_capabilities.py
@@ -0,0 +1,80 @@
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import abc
+
+from tempest.lib import exceptions
+
+from cinder_tempest_plugin.api.volume import base
+from cinder_tempest_plugin.rbac.v3 import base as rbac_base
+
+
+class VolumeV3RbacCapabilityTests(rbac_base.VolumeV3RbacBaseTests,
+ metaclass=abc.ABCMeta):
+
+ @classmethod
+ def setup_clients(cls):
+ super().setup_clients()
+ cls.persona = getattr(cls, 'os_%s' % cls.credentials[0])
+ cls.client = cls.persona.volume_capabilities_client_latest
+ # NOTE(lbragstad): This admin_client will be more useful later when
+ # cinder supports system-scope and we need it for administrative
+ # operations. For now, keep os_project_admin as the admin client until
+ # we have system-scope.
+ admin_client = cls.os_project_admin
+ cls.admin_capabilities_client = (
+ admin_client.volume_capabilities_client_latest)
+ cls.admin_stats_client = (
+ admin_client.volume_scheduler_stats_client_latest)
+
+ @classmethod
+ def setup_credentials(cls):
+ super().setup_credentials()
+ cls.os_primary = getattr(cls, 'os_%s' % cls.credentials[0])
+
+ @abc.abstractmethod
+ def test_get_capabilities(self):
+ """Test volume_extension:capabilities policy.
+
+ This test must check:
+ * whether the persona can fetch capabilities for a host.
+
+ """
+ pass
+
+
+class ProjectAdminTests(VolumeV3RbacCapabilityTests, base.BaseVolumeTest):
+
+ credentials = ['project_admin', 'system_admin']
+
+ def test_get_capabilities(self):
+ pools = self.admin_stats_client.list_pools()['pools']
+ host_name = pools[0]['name']
+ self.do_request('show_backend_capabilities', expected_status=200,
+ host=host_name)
+
+
+class ProjectMemberTests(ProjectAdminTests, base.BaseVolumeTest):
+
+ credentials = ['project_member', 'project_admin', 'system_admin']
+
+ def test_get_capabilities(self):
+ pools = self.admin_stats_client.list_pools()['pools']
+ host_name = pools[0]['name']
+ self.do_request('show_backend_capabilities',
+ expected_status=exceptions.Forbidden,
+ host=host_name)
+
+
+class ProjectReaderTests(ProjectMemberTests, base.BaseVolumeTest):
+
+ credentials = ['project_reader', 'project_admin', 'system_admin']
diff --git a/cinder_tempest_plugin/scenario/manager.py b/cinder_tempest_plugin/scenario/manager.py
index 70c25ae..3b25bb1 100644
--- a/cinder_tempest_plugin/scenario/manager.py
+++ b/cinder_tempest_plugin/scenario/manager.py
@@ -1,6 +1,4 @@
-# TODO: Remove this file when tempest scenario manager becomes stable
-# Copyright 2012 OpenStack Foundation
-# Copyright 2013 IBM Corp.
+# Copyright 2021 Red Hat, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
@@ -15,372 +13,134 @@
# License for the specific language governing permissions and limitations
# under the License.
-import netaddr
from oslo_log import log
-from oslo_serialization import jsonutils as json
-from oslo_utils import netutils
-from tempest.common import compute
-from tempest.common import image as common_image
-from tempest.common.utils.linux import remote_client
from tempest.common import waiters
from tempest import config
-from tempest import exceptions
-from tempest.lib.common import api_microversion_fixture
-from tempest.lib.common import api_version_utils
from tempest.lib.common.utils import data_utils
from tempest.lib.common.utils import test_utils
from tempest.lib import exceptions as lib_exc
-import tempest.test
+
+from tempest.scenario import manager
CONF = config.CONF
LOG = log.getLogger(__name__)
-LATEST_MICROVERSION = 'latest'
+class ScenarioTest(manager.ScenarioTest):
-class ScenarioTest(tempest.test.BaseTestCase):
- """Base class for scenario tests. Uses tempest own clients. """
-
- credentials = ['primary']
-
- compute_min_microversion = None
- compute_max_microversion = LATEST_MICROVERSION
- volume_min_microversion = None
- volume_max_microversion = LATEST_MICROVERSION
- placement_min_microversion = None
- placement_max_microversion = LATEST_MICROVERSION
-
- @classmethod
- def skip_checks(cls):
- super(ScenarioTest, cls).skip_checks()
- api_version_utils.check_skip_with_microversion(
- cls.compute_min_microversion, cls.compute_max_microversion,
- CONF.compute.min_microversion, CONF.compute.max_microversion)
- api_version_utils.check_skip_with_microversion(
- cls.volume_min_microversion, cls.volume_max_microversion,
- CONF.volume.min_microversion, CONF.volume.max_microversion)
- api_version_utils.check_skip_with_microversion(
- cls.placement_min_microversion, cls.placement_max_microversion,
- CONF.placement.min_microversion, CONF.placement.max_microversion)
-
- @classmethod
- def resource_setup(cls):
- super(ScenarioTest, cls).resource_setup()
- cls.compute_request_microversion = (
- api_version_utils.select_request_microversion(
- cls.compute_min_microversion,
- CONF.compute.min_microversion))
- cls.volume_request_microversion = (
- api_version_utils.select_request_microversion(
- cls.volume_min_microversion,
- CONF.volume.min_microversion))
- cls.placement_request_microversion = (
- api_version_utils.select_request_microversion(
- cls.placement_min_microversion,
- CONF.placement.min_microversion))
-
- def setUp(self):
- super(ScenarioTest, self).setUp()
- self.useFixture(api_microversion_fixture.APIMicroversionFixture(
- compute_microversion=self.compute_request_microversion,
- volume_microversion=self.volume_request_microversion,
- placement_microversion=self.placement_request_microversion))
+ credentials = ['primary', 'admin']
@classmethod
def setup_clients(cls):
super(ScenarioTest, cls).setup_clients()
- # Clients (in alphabetical order)
- cls.flavors_client = cls.os_primary.flavors_client
- cls.compute_floating_ips_client = (
- cls.os_primary.compute_floating_ips_client)
- if CONF.service_available.glance:
- # Check if glance v1 is available to determine which client to use.
- if CONF.image_feature_enabled.api_v1:
- cls.image_client = cls.os_primary.image_client
- elif CONF.image_feature_enabled.api_v2:
- cls.image_client = cls.os_primary.image_client_v2
- else:
- raise lib_exc.InvalidConfiguration(
- 'Either api_v1 or api_v2 must be True in '
- '[image-feature-enabled].')
- # Compute image client
- cls.compute_images_client = cls.os_primary.compute_images_client
- cls.keypairs_client = cls.os_primary.keypairs_client
- # Nova security groups client
- cls.compute_security_groups_client = (
- cls.os_primary.compute_security_groups_client)
- cls.compute_security_group_rules_client = (
- cls.os_primary.compute_security_group_rules_client)
- cls.servers_client = cls.os_primary.servers_client
- cls.interface_client = cls.os_primary.interfaces_client
- # Neutron network client
- cls.networks_client = cls.os_primary.networks_client
- cls.ports_client = cls.os_primary.ports_client
- cls.routers_client = cls.os_primary.routers_client
- cls.subnets_client = cls.os_primary.subnets_client
- cls.floating_ips_client = cls.os_primary.floating_ips_client
- cls.security_groups_client = cls.os_primary.security_groups_client
- cls.security_group_rules_client = (
- cls.os_primary.security_group_rules_client)
- # Use the latest available volume clients
- if CONF.service_available.cinder:
- cls.volumes_client = cls.os_primary.volumes_client_latest
- cls.snapshots_client = cls.os_primary.snapshots_client_latest
- cls.backups_client = cls.os_primary.backups_client_latest
+ cls.admin_volume_types_client = cls.os_admin.volume_types_client_latest
- # ## Test functions library
- #
- # The create_[resource] functions only return body and discard the
- # resp part which is not used in scenario tests
+ def _attached_volume_name(
+ self, disks_list_before_attach, ip_address, private_key):
+ ssh = self.get_remote_client(ip_address, private_key=private_key)
- def create_keypair(self, client=None):
- if not client:
- client = self.keypairs_client
- name = data_utils.rand_name(self.__class__.__name__)
- # We don't need to create a keypair by pubkey in scenario
- body = client.create_keypair(name=name)
- self.addCleanup(client.delete_keypair, name)
- return body['keypair']
+ def _wait_for_volume_available_on_system():
+ disks_list_after_attach = ssh.list_disks()
+ return len(disks_list_after_attach) > len(disks_list_before_attach)
- def create_server(self, name=None, image_id=None, flavor=None,
- validatable=False, wait_until='ACTIVE',
- clients=None, **kwargs):
- """Wrapper utility that returns a test server.
+ if not test_utils.call_until_true(_wait_for_volume_available_on_system,
+ CONF.compute.build_timeout,
+ CONF.compute.build_interval):
+ raise lib_exc.TimeoutException
- This wrapper utility calls the common create test server and
- returns a test server. The purpose of this wrapper is to minimize
- the impact on the code of the tests already using this
- function.
+ disks_list_after_attach = ssh.list_disks()
+ volume_name = [item for item in disks_list_after_attach
+ if item not in disks_list_before_attach][0]
+ return volume_name
- :param **kwargs:
- See extra parameters below
+ def _get_file_md5(self, ip_address, filename, dev_name=None,
+ mount_path='/mnt', private_key=None, server=None):
- :Keyword Arguments:
- * *vnic_type* (``string``) --
- used when launching instances with pre-configured ports.
- Examples:
- normal: a traditional virtual port that is either attached
- to a linux bridge or an openvswitch bridge on a
- compute node.
- direct: an SR-IOV port that is directly attached to a VM
- macvtap: an SR-IOV port that is attached to a VM via a macvtap
- device.
- Defaults to ``CONF.network.port_vnic_type``.
- * *port_profile* (``dict``) --
- This attribute is a dictionary that can be used (with admin
- credentials) to supply information influencing the binding of
- the port.
- example: port_profile = "capabilities:[switchdev]"
- Defaults to ``CONF.network.port_profile``.
- """
+ ssh_client = self.get_remote_client(ip_address,
+ private_key=private_key,
+ server=server)
+ if dev_name is not None:
+ ssh_client.exec_command('sudo mount /dev/%s %s' % (dev_name,
+ mount_path))
- # NOTE(jlanoux): As a first step, ssh checks in the scenario
- # tests need to be run regardless of the run_validation and
- # validatable parameters and thus until the ssh validation job
- # becomes voting in CI. The test resources management and IP
- # association are taken care of in the scenario tests.
- # Therefore, the validatable parameter is set to false in all
- # those tests. In this way create_server just return a standard
- # server and the scenario tests always perform ssh checks.
+ md5_sum = ssh_client.exec_command(
+ 'sudo md5sum %s/%s|cut -c 1-32' % (mount_path, filename))
+ if dev_name is not None:
+ ssh_client.exec_command('sudo umount %s' % mount_path)
+ return md5_sum
- # Needed for the cross_tenant_traffic test:
- if clients is None:
- clients = self.os_primary
+ def _count_files(self, ip_address, dev_name=None, mount_path='/mnt',
+ private_key=None, server=None):
+ ssh_client = self.get_remote_client(ip_address,
+ private_key=private_key,
+ server=server)
+ if dev_name is not None:
+ ssh_client.exec_command('sudo mount /dev/%s %s' % (dev_name,
+ mount_path))
+ count = ssh_client.exec_command('sudo ls -l %s | wc -l' % mount_path)
+ if dev_name is not None:
+ ssh_client.exec_command('sudo umount %s' % mount_path)
+ # We subtract 2 from the count since `ls -l` also prints a leading
+ # "total" line and, while creating the filesystem, a
+ # lost+found folder is also created
+ return int(count) - 2
- if name is None:
- name = data_utils.rand_name(self.__class__.__name__ + "-server")
+ def _make_fs(self, ip_address, private_key, server, dev_name, fs='ext4'):
+ ssh_client = self.get_remote_client(ip_address,
+ private_key=private_key,
+ server=server)
- vnic_type = kwargs.pop('vnic_type', CONF.network.port_vnic_type)
- profile = kwargs.pop('port_profile', CONF.network.port_profile)
+ ssh_client.make_fs(dev_name, fs=fs)
- # If vnic_type or profile are configured create port for
- # every network
- if vnic_type or profile:
- ports = []
- create_port_body = {}
+ def create_md5_new_file(self, ip_address, filename, dev_name=None,
+ mount_path='/mnt', private_key=None, server=None):
+ ssh_client = self.get_remote_client(ip_address,
+ private_key=private_key,
+ server=server)
- if vnic_type:
- create_port_body['binding:vnic_type'] = vnic_type
+ if dev_name is not None:
+ ssh_client.exec_command('sudo mount /dev/%s %s' % (dev_name,
+ mount_path))
+ ssh_client.exec_command(
+ 'sudo dd bs=1024 count=100 if=/dev/urandom of=/%s/%s' %
+ (mount_path, filename))
+ md5 = ssh_client.exec_command(
+ 'sudo md5sum -b %s/%s|cut -c 1-32' % (mount_path, filename))
+ ssh_client.exec_command('sudo sync')
+ if dev_name is not None:
+ ssh_client.exec_command('sudo umount %s' % mount_path)
+ return md5
- if profile:
- create_port_body['binding:profile'] = profile
+ def get_md5_from_file(self, instance, instance_ip, filename,
+ dev_name=None):
- if kwargs:
- # Convert security group names to security group ids
- # to pass to create_port
- if 'security_groups' in kwargs:
- security_groups = \
- clients.security_groups_client.list_security_groups(
- ).get('security_groups')
- sec_dict = dict([(s['name'], s['id'])
- for s in security_groups])
+ md5_sum = self._get_file_md5(instance_ip, filename=filename,
+ dev_name=dev_name,
+ private_key=self.keypair['private_key'],
+ server=instance)
+ count = self._count_files(instance_ip, dev_name=dev_name,
+ private_key=self.keypair['private_key'],
+ server=instance)
+ return count, md5_sum
- sec_groups_names = [s['name'] for s in kwargs.pop(
- 'security_groups')]
- security_groups_ids = [sec_dict[s]
- for s in sec_groups_names]
+ def _attach_and_get_volume_device_name(self, server, volume, instance_ip,
+ private_key):
+ ssh_client = self.get_remote_client(
+ instance_ip, private_key=private_key,
+ server=server)
+ # List disks before volume attachment
+ disks_list_before_attach = ssh_client.list_disks()
+ # Attach volume
+ attachment = self.attach_volume(server, volume)
+ # Find the difference between disks before and after attachment that
+ # gives us the volume device name
+ volume_device_name = self._attached_volume_name(
+ disks_list_before_attach, instance_ip, private_key)
+ return volume_device_name, attachment
- if security_groups_ids:
- create_port_body[
- 'security_groups'] = security_groups_ids
- networks = kwargs.pop('networks', [])
- else:
- networks = []
-
- # If there are no networks passed to us we look up
- # for the project's private networks and create a port.
- # The same behaviour as we would expect when passing
- # the call to the clients with no networks
- if not networks:
- networks = clients.networks_client.list_networks(
- **{'router:external': False, 'fields': 'id'})['networks']
-
- # It's net['uuid'] if networks come from kwargs
- # and net['id'] if they come from
- # clients.networks_client.list_networks
- for net in networks:
- net_id = net.get('uuid', net.get('id'))
- if 'port' not in net:
- port = self.create_port(network_id=net_id,
- client=clients.ports_client,
- **create_port_body)
- ports.append({'port': port['id']})
- else:
- ports.append({'port': net['port']})
- if ports:
- kwargs['networks'] = ports
- self.ports = ports
-
- tenant_network = self.get_tenant_network()
-
- if CONF.compute.compute_volume_common_az:
- kwargs.setdefault('availability_zone',
- CONF.compute.compute_volume_common_az)
-
- body, _ = compute.create_test_server(
- clients,
- tenant_network=tenant_network,
- wait_until=wait_until,
- name=name, flavor=flavor,
- image_id=image_id, **kwargs)
-
- self.addCleanup(waiters.wait_for_server_termination,
- clients.servers_client, body['id'])
- self.addCleanup(test_utils.call_and_ignore_notfound_exc,
- clients.servers_client.delete_server, body['id'])
- server = clients.servers_client.show_server(body['id'])['server']
- return server
-
- def create_volume(self, size=None, name=None, snapshot_id=None,
- imageRef=None, volume_type=None):
- if size is None:
- size = CONF.volume.volume_size
- if imageRef:
- if CONF.image_feature_enabled.api_v1:
- resp = self.image_client.check_image(imageRef)
- image = common_image.get_image_meta_from_headers(resp)
- else:
- image = self.image_client.show_image(imageRef)
- min_disk = image.get('min_disk')
- size = max(size, min_disk)
- if name is None:
- name = data_utils.rand_name(self.__class__.__name__ + "-volume")
- kwargs = {'display_name': name,
- 'snapshot_id': snapshot_id,
- 'imageRef': imageRef,
- 'volume_type': volume_type,
- 'size': size}
-
- if CONF.compute.compute_volume_common_az:
- kwargs.setdefault('availability_zone',
- CONF.compute.compute_volume_common_az)
-
- volume = self.volumes_client.create_volume(**kwargs)['volume']
-
- self.addCleanup(self.volumes_client.wait_for_resource_deletion,
- volume['id'])
- self.addCleanup(test_utils.call_and_ignore_notfound_exc,
- self.volumes_client.delete_volume, volume['id'])
- self.assertEqual(name, volume['name'])
- waiters.wait_for_volume_resource_status(self.volumes_client,
- volume['id'], 'available')
- # The volume retrieved on creation has a non-up-to-date status.
- # Retrieval after it becomes active ensures correct details.
- volume = self.volumes_client.show_volume(volume['id'])['volume']
- return volume
-
- def create_backup(self, volume_id, name=None, description=None,
- force=False, snapshot_id=None, incremental=False,
- container=None):
-
- name = name or data_utils.rand_name(
- self.__class__.__name__ + "-backup")
- kwargs = {'name': name,
- 'description': description,
- 'force': force,
- 'snapshot_id': snapshot_id,
- 'incremental': incremental,
- 'container': container}
- backup = self.backups_client.create_backup(volume_id=volume_id,
- **kwargs)['backup']
- self.addCleanup(self.backups_client.delete_backup, backup['id'])
- waiters.wait_for_volume_resource_status(self.backups_client,
- backup['id'], 'available')
- return backup
-
- def restore_backup(self, backup_id):
- restore = self.backups_client.restore_backup(backup_id)['restore']
- self.addCleanup(self.volumes_client.delete_volume,
- restore['volume_id'])
- waiters.wait_for_volume_resource_status(self.backups_client,
- backup_id, 'available')
- waiters.wait_for_volume_resource_status(self.volumes_client,
- restore['volume_id'],
- 'available')
- self.assertEqual(backup_id, restore['backup_id'])
- return restore
-
- def create_volume_snapshot(self, volume_id, name=None, description=None,
- metadata=None, force=False):
- name = name or data_utils.rand_name(
- self.__class__.__name__ + '-snapshot')
- snapshot = self.snapshots_client.create_snapshot(
- volume_id=volume_id,
- force=force,
- display_name=name,
- description=description,
- metadata=metadata)['snapshot']
- self.addCleanup(self.snapshots_client.wait_for_resource_deletion,
- snapshot['id'])
- self.addCleanup(self.snapshots_client.delete_snapshot, snapshot['id'])
- waiters.wait_for_volume_resource_status(self.snapshots_client,
- snapshot['id'], 'available')
- snapshot = self.snapshots_client.show_snapshot(
- snapshot['id'])['snapshot']
- return snapshot
-
- def _cleanup_volume_type(self, volume_type):
- """Clean up a given volume type.
-
- Ensuring all volumes associated to a type are first removed before
- attempting to remove the type itself. This includes any image volume
- cache volumes stored in a separate tenant to the original volumes
- created from the type.
- """
- admin_volume_type_client = self.os_admin.volume_types_client_latest
- admin_volumes_client = self.os_admin.volumes_client_latest
- volumes = admin_volumes_client.list_volumes(
- detail=True, params={'all_tenants': 1})['volumes']
- type_name = volume_type['name']
- for volume in [v for v in volumes if v['volume_type'] == type_name]:
- test_utils.call_and_ignore_notfound_exc(
- admin_volumes_client.delete_volume, volume['id'])
- admin_volumes_client.wait_for_resource_deletion(volume['id'])
- admin_volume_type_client.delete_volume_type(volume_type['id'])
-
- def create_volume_type(self, client=None, name=None, backend_name=None):
+ def create_volume_type(self, client=None, name=None, extra_specs=None):
if not client:
client = self.os_admin.volume_types_client_latest
if not name:
@@ -388,719 +148,67 @@
name = data_utils.rand_name(class_name + '-volume-type')
randomized_name = data_utils.rand_name('scenario-type-' + name)
- LOG.debug("Creating a volume type: %s on backend %s",
- randomized_name, backend_name)
- extra_specs = {}
- if backend_name:
- extra_specs = {"volume_backend_name": backend_name}
-
- volume_type = client.create_volume_type(
+ LOG.debug("Creating a volume type: %s with extra_specs %s",
+ randomized_name, extra_specs)
+ if extra_specs is None:
+ extra_specs = {}
+ volume_type = self.admin_volume_types_client.create_volume_type(
name=randomized_name, extra_specs=extra_specs)['volume_type']
- self.addCleanup(self._cleanup_volume_type, volume_type)
+ self.addCleanup(self.cleanup_volume_type, volume_type)
return volume_type
- def _create_loginable_secgroup_rule(self, secgroup_id=None):
- _client = self.compute_security_groups_client
- _client_rules = self.compute_security_group_rules_client
- if secgroup_id is None:
- sgs = _client.list_security_groups()['security_groups']
- for sg in sgs:
- if sg['name'] == 'default':
- secgroup_id = sg['id']
+ def attach_volume(self, server, volume, device=None, tag=None):
+ """Attaches volume to server and waits for 'in-use' volume status.
- # These rules are intended to permit inbound ssh and icmp
- # traffic from all sources, so no group_id is provided.
- # Setting a group_id would only permit traffic from ports
- # belonging to the same security group.
- rulesets = [
- {
- # ssh
- 'ip_protocol': 'tcp',
- 'from_port': 22,
- 'to_port': 22,
- 'cidr': '0.0.0.0/0',
- },
- {
- # ping
- 'ip_protocol': 'icmp',
- 'from_port': -1,
- 'to_port': -1,
- 'cidr': '0.0.0.0/0',
- }
- ]
- rules = list()
- for ruleset in rulesets:
- sg_rule = _client_rules.create_security_group_rule(
- parent_group_id=secgroup_id, **ruleset)['security_group_rule']
- rules.append(sg_rule)
- return rules
+ The volume will be detached when the test tears down.
- def _create_security_group(self):
- # Create security group
- sg_name = data_utils.rand_name(self.__class__.__name__)
- sg_desc = sg_name + " description"
- secgroup = self.compute_security_groups_client.create_security_group(
- name=sg_name, description=sg_desc)['security_group']
- self.assertEqual(secgroup['name'], sg_name)
- self.assertEqual(secgroup['description'], sg_desc)
- self.addCleanup(
- test_utils.call_and_ignore_notfound_exc,
- self.compute_security_groups_client.delete_security_group,
- secgroup['id'])
-
- # Add rules to the security group
- self._create_loginable_secgroup_rule(secgroup['id'])
-
- return secgroup
-
- def get_remote_client(self, ip_address, username=None, private_key=None,
- server=None):
- """Get a SSH client to a remote server
-
- :param ip_address: the server floating or fixed IP address to use
- for ssh validation
- :param username: name of the Linux account on the remote server
- :param private_key: the SSH private key to use
- :param server: server dict, used for debugging purposes
- :return: a RemoteClient object
+ :param server: The server to which the volume will be attached.
+ :param volume: The volume to attach.
+ :param device: Optional mountpoint for the attached volume. Note that
+ this is not guaranteed for all hypervisors and is not recommended.
+ :param tag: Optional device role tag to apply to the volume.
"""
+ attach_kwargs = dict(volumeId=volume['id'])
+ if device:
+ attach_kwargs['device'] = device
+ if tag:
+ attach_kwargs['tag'] = tag
- if username is None:
- username = CONF.validation.image_ssh_user
- # Set this with 'keypair' or others to log in with keypair or
- # username/password.
- if CONF.validation.auth_method == 'keypair':
- password = None
- if private_key is None:
- private_key = self.keypair['private_key']
+ attachment = self.servers_client.attach_volume(
+ server['id'], **attach_kwargs)['volumeAttachment']
+ # On teardown detach the volume and for multiattach volumes wait for
+ # the attachment to be removed. For non-multiattach volumes wait for
+ # the state of the volume to change to available. This is so we don't
+ # error out when trying to delete the volume during teardown.
+ if volume['multiattach']:
+ att = waiters.wait_for_volume_attachment_create(
+ self.volumes_client, volume['id'], server['id'])
+ self.addCleanup(waiters.wait_for_volume_attachment_remove,
+ self.volumes_client, volume['id'],
+ att['attachment_id'])
else:
- password = CONF.validation.image_ssh_password
- private_key = None
- linux_client = remote_client.RemoteClient(
- ip_address, username, pkey=private_key, password=password,
- server=server, servers_client=self.servers_client)
- linux_client.validate_authentication()
- return linux_client
+ self.addCleanup(waiters.wait_for_volume_resource_status,
+ self.volumes_client, volume['id'], 'available')
+ waiters.wait_for_volume_resource_status(self.volumes_client,
+ volume['id'], 'in-use')
+ # Ignore 404s on detach in case the server is deleted or the volume
+ # is already detached.
+ self.addCleanup(self._detach_volume, server, volume)
+ return attachment
- def _log_net_info(self, exc):
- # network debug is called as part of ssh init
- if not isinstance(exc, lib_exc.SSHTimeout):
- LOG.debug('Network information on a devstack host')
+ def _detach_volume(self, server, volume):
+ """Helper method to detach a volume.
- def create_server_snapshot(self, server, name=None):
- # Glance client
- _image_client = self.image_client
- # Compute client
- _images_client = self.compute_images_client
- if name is None:
- name = data_utils.rand_name(self.__class__.__name__ + 'snapshot')
- LOG.debug("Creating a snapshot image for server: %s", server['name'])
- image = _images_client.create_image(server['id'], name=name)
- image_id = image.response['location'].split('images/')[1]
- waiters.wait_for_image_status(_image_client, image_id, 'active')
-
- self.addCleanup(_image_client.wait_for_resource_deletion,
- image_id)
- self.addCleanup(test_utils.call_and_ignore_notfound_exc,
- _image_client.delete_image, image_id)
-
- if CONF.image_feature_enabled.api_v1:
- # In glance v1 the additional properties are stored in the headers.
- resp = _image_client.check_image(image_id)
- snapshot_image = common_image.get_image_meta_from_headers(resp)
- image_props = snapshot_image.get('properties', {})
- else:
- # In glance v2 the additional properties are flattened.
- snapshot_image = _image_client.show_image(image_id)
- image_props = snapshot_image
-
- bdm = image_props.get('block_device_mapping')
- if bdm:
- bdm = json.loads(bdm)
- if bdm and 'snapshot_id' in bdm[0]:
- snapshot_id = bdm[0]['snapshot_id']
- self.addCleanup(
- self.snapshots_client.wait_for_resource_deletion,
- snapshot_id)
- self.addCleanup(test_utils.call_and_ignore_notfound_exc,
- self.snapshots_client.delete_snapshot,
- snapshot_id)
- waiters.wait_for_volume_resource_status(self.snapshots_client,
- snapshot_id,
- 'available')
- image_name = snapshot_image['name']
- self.assertEqual(name, image_name)
- LOG.debug("Created snapshot image %s for server %s",
- image_name, server['name'])
- return snapshot_image
-
- def nova_volume_attach(self, server, volume_to_attach):
- volume = self.servers_client.attach_volume(
- server['id'], volumeId=volume_to_attach['id'], device='/dev/%s'
- % CONF.compute.volume_device_name)['volumeAttachment']
- self.assertEqual(volume_to_attach['id'], volume['id'])
- waiters.wait_for_volume_resource_status(self.volumes_client,
- volume['id'], 'in-use')
-
- # Return the updated volume after the attachment
- return self.volumes_client.show_volume(volume['id'])['volume']
-
- def nova_volume_detach(self, server, volume):
- self.servers_client.detach_volume(server['id'], volume['id'])
- waiters.wait_for_volume_resource_status(self.volumes_client,
- volume['id'], 'available')
-
- def check_vm_connectivity(self, ip_address,
- username=None,
- private_key=None,
- should_connect=True,
- extra_msg="",
- server=None,
- mtu=None):
- """Check server connectivity
-
- :param ip_address: server to test against
- :param username: server's ssh username
- :param private_key: server's ssh private key to be used
- :param should_connect: True/False indicates positive/negative test
- positive - attempt ping and ssh
- negative - attempt ping and fail if succeed
- :param extra_msg: Message to help with debugging if ``ping_ip_address``
- fails
- :param server: The server whose console to log for debugging
- :param mtu: network MTU to use for connectivity validation
-
- :raises: AssertError if the result of the connectivity check does
- not match the value of the should_connect param
+        Ignores 404 responses if the volume or server does not exist, or the
+ volume is already detached from the server.
"""
- LOG.debug('checking network connections to IP %s with user: %s',
- ip_address, username)
- if should_connect:
- msg = "Timed out waiting for %s to become reachable" % ip_address
- else:
- msg = "ip address %s is reachable" % ip_address
- if extra_msg:
- msg = "%s\n%s" % (extra_msg, msg)
- self.assertTrue(self.ping_ip_address(ip_address,
- should_succeed=should_connect,
- mtu=mtu, server=server),
- msg=msg)
- if should_connect:
- # no need to check ssh for negative connectivity
- try:
- self.get_remote_client(ip_address, username, private_key,
- server=server)
- except Exception:
- if not extra_msg:
- extra_msg = 'Failed to ssh to %s' % ip_address
- LOG.exception(extra_msg)
- raise
-
- def create_floating_ip(self, thing, pool_name=None):
- """Create a floating IP and associates to a server on Nova"""
-
- if not pool_name:
- pool_name = CONF.network.floating_network_name
- floating_ip = (self.compute_floating_ips_client.
- create_floating_ip(pool=pool_name)['floating_ip'])
- self.addCleanup(test_utils.call_and_ignore_notfound_exc,
- self.compute_floating_ips_client.delete_floating_ip,
- floating_ip['id'])
- self.compute_floating_ips_client.associate_floating_ip_to_server(
- floating_ip['ip'], thing['id'])
- return floating_ip
-
- def create_timestamp(self, ip_address, dev_name=None, mount_path='/mnt',
- private_key=None, server=None):
- ssh_client = self.get_remote_client(ip_address,
- private_key=private_key,
- server=server)
- if dev_name is not None:
- ssh_client.make_fs(dev_name)
- ssh_client.exec_command('sudo mount /dev/%s %s' % (dev_name,
- mount_path))
- cmd_timestamp = 'sudo sh -c "date > %s/timestamp; sync"' % mount_path
- ssh_client.exec_command(cmd_timestamp)
- timestamp = ssh_client.exec_command('sudo cat %s/timestamp'
- % mount_path)
- if dev_name is not None:
- ssh_client.exec_command('sudo umount %s' % mount_path)
- return timestamp
-
- def get_timestamp(self, ip_address, dev_name=None, mount_path='/mnt',
- private_key=None, server=None):
- ssh_client = self.get_remote_client(ip_address,
- private_key=private_key,
- server=server)
- if dev_name is not None:
- ssh_client.mount(dev_name, mount_path)
- timestamp = ssh_client.exec_command('sudo cat %s/timestamp'
- % mount_path)
- if dev_name is not None:
- ssh_client.exec_command('sudo umount %s' % mount_path)
- return timestamp
-
- def get_server_ip(self, server):
- """Get the server fixed or floating IP.
-
- Based on the configuration we're in, return a correct ip
- address for validating that a guest is up.
- """
- if CONF.validation.connect_method == 'floating':
- # The tests calling this method don't have a floating IP
- # and can't make use of the validation resources. So the
- # method is creating the floating IP there.
- return self.create_floating_ip(server)['ip']
- elif CONF.validation.connect_method == 'fixed':
- # Determine the network name to look for based on config or creds
- # provider network resources.
- if CONF.validation.network_for_ssh:
- addresses = server['addresses'][
- CONF.validation.network_for_ssh]
- else:
- network = self.get_tenant_network()
- addresses = (server['addresses'][network['name']]
- if network else [])
- for address in addresses:
- if (address['version'] == CONF.validation.ip_version_for_ssh and # noqa
- address['OS-EXT-IPS:type'] == 'fixed'):
- return address['addr']
- raise exceptions.ServerUnreachable(server_id=server['id'])
- else:
- raise lib_exc.InvalidConfiguration()
-
- @classmethod
- def get_host_for_server(cls, server_id):
- server_details = cls.os_admin.servers_client.show_server(server_id)
- return server_details['server']['OS-EXT-SRV-ATTR:host']
-
- def _get_bdm(self, source_id, source_type, delete_on_termination=False):
- bd_map_v2 = [{
- 'uuid': source_id,
- 'source_type': source_type,
- 'destination_type': 'volume',
- 'boot_index': 0,
- 'delete_on_termination': delete_on_termination}]
- return {'block_device_mapping_v2': bd_map_v2}
-
- def boot_instance_from_resource(self, source_id,
- source_type,
- keypair=None,
- security_group=None,
- delete_on_termination=False,
- name=None):
- create_kwargs = dict()
- if keypair:
- create_kwargs['key_name'] = keypair['name']
- if security_group:
- create_kwargs['security_groups'] = [
- {'name': security_group['name']}]
- create_kwargs.update(self._get_bdm(
- source_id,
- source_type,
- delete_on_termination=delete_on_termination))
- if name:
- create_kwargs['name'] = name
-
- return self.create_server(image_id='', **create_kwargs)
-
- def create_volume_from_image(self):
- img_uuid = CONF.compute.image_ref
- vol_name = data_utils.rand_name(
- self.__class__.__name__ + '-volume-origin')
- return self.create_volume(name=vol_name, imageRef=img_uuid)
-
-
-class NetworkScenarioTest(ScenarioTest):
- """Base class for network scenario tests.
-
- This class provide helpers for network scenario tests, using the neutron
- API. Helpers from ancestor which use the nova network API are overridden
- with the neutron API.
-
- This Class also enforces using Neutron instead of novanetwork.
- Subclassed tests will be skipped if Neutron is not enabled
-
- """
-
- credentials = ['primary', 'admin']
-
- @classmethod
- def skip_checks(cls):
- super(NetworkScenarioTest, cls).skip_checks()
- if not CONF.service_available.neutron:
- raise cls.skipException('Neutron not available')
-
- def _create_network(self, networks_client=None,
- tenant_id=None,
- namestart='network-smoke-',
- port_security_enabled=True, **net_dict):
- if not networks_client:
- networks_client = self.networks_client
- if not tenant_id:
- tenant_id = networks_client.tenant_id
- name = data_utils.rand_name(namestart)
- network_kwargs = dict(name=name, tenant_id=tenant_id)
- if net_dict:
- network_kwargs.update(net_dict)
- # Neutron disables port security by default so we have to check the
- # config before trying to create the network with port_security_enabled
- if CONF.network_feature_enabled.port_security:
- network_kwargs['port_security_enabled'] = port_security_enabled
- result = networks_client.create_network(**network_kwargs)
- network = result['network']
-
- self.assertEqual(network['name'], name)
- self.addCleanup(test_utils.call_and_ignore_notfound_exc,
- networks_client.delete_network,
- network['id'])
- return network
-
- def create_subnet(self, network, subnets_client=None,
- namestart='subnet-smoke', **kwargs):
- """Create a subnet for the given network
-
- within the cidr block configured for tenant networks.
- """
- if not subnets_client:
- subnets_client = self.subnets_client
-
- def cidr_in_use(cidr, tenant_id):
- """Check cidr existence
-
- :returns: True if subnet with cidr already exist in tenant
- False else
- """
- cidr_in_use = self.os_admin.subnets_client.list_subnets(
- tenant_id=tenant_id, cidr=cidr)['subnets']
- return len(cidr_in_use) != 0
-
- ip_version = kwargs.pop('ip_version', 4)
-
- if ip_version == 6:
- tenant_cidr = netaddr.IPNetwork(
- CONF.network.project_network_v6_cidr)
- num_bits = CONF.network.project_network_v6_mask_bits
- else:
- tenant_cidr = netaddr.IPNetwork(CONF.network.project_network_cidr)
- num_bits = CONF.network.project_network_mask_bits
-
- result = None
- str_cidr = None
- # Repeatedly attempt subnet creation with sequential cidr
- # blocks until an unallocated block is found.
- for subnet_cidr in tenant_cidr.subnet(num_bits):
- str_cidr = str(subnet_cidr)
- if cidr_in_use(str_cidr, tenant_id=network['tenant_id']):
- continue
-
- subnet = dict(
- name=data_utils.rand_name(namestart),
- network_id=network['id'],
- tenant_id=network['tenant_id'],
- cidr=str_cidr,
- ip_version=ip_version,
- **kwargs
- )
- try:
- result = subnets_client.create_subnet(**subnet)
- break
- except lib_exc.Conflict as e:
- is_overlapping_cidr = 'overlaps with another subnet' in str(e)
- if not is_overlapping_cidr:
- raise
- self.assertIsNotNone(result, 'Unable to allocate tenant network')
-
- subnet = result['subnet']
- self.assertEqual(subnet['cidr'], str_cidr)
-
- self.addCleanup(test_utils.call_and_ignore_notfound_exc,
- subnets_client.delete_subnet, subnet['id'])
-
- return subnet
-
- def _get_server_port_id_and_ip4(self, server, ip_addr=None):
- if ip_addr:
- ports = self.os_admin.ports_client.list_ports(
- device_id=server['id'],
- fixed_ips='ip_address=%s' % ip_addr)['ports']
- else:
- ports = self.os_admin.ports_client.list_ports(
- device_id=server['id'])['ports']
- # A port can have more than one IP address in some cases.
- # If the network is dual-stack (IPv4 + IPv6), this port is associated
- # with 2 subnets
- p_status = ['ACTIVE']
- # NOTE(vsaienko) With Ironic, instances live on separate hardware
- # servers. Neutron does not bind ports for Ironic instances, as a
- # result the port remains in the DOWN state.
- # TODO(vsaienko) remove once bug: #1599836 is resolved.
- if getattr(CONF.service_available, 'ironic', False):
- p_status.append('DOWN')
- port_map = [(p["id"], fxip["ip_address"])
- for p in ports
- for fxip in p["fixed_ips"]
- if (netutils.is_valid_ipv4(fxip["ip_address"]) and
- p['status'] in p_status)]
- inactive = [p for p in ports if p['status'] != 'ACTIVE']
- if inactive:
- LOG.warning("Instance has ports that are not ACTIVE: %s", inactive)
-
- self.assertNotEmpty(port_map,
- "No IPv4 addresses found in: %s" % ports)
- self.assertEqual(len(port_map), 1,
- "Found multiple IPv4 addresses: %s. "
- "Unable to determine which port to target."
- % port_map)
- return port_map[0]
-
- def _get_network_by_name(self, network_name):
- net = self.os_admin.networks_client.list_networks(
- name=network_name)['networks']
- self.assertNotEmpty(net,
- "Unable to get network by name: %s" % network_name)
- return net[0]
-
- def create_floating_ip(self, thing, external_network_id=None,
- port_id=None, client=None):
- """Create a floating IP and associates to a resource/port on Neutron"""
- if not external_network_id:
- external_network_id = CONF.network.public_network_id
- if not client:
- client = self.floating_ips_client
- if not port_id:
- port_id, ip4 = self._get_server_port_id_and_ip4(thing)
- else:
- ip4 = None
- result = client.create_floatingip(
- floating_network_id=external_network_id,
- port_id=port_id,
- tenant_id=thing['tenant_id'],
- fixed_ip_address=ip4
- )
- floating_ip = result['floatingip']
- self.addCleanup(test_utils.call_and_ignore_notfound_exc,
- client.delete_floatingip,
- floating_ip['id'])
- return floating_ip
-
- def check_floating_ip_status(self, floating_ip, status):
- """Verifies floatingip reaches the given status
-
- :param dict floating_ip: floating IP dict to check status
- :param status: target status
- :raises: AssertionError if status doesn't match
- """
- floatingip_id = floating_ip['id']
-
- def refresh():
- result = (self.floating_ips_client.
- show_floatingip(floatingip_id)['floatingip'])
- return status == result['status']
-
- if not test_utils.call_until_true(refresh,
- CONF.network.build_timeout,
- CONF.network.build_interval):
- floating_ip = self.floating_ips_client.show_floatingip(
- floatingip_id)['floatingip']
- self.assertEqual(status, floating_ip['status'],
- message="FloatingIP: {fp} is at status: {cst}. "
- "failed to reach status: {st}"
- .format(fp=floating_ip, cst=floating_ip['status'],
- st=status))
- LOG.info("FloatingIP: {fp} is at status: {st}"
- .format(fp=floating_ip, st=status))
-
- def _create_security_group(self, security_group_rules_client=None,
- tenant_id=None,
- namestart='secgroup-smoke',
- security_groups_client=None):
- if security_group_rules_client is None:
- security_group_rules_client = self.security_group_rules_client
- if security_groups_client is None:
- security_groups_client = self.security_groups_client
- if tenant_id is None:
- tenant_id = security_groups_client.tenant_id
- secgroup = self._create_empty_security_group(
- namestart=namestart, client=security_groups_client,
- tenant_id=tenant_id)
-
- # Add rules to the security group
- rules = self._create_loginable_secgroup_rule(
- security_group_rules_client=security_group_rules_client,
- secgroup=secgroup,
- security_groups_client=security_groups_client)
- for rule in rules:
- self.assertEqual(tenant_id, rule['tenant_id'])
- self.assertEqual(secgroup['id'], rule['security_group_id'])
- return secgroup
-
- def _create_empty_security_group(self, client=None, tenant_id=None,
- namestart='secgroup-smoke'):
- """Create a security group without rules.
-
- Default rules will be created:
- - IPv4 egress to any
- - IPv6 egress to any
-
- :param tenant_id: secgroup will be created in this tenant
- :returns: the created security group
- """
- if client is None:
- client = self.security_groups_client
- if not tenant_id:
- tenant_id = client.tenant_id
- sg_name = data_utils.rand_name(namestart)
- sg_desc = sg_name + " description"
- sg_dict = dict(name=sg_name,
- description=sg_desc)
- sg_dict['tenant_id'] = tenant_id
- result = client.create_security_group(**sg_dict)
-
- secgroup = result['security_group']
- self.assertEqual(secgroup['name'], sg_name)
- self.assertEqual(tenant_id, secgroup['tenant_id'])
- self.assertEqual(secgroup['description'], sg_desc)
-
- self.addCleanup(test_utils.call_and_ignore_notfound_exc,
- client.delete_security_group, secgroup['id'])
- return secgroup
-
- def _create_security_group_rule(self, secgroup=None,
- sec_group_rules_client=None,
- tenant_id=None,
- security_groups_client=None, **kwargs):
- """Create a rule from a dictionary of rule parameters.
-
- Create a rule in a secgroup. if secgroup not defined will search for
- default secgroup in tenant_id.
-
- :param secgroup: the security group.
- :param tenant_id: if secgroup not passed -- the tenant in which to
- search for default secgroup
- :param kwargs: a dictionary containing rule parameters:
- for example, to allow incoming ssh:
- rule = {
- direction: 'ingress'
- protocol:'tcp',
- port_range_min: 22,
- port_range_max: 22
- }
- """
- if sec_group_rules_client is None:
- sec_group_rules_client = self.security_group_rules_client
- if security_groups_client is None:
- security_groups_client = self.security_groups_client
- if not tenant_id:
- tenant_id = security_groups_client.tenant_id
- if secgroup is None:
- # Get default secgroup for tenant_id
- default_secgroups = security_groups_client.list_security_groups(
- name='default', tenant_id=tenant_id)['security_groups']
- msg = "No default security group for tenant %s." % (tenant_id)
- self.assertNotEmpty(default_secgroups, msg)
- secgroup = default_secgroups[0]
-
- ruleset = dict(security_group_id=secgroup['id'],
- tenant_id=secgroup['tenant_id'])
- ruleset.update(kwargs)
-
- sg_rule = sec_group_rules_client.create_security_group_rule(**ruleset)
- sg_rule = sg_rule['security_group_rule']
-
- self.assertEqual(secgroup['tenant_id'], sg_rule['tenant_id'])
- self.assertEqual(secgroup['id'], sg_rule['security_group_id'])
-
- return sg_rule
-
- def _create_loginable_secgroup_rule(self, security_group_rules_client=None,
- secgroup=None,
- security_groups_client=None):
- """Create loginable security group rule
-
- This function will create:
- 1. egress and ingress tcp port 22 allow rule in order to allow ssh
- access for ipv4.
- 2. egress and ingress ipv6 icmp allow rule, in order to allow icmpv6.
- 3. egress and ingress ipv4 icmp allow rule, in order to allow icmpv4.
- """
-
- if security_group_rules_client is None:
- security_group_rules_client = self.security_group_rules_client
- if security_groups_client is None:
- security_groups_client = self.security_groups_client
- rules = []
- rulesets = [
- dict(
- # ssh
- protocol='tcp',
- port_range_min=22,
- port_range_max=22,
- ),
- dict(
- # ping
- protocol='icmp',
- ),
- dict(
- # ipv6-icmp for ping6
- protocol='icmp',
- ethertype='IPv6',
- )
- ]
- sec_group_rules_client = security_group_rules_client
- for ruleset in rulesets:
- for r_direction in ['ingress', 'egress']:
- ruleset['direction'] = r_direction
- try:
- sg_rule = self._create_security_group_rule(
- sec_group_rules_client=sec_group_rules_client,
- secgroup=secgroup,
- security_groups_client=security_groups_client,
- **ruleset)
- except lib_exc.Conflict as ex:
- # if rule already exist - skip rule and continue
- msg = 'Security group rule already exists'
- if msg not in ex._error_string:
- raise ex
- else:
- self.assertEqual(r_direction, sg_rule['direction'])
- rules.append(sg_rule)
-
- return rules
-
-
-class EncryptionScenarioTest(ScenarioTest):
- """Base class for encryption scenario tests"""
-
- credentials = ['primary', 'admin']
-
- @classmethod
- def setup_clients(cls):
- super(EncryptionScenarioTest, cls).setup_clients()
- cls.admin_volume_types_client = cls.os_admin.volume_types_client_latest
- cls.admin_encryption_types_client =\
- cls.os_admin.encryption_types_client_latest
-
- def create_encryption_type(self, client=None, type_id=None, provider=None,
- key_size=None, cipher=None,
- control_location=None):
- if not client:
- client = self.admin_encryption_types_client
- if not type_id:
- volume_type = self.create_volume_type()
- type_id = volume_type['id']
- LOG.debug("Creating an encryption type for volume type: %s", type_id)
- client.create_encryption_type(
- type_id, provider=provider, key_size=key_size, cipher=cipher,
- control_location=control_location)
-
- def create_encrypted_volume(self, encryption_provider, volume_type,
- key_size=256, cipher='aes-xts-plain64',
- control_location='front-end'):
- volume_type = self.create_volume_type(name=volume_type)
- self.create_encryption_type(type_id=volume_type['id'],
- provider=encryption_provider,
- key_size=key_size,
- cipher=cipher,
- control_location=control_location)
- return self.create_volume(volume_type=volume_type['name'])
+ try:
+ volume = self.volumes_client.show_volume(volume['id'])['volume']
+ # Check the status. You can only detach an in-use volume, otherwise
+ # the compute API will return a 400 response.
+ if volume['status'] == 'in-use':
+ self.servers_client.detach_volume(server['id'], volume['id'])
+ except lib_exc.NotFound:
+ # Ignore 404s on detach in case the server is deleted or the volume
+ # is already detached.
+ pass
diff --git a/cinder_tempest_plugin/scenario/test_snapshots.py b/cinder_tempest_plugin/scenario/test_snapshots.py
index 3153281..5a9611f 100644
--- a/cinder_tempest_plugin/scenario/test_snapshots.py
+++ b/cinder_tempest_plugin/scenario/test_snapshots.py
@@ -24,63 +24,7 @@
def setUp(self):
super(SnapshotDataIntegrityTests, self).setUp()
self.keypair = self.create_keypair()
- self.security_group = self._create_security_group()
-
- def _get_file_md5(self, ip_address, filename, mount_path='/mnt',
- private_key=None, server=None):
- ssh_client = self.get_remote_client(ip_address,
- private_key=private_key,
- server=server)
-
- md5_sum = ssh_client.exec_command(
- 'sudo md5sum %s/%s|cut -c 1-32' % (mount_path, filename))
- return md5_sum
-
- def _count_files(self, ip_address, mount_path='/mnt', private_key=None,
- server=None):
- ssh_client = self.get_remote_client(ip_address,
- private_key=private_key,
- server=server)
- count = ssh_client.exec_command('sudo ls -l %s | wc -l' % mount_path)
- return int(count) - 1
-
- def _launch_instance_from_snapshot(self, snap):
- volume_snap = self.create_volume(snapshot_id=snap['id'],
- size=snap['size'])
-
- server_snap = self.boot_instance_from_resource(
- source_id=volume_snap['id'],
- source_type='volume',
- keypair=self.keypair,
- security_group=self.security_group)
-
- return server_snap
-
- def create_md5_new_file(self, ip_address, filename, mount_path='/mnt',
- private_key=None, server=None):
- ssh_client = self.get_remote_client(ip_address,
- private_key=private_key,
- server=server)
-
- ssh_client.exec_command(
- 'sudo dd bs=1024 count=100 if=/dev/urandom of=/%s/%s' %
- (mount_path, filename))
- md5 = ssh_client.exec_command(
- 'sudo md5sum -b %s/%s|cut -c 1-32' % (mount_path, filename))
- ssh_client.exec_command('sudo sync')
- return md5
-
- def get_md5_from_file(self, instance, filename):
-
- instance_ip = self.get_server_ip(instance)
-
- md5_sum = self._get_file_md5(instance_ip, filename=filename,
- private_key=self.keypair['private_key'],
- server=instance)
- count = self._count_files(instance_ip,
- private_key=self.keypair['private_key'],
- server=instance)
- return count, md5_sum
+ self.security_group = self.create_security_group()
@decorators.idempotent_id('ff10644e-5a70-4a9f-9801-8204bb81fb61')
@utils.services('compute', 'volume', 'image', 'network')
@@ -89,32 +33,39 @@
snapshots. The procedure is as follows:
- 1) create a volume from image
- 2) Boot an instance from the volume
- 3) create file on vm and write data into it
+ 1) Create an instance with ephemeral disk
+ 2) Create a volume, attach it to the instance and create a filesystem
+ on it and mount it
+        3) Mount the volume, create a file, write data into it and unmount it
4) create snapshot
5) repeat 3 and 4 two more times (simply creating 3 snapshots)
- Now restore the snapshots one by one into volume, create instances
- from it and check the number of files and file content at each
+ Now create volume from the snapshots one by one, attach it to the
+ instance and check the number of files and file content at each
point when snapshot was created.
"""
- # Create a volume from image
- volume = self.create_volume_from_image()
+ # Create an instance
+ server = self.create_server(
+ key_name=self.keypair['name'],
+ security_groups=[{'name': self.security_group['name']}])
- # create an instance from bootable volume
- server = self.boot_instance_from_resource(
- source_id=volume['id'],
- source_type='volume',
- keypair=self.keypair,
- security_group=self.security_group)
+ # Create an empty volume
+ volume = self.create_volume()
instance_ip = self.get_server_ip(server)
+        # Attach volume to instance and find its device name (e.g. /dev/vdb)
+ volume_device_name, __ = self._attach_and_get_volume_device_name(
+ server, volume, instance_ip, self.keypair['private_key'])
+
+ # Create filesystem on the volume
+ self._make_fs(instance_ip, self.keypair['private_key'], server,
+ volume_device_name)
+
# Write data to volume
file1_md5 = self.create_md5_new_file(
- instance_ip, filename="file1",
+ instance_ip, dev_name=volume_device_name, filename="file1",
private_key=self.keypair['private_key'],
server=instance_ip)
@@ -123,7 +74,7 @@
# Write data to volume
file2_md5 = self.create_md5_new_file(
- instance_ip, filename="file2",
+ instance_ip, dev_name=volume_device_name, filename="file2",
private_key=self.keypair['private_key'],
server=instance_ip)
@@ -132,33 +83,51 @@
# Write data to volume
file3_md5 = self.create_md5_new_file(
- instance_ip, filename="file3",
+ instance_ip, dev_name=volume_device_name, filename="file3",
private_key=self.keypair['private_key'],
server=instance_ip)
# Create third snapshot
snapshot3 = self.create_volume_snapshot(volume['id'], force=True)
- # Create volume, instance and check file and contents for snap1
- instance_1 = self._launch_instance_from_snapshot(snapshot1)
- count_snap_1, md5_file_1 = self.get_md5_from_file(instance_1,
- 'file1')
+ # Detach the volume
+ self.nova_volume_detach(server, volume)
+
+ # Create volume from snapshot, attach it to instance and check file
+ # and contents for snap1
+ volume_snap_1 = self.create_volume(snapshot_id=snapshot1['id'])
+ volume_device_name, __ = self._attach_and_get_volume_device_name(
+ server, volume_snap_1, instance_ip, self.keypair['private_key'])
+ count_snap_1, md5_file_1 = self.get_md5_from_file(
+ server, instance_ip, 'file1', dev_name=volume_device_name)
+ # Detach the volume
+ self.nova_volume_detach(server, volume_snap_1)
self.assertEqual(count_snap_1, 1)
self.assertEqual(file1_md5, md5_file_1)
- # Create volume, instance and check file and contents for snap2
- instance_2 = self._launch_instance_from_snapshot(snapshot2)
- count_snap_2, md5_file_2 = self.get_md5_from_file(instance_2,
- 'file2')
+ # Create volume from snapshot, attach it to instance and check file
+ # and contents for snap2
+ volume_snap_2 = self.create_volume(snapshot_id=snapshot2['id'])
+ volume_device_name, __ = self._attach_and_get_volume_device_name(
+ server, volume_snap_2, instance_ip, self.keypair['private_key'])
+ count_snap_2, md5_file_2 = self.get_md5_from_file(
+ server, instance_ip, 'file2', dev_name=volume_device_name)
+ # Detach the volume
+ self.nova_volume_detach(server, volume_snap_2)
self.assertEqual(count_snap_2, 2)
self.assertEqual(file2_md5, md5_file_2)
- # Create volume, instance and check file and contents for snap3
- instance_3 = self._launch_instance_from_snapshot(snapshot3)
- count_snap_3, md5_file_3 = self.get_md5_from_file(instance_3,
- 'file3')
+ # Create volume from snapshot, attach it to instance and check file
+ # and contents for snap3
+ volume_snap_3 = self.create_volume(snapshot_id=snapshot3['id'])
+ volume_device_name, __ = self._attach_and_get_volume_device_name(
+ server, volume_snap_3, instance_ip, self.keypair['private_key'])
+ count_snap_3, md5_file_3 = self.get_md5_from_file(
+ server, instance_ip, 'file3', dev_name=volume_device_name)
+ # Detach the volume
+ self.nova_volume_detach(server, volume_snap_3)
self.assertEqual(count_snap_3, 3)
self.assertEqual(file3_md5, md5_file_3)
diff --git a/cinder_tempest_plugin/scenario/test_volume_encrypted.py b/cinder_tempest_plugin/scenario/test_volume_encrypted.py
index baf55e7..69edfa6 100644
--- a/cinder_tempest_plugin/scenario/test_volume_encrypted.py
+++ b/cinder_tempest_plugin/scenario/test_volume_encrypted.py
@@ -16,7 +16,7 @@
from tempest.lib.common.utils import data_utils
from tempest.lib import decorators
-from cinder_tempest_plugin.scenario import manager
+from tempest.scenario import manager
CONF = config.CONF
@@ -130,7 +130,7 @@
"""
keypair = self.create_keypair()
- security_group = self._create_security_group()
+ security_group = self.create_security_group()
volume = self.create_encrypted_volume_from_image('luks')
diff --git a/requirements.txt b/requirements.txt
index 6706885..40ef3a4 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -6,4 +6,4 @@
oslo.config>=5.1.0 # Apache-2.0
six>=1.10.0 # MIT
oslo.serialization!=2.19.1,>=2.18.0 # Apache-2.0
-tempest>=17.1.0 # Apache-2.0
+tempest>=27.0.0 # Apache-2.0
diff --git a/setup.cfg b/setup.cfg
index 6ad7813..fb188e1 100644
--- a/setup.cfg
+++ b/setup.cfg
@@ -17,6 +17,7 @@
Programming Language :: Python :: 3.6
Programming Language :: Python :: 3.7
Programming Language :: Python :: 3.8
+ Programming Language :: Python :: 3.9
[files]
packages =
diff --git a/tox.ini b/tox.ini
index e1eb31f..c9c91ad 100644
--- a/tox.ini
+++ b/tox.ini
@@ -1,5 +1,5 @@
[tox]
-minversion = 3.1.0
+minversion = 3.18.0
envlist = pep8
skipsdist = True
# this allows tox to infer the base python from the environment name