# blob: 37a16a0db545a9744c8dd765dbf54c9252fbdb7d [file] [log] [blame]
# Copyright 2024 Mirantis, Inc
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from tempest.api.compute import base
from tempest.common import waiters
from tempest import config
from tempest.lib import decorators
# Module-level alias for the tempest configuration object; the test classes
# below read their feature flags and compute settings through it.
CONF = config.CONF
class UserMigrationsTestBase(base.BaseV2ComputeTest):
    """Shared base class for the user-initiated migration test cases.

    Requests both the 'primary' and the 'admin' credential sets. The admin
    servers client is kept on the class so that the host a server runs on
    can be looked up before and after a migration.
    """

    credentials = ['primary', 'admin']

    @classmethod
    def skip_checks(cls):
        """Skip the class when fewer than two compute nodes are configured."""
        super().skip_checks()
        if CONF.compute.min_compute_nodes < 2:
            reason = "Less than 2 compute nodes, skipping multinode tests."
            raise cls.skipException(reason)

    @classmethod
    def setup_clients(cls):
        """Expose the admin servers client as ``cls.admin_servers_client``."""
        super().setup_clients()
        admin_manager = cls.os_admin
        cls.admin_servers_client = admin_manager.servers_client

    @classmethod
    def get_host_for_server(cls, server_id):
        """Return the 'OS-EXT-SRV-ATTR:host' value of the given server.

        The server is fetched through the admin servers client.

        :param server_id: ID of the server to look up.
        :returns: value of the server's 'OS-EXT-SRV-ATTR:host' attribute.
        """
        body = cls.admin_servers_client.show_server(server_id)
        server = body['server']
        return server['OS-EXT-SRV-ATTR:host']
class UserColdMigrationTestCase(UserMigrationsTestBase):
    """Cold migration requested through ``self.servers_client``.

    The host lookups go through the admin client provided by the base
    class; every other call uses ``self.servers_client``.
    """

    @classmethod
    def skip_checks(cls):
        """Skip unless cold migration is enabled, also for non-admins."""
        super().skip_checks()
        features = CONF.compute_feature_enabled
        if not features.cold_migration:
            raise cls.skipException("Cold migration is not available")
        if not features.cold_migration_nonadmins:
            raise cls.skipException(
                "Cold migration is not available for non-admin users"
            )

    @decorators.idempotent_id('f58acd67-2d01-4613-8baf-6e22a19b4e0b')
    def test_user_cold_migrate_server(self):
        """Cold migrate a server, confirm the resize, expect a new host."""
        server_id = self.create_test_server(wait_until="ACTIVE")['id']
        host_before = self.get_host_for_server(server_id)
        client = self.servers_client

        # Request the migration and wait for it to reach VERIFY_RESIZE.
        client.migrate_server(server_id)
        waiters.wait_for_server_status(client, server_id, 'VERIFY_RESIZE')

        # Confirm the migration and wait for the server to be ACTIVE again.
        client.confirm_resize_server(server_id)
        waiters.wait_for_server_status(client, server_id, 'ACTIVE')

        host_after = self.get_host_for_server(server_id)
        self.assertNotEqual(host_before, host_after)
class UserLiveMigrationTestCase(UserMigrationsTestBase):
    """Live migration requested through ``self.servers_client``.

    The host lookups go through the admin client provided by the base
    class; the migration itself is requested via ``self.servers_client``.
    """

    # The minimum microversion is '2.60' when the volume_multiattach feature
    # flag is enabled, '2.25' otherwise.
    # NOTE(review): 2.25 is presumably required for block_migration="auto"
    # and 2.60 for multiattach volumes - confirm against the compute API
    # microversion history.
    min_microversion = (
        '2.60' if CONF.compute_feature_enabled.volume_multiattach else '2.25'
    )
    max_microversion = 'latest'

    @classmethod
    def skip_checks(cls):
        """Skip unless live migration is enabled, also for non-admins."""
        super().skip_checks()
        features = CONF.compute_feature_enabled
        if not features.live_migration:
            raise cls.skipException("Live migration is not available")
        if not features.live_migration_nonadmins:
            raise cls.skipException(
                "Live migration is not available for non-admin users"
            )

    @decorators.idempotent_id('94100e69-2641-4152-9580-d16631bb228a')
    def test_user_live_migrate_server(self):
        """Live migrate a server without a target host, expect a new host."""
        server_id = self.create_test_server(wait_until="ACTIVE")['id']
        host_before = self.get_host_for_server(server_id)
        client = self.servers_client

        # No destination host is passed (host=None).
        client.live_migrate_server(
            server_id, host=None, block_migration="auto",
        )
        waiters.wait_for_server_status(client, server_id, 'ACTIVE')

        host_after = self.get_host_for_server(server_id)
        self.assertNotEqual(host_before, host_after)