Merge "Add iDRAC BIOS cleaning steps tests"
diff --git a/ironic_tempest_plugin/config.py b/ironic_tempest_plugin/config.py
index de517dc..acf2542 100644
--- a/ironic_tempest_plugin/config.py
+++ b/ironic_tempest_plugin/config.py
@@ -173,6 +173,9 @@
default='bios',
choices=['bios', 'uefi'],
help="The desired boot_mode to be used on testing nodes."),
+ cfg.StrOpt('default_boot_option',
+ # No good default here, we need to actually set it.
+ help="The default boot option the testing nodes are using."),
]
BaremetalFeaturesGroup = [
diff --git a/ironic_tempest_plugin/manager.py b/ironic_tempest_plugin/manager.py
index 95fe207..67126a3 100644
--- a/ironic_tempest_plugin/manager.py
+++ b/ironic_tempest_plugin/manager.py
@@ -19,448 +19,27 @@
# file was copied from openstack/tempest/tempest/scenario/manager.py,
# openstack/tempest commit: 82a278e88c9e9f9ba49f81c1f8dba0bca7943daf
-import subprocess
-
from oslo_log import log
-from oslo_utils import netutils
-from tempest.common import compute
-from tempest.common.utils.linux import remote_client
-from tempest.common.utils import net_utils
-from tempest.common import waiters
from tempest import config
-from tempest import exceptions
-from tempest.lib.common.utils import data_utils
-from tempest.lib.common.utils import test_utils
-from tempest.lib import exceptions as lib_exc
-import tempest.test
+from tempest.scenario import manager
CONF = config.CONF
LOG = log.getLogger(__name__)
-class ScenarioTest(tempest.test.BaseTestCase):
+class ScenarioTest(manager.ScenarioTest):
"""Base class for scenario tests. Uses tempest own clients. """
credentials = ['primary', 'admin', 'system_admin']
- @classmethod
- def setup_clients(cls):
- super(ScenarioTest, cls).setup_clients()
- # Clients (in alphabetical order)
- cls.flavors_client = cls.os_primary.flavors_client
- cls.compute_floating_ips_client = (
- cls.os_primary.compute_floating_ips_client)
- if CONF.service_available.glance:
- # Check if glance v1 is available to determine which client to use.
- if CONF.image_feature_enabled.api_v1:
- cls.image_client = cls.os_primary.image_client
- elif CONF.image_feature_enabled.api_v2:
- cls.image_client = cls.os_primary.image_client_v2
- else:
- raise lib_exc.InvalidConfiguration(
- 'Either api_v1 or api_v2 must be True in '
- '[image-feature-enabled].')
- # Compute image client
- cls.compute_images_client = cls.os_primary.compute_images_client
- cls.keypairs_client = cls.os_primary.keypairs_client
- # Nova security groups client
- cls.compute_security_groups_client = (
- cls.os_primary.compute_security_groups_client)
- cls.compute_security_group_rules_client = (
- cls.os_primary.compute_security_group_rules_client)
- cls.servers_client = cls.os_primary.servers_client
- cls.interface_client = cls.os_primary.interfaces_client
- # Neutron network client
- cls.networks_client = cls.os_primary.networks_client
- cls.ports_client = cls.os_primary.ports_client
- cls.routers_client = cls.os_primary.routers_client
- cls.subnets_client = cls.os_primary.subnets_client
- cls.floating_ips_client = cls.os_primary.floating_ips_client
- cls.security_groups_client = cls.os_primary.security_groups_client
- cls.security_group_rules_client = (
- cls.os_primary.security_group_rules_client)
-
- cls.volumes_client = cls.os_primary.volumes_client_latest
- cls.snapshots_client = cls.os_primary.snapshots_client_latest
-
# ## Test functions library
#
# The create_[resource] functions only return body and discard the
# resp part which is not used in scenario tests
- def _create_port(self, network_id, client=None, namestart='port-quotatest',
- **kwargs):
- if not client:
- client = self.os_primary.ports_client
- name = data_utils.rand_name(namestart)
- result = client.create_port(
- name=name,
- network_id=network_id,
- **kwargs)
- self.assertIsNotNone(result, 'Unable to allocate port')
- port = result['port']
- self.addCleanup(test_utils.call_and_ignore_notfound_exc,
- client.delete_port, port['id'])
- return port
- def create_keypair(self, client=None):
- if not client:
- client = self.os_primary.keypairs_client
- name = data_utils.rand_name(self.__class__.__name__)
- # We don't need to create a keypair by pubkey in scenario
- body = client.create_keypair(name=name)
- self.addCleanup(client.delete_keypair, name)
- return body['keypair']
-
- def create_server(self, name=None, image_id=None, flavor=None,
- validatable=False, wait_until='ACTIVE',
- clients=None, **kwargs):
- """Wrapper utility that returns a test server.
-
- This wrapper utility calls the common create test server and
- returns a test server. The purpose of this wrapper is to minimize
- the impact on the code of the tests already using this
- function.
- """
-
- # NOTE(jlanoux): As a first step, ssh checks in the scenario
- # tests need to be run regardless of the run_validation and
- # validatable parameters and thus until the ssh validation job
- # becomes voting in CI. The test resources management and IP
- # association are taken care of in the scenario tests.
- # Therefore, the validatable parameter is set to false in all
- # those tests. In this way create_server just return a standard
- # server and the scenario tests always perform ssh checks.
-
- # Needed for the cross_tenant_traffic test:
- if clients is None:
- clients = self.os_primary
-
- if name is None:
- name = data_utils.rand_name(self.__class__.__name__ + "-server")
-
- vnic_type = CONF.network.port_vnic_type
-
- # If vnic_type is configured create port for
- # every network
- if vnic_type:
- ports = []
-
- create_port_body = {'binding:vnic_type': vnic_type,
- 'namestart': 'port-smoke'}
- if kwargs:
- # Convert security group names to security group ids
- # to pass to create_port
- if 'security_groups' in kwargs:
- security_groups = \
- clients.security_groups_client.list_security_groups(
- ).get('security_groups')
- sec_dict = dict([(s['name'], s['id'])
- for s in security_groups])
-
- sec_groups_names = [s['name'] for s in kwargs.pop(
- 'security_groups')]
- security_groups_ids = [sec_dict[s]
- for s in sec_groups_names]
-
- if security_groups_ids:
- create_port_body[
- 'security_groups'] = security_groups_ids
- networks = kwargs.pop('networks', [])
- else:
- networks = []
-
- # If there are no networks passed to us we look up
- # for the project's private networks and create a port.
- # The same behaviour as we would expect when passing
- # the call to the clients with no networks
- if not networks:
- networks = clients.networks_client.list_networks(
- **{'router:external': False, 'fields': 'id'})['networks']
-
- # It's net['uuid'] if networks come from kwargs
- # and net['id'] if they come from
- # clients.networks_client.list_networks
- for net in networks:
- net_id = net.get('uuid', net.get('id'))
- if 'port' not in net:
- port = self._create_port(network_id=net_id,
- client=clients.ports_client,
- **create_port_body)
- ports.append({'port': port['id']})
- else:
- ports.append({'port': net['port']})
- if ports:
- kwargs['networks'] = ports
- self.ports = ports
-
- tenant_network = self.get_tenant_network()
-
- body, servers = compute.create_test_server(
- clients,
- tenant_network=tenant_network,
- wait_until=wait_until,
- name=name, flavor=flavor,
- image_id=image_id, **kwargs)
-
- self.addCleanup(waiters.wait_for_server_termination,
- clients.servers_client, body['id'])
- self.addCleanup(test_utils.call_and_ignore_notfound_exc,
- clients.servers_client.delete_server, body['id'])
- server = clients.servers_client.show_server(body['id'])['server']
- return server
-
- def get_remote_client(self, ip_address, username=None, private_key=None):
- """Get a SSH client to a remote server
-
- @param ip_address the server floating or fixed IP address to use
- for ssh validation
- @param username name of the Linux account on the remote server
- @param private_key the SSH private key to use
- @return a RemoteClient object
- """
-
- if username is None:
- username = CONF.validation.image_ssh_user
- # Set this with 'keypair' or others to log in with keypair or
- # username/password.
- if CONF.validation.auth_method == 'keypair':
- password = None
- if private_key is None:
- private_key = self.keypair['private_key']
- else:
- password = CONF.validation.image_ssh_password
- private_key = None
- linux_client = remote_client.RemoteClient(ip_address, username,
- pkey=private_key,
- password=password)
- try:
- linux_client.validate_authentication()
- except Exception as e:
- message = ('Initializing SSH connection to %(ip)s failed. '
- 'Error: %(error)s' % {'ip': ip_address,
- 'error': e})
- caller = test_utils.find_test_caller()
- if caller:
- message = '(%s) %s' % (caller, message)
- LOG.exception(message)
- self._log_console_output()
- raise
-
- return linux_client
-
- def _log_console_output(self, servers=None):
- if not CONF.compute_feature_enabled.console_output:
- LOG.debug('Console output not supported, cannot log')
- return
- client = self.os_primary.servers_client
- if not servers:
- servers = client.list_servers()
- servers = servers['servers']
- for server in servers:
- try:
- console_output = client.get_console_output(
- server['id'])['output']
- LOG.debug('Console output for %s\nbody=\n%s',
- server['id'], console_output)
- except lib_exc.NotFound:
- LOG.debug("Server %s disappeared(deleted) while looking "
- "for the console log", server['id'])
-
- def rebuild_server(self, server_id, image=None,
- preserve_ephemeral=False, wait=True,
- rebuild_kwargs=None):
- if image is None:
- image = CONF.compute.image_ref
-
- rebuild_kwargs = rebuild_kwargs or {}
-
- LOG.debug("Rebuilding server (id: %s, image: %s, preserve eph: %s)",
- server_id, image, preserve_ephemeral)
- self.os_primary.servers_client.rebuild_server(
- server_id=server_id, image_ref=image,
- preserve_ephemeral=preserve_ephemeral,
- **rebuild_kwargs)
- if wait:
- waiters.wait_for_server_status(self.os_primary.servers_client,
- server_id, 'ACTIVE')
-
- def ping_ip_address(self, ip_address, should_succeed=True,
- ping_timeout=None, mtu=None):
- timeout = ping_timeout or CONF.validation.ping_timeout
- cmd = ['ping', '-c1', '-w1']
-
- if mtu:
- cmd += [
- # don't fragment
- '-M', 'do',
- # ping receives just the size of ICMP payload
- '-s', str(net_utils.get_ping_payload_size(mtu, 4))
- ]
- cmd.append(ip_address)
-
- def ping():
- proc = subprocess.Popen(cmd,
- stdout=subprocess.PIPE,
- stderr=subprocess.PIPE)
- proc.communicate()
-
- return (proc.returncode == 0) == should_succeed
-
- caller = test_utils.find_test_caller()
- LOG.debug('%(caller)s begins to ping %(ip)s in %(timeout)s sec and the'
- ' expected result is %(should_succeed)s', {
- 'caller': caller, 'ip': ip_address, 'timeout': timeout,
- 'should_succeed':
- 'reachable' if should_succeed else 'unreachable'
- })
- result = test_utils.call_until_true(ping, timeout, 1)
- LOG.debug('%(caller)s finishes ping %(ip)s in %(timeout)s sec and the '
- 'ping result is %(result)s', {
- 'caller': caller, 'ip': ip_address, 'timeout': timeout,
- 'result': 'expected' if result else 'unexpected'
- })
- return result
-
- def check_vm_connectivity(self, ip_address,
- username=None,
- private_key=None,
- should_connect=True,
- mtu=None):
- """Check server connectivity
-
- :param ip_address: server to test against
- :param username: server's ssh username
- :param private_key: server's ssh private key to be used
- :param should_connect: True/False indicates positive/negative test
- positive - attempt ping and ssh
- negative - attempt ping and fail if succeed
- :param mtu: network MTU to use for connectivity validation
-
- :raises: AssertError if the result of the connectivity check does
- not match the value of the should_connect param
- """
- if should_connect:
- msg = "Timed out waiting for %s to become reachable" % ip_address
- else:
- msg = "ip address %s is reachable" % ip_address
- self.assertTrue(self.ping_ip_address(ip_address,
- should_succeed=should_connect,
- mtu=mtu),
- msg=msg)
- if should_connect:
- # no need to check ssh for negative connectivity
- self.get_remote_client(ip_address, username, private_key)
-
- def create_floating_ip(self, thing, pool_name=None):
- """Create a floating IP and associates to a server on Nova"""
-
- if not pool_name:
- pool_name = CONF.network.floating_network_name
- client = self.os_primary.compute_floating_ips_client
- floating_ip = (client.
- create_floating_ip(pool=pool_name)['floating_ip'])
- self.addCleanup(test_utils.call_and_ignore_notfound_exc,
- client.delete_floating_ip,
- floating_ip['id'])
- client.associate_floating_ip_to_server(
- floating_ip['ip'], thing['id'])
- return floating_ip
-
- def create_timestamp(self, ip_address, dev_name=None, mount_path='/mnt',
- private_key=None):
- ssh_client = self.get_remote_client(ip_address,
- private_key=private_key)
- if dev_name is not None:
- ssh_client.make_fs(dev_name)
- ssh_client.exec_command('sudo mount /dev/%s %s' % (dev_name,
- mount_path))
- cmd_timestamp = 'sudo sh -c "date > %s/timestamp; sync"' % mount_path
- ssh_client.exec_command(cmd_timestamp)
- timestamp = ssh_client.exec_command('sudo cat %s/timestamp'
- % mount_path)
- if dev_name is not None:
- ssh_client.exec_command('sudo umount %s' % mount_path)
- return timestamp
-
- def get_server_ip(self, server):
- """Get the server fixed or floating IP.
-
- Based on the configuration we're in, return a correct ip
- address for validating that a guest is up.
- """
- if CONF.validation.connect_method == 'floating':
- # The tests calling this method don't have a floating IP
- # and can't make use of the validation resources. So the
- # method is creating the floating IP there.
- return self.create_floating_ip(server)['ip']
- elif CONF.validation.connect_method == 'fixed':
- # Determine the network name to look for based on config or creds
- # provider network resources.
- if CONF.validation.network_for_ssh:
- addresses = server['addresses'][
- CONF.validation.network_for_ssh]
- else:
- creds_provider = self._get_credentials_provider()
- net_creds = creds_provider.get_primary_creds()
- network = getattr(net_creds, 'network', None)
- addresses = (server['addresses'][network['name']]
- if network else [])
- for address in addresses:
- if (address['version'] == CONF.validation.ip_version_for_ssh
- and address['OS-EXT-IPS:type'] == 'fixed'):
- return address['addr']
- raise exceptions.ServerUnreachable(server_id=server['id'])
- else:
- raise lib_exc.InvalidConfiguration()
-
- def _get_router(self, client=None, tenant_id=None):
- """Retrieve a router for the given tenant id.
-
- If a public router has been configured, it will be returned.
-
- If a public router has not been configured, but a public
- network has, a tenant router will be created and returned that
- routes traffic to the public network.
- """
- if not client:
- client = self.os_primary.routers_client
- if not tenant_id:
- tenant_id = client.tenant_id
- router_id = CONF.network.public_router_id
- network_id = CONF.network.public_network_id
- if router_id:
- body = client.show_router(router_id)
- return body['router']
- elif network_id:
- router = self._create_router(client, tenant_id)
- kwargs = {'external_gateway_info': dict(network_id=network_id)}
- router = client.update_router(router['id'], **kwargs)['router']
- return router
- else:
- raise Exception("Neither of 'public_router_id' or "
- "'public_network_id' has been defined.")
-
- def _create_router(self, client=None, tenant_id=None,
- namestart='router-smoke'):
- if not client:
- client = self.os_primary.routers_client
- if not tenant_id:
- tenant_id = client.tenant_id
- name = data_utils.rand_name(namestart)
- result = client.create_router(name=name,
- admin_state_up=True,
- tenant_id=tenant_id)
- router = result['router']
- self.assertEqual(router['name'], name)
- self.addCleanup(test_utils.call_and_ignore_notfound_exc,
- client.delete_router,
- router['id'])
- return router
-
-
-class NetworkScenarioTest(ScenarioTest):
+class NetworkScenarioTest(manager.NetworkScenarioTest):
"""Base class for network scenario tests.
This class provide helpers for network scenario tests, using the neutron
@@ -479,84 +58,3 @@
super(NetworkScenarioTest, cls).skip_checks()
if not CONF.service_available.neutron:
raise cls.skipException('Neutron not available')
-
- def _create_network(self, networks_client=None,
- tenant_id=None,
- namestart='network-smoke-',
- port_security_enabled=True):
- if not networks_client:
- networks_client = self.os_primary.networks_client
- if not tenant_id:
- tenant_id = self.os_primary.networks_client.tenant_id
- name = data_utils.rand_name(namestart)
- network_kwargs = dict(name=name, tenant_id=tenant_id)
- # Neutron disables port security by default so we have to check the
- # config before trying to create the network with port_security_enabled
- if CONF.network_feature_enabled.port_security:
- network_kwargs['port_security_enabled'] = port_security_enabled
- result = networks_client.create_network(**network_kwargs)
- network = result['network']
-
- self.assertEqual(network['name'], name)
- self.addCleanup(test_utils.call_and_ignore_notfound_exc,
- networks_client.delete_network,
- network['id'])
- return network
-
- def _get_server_port_id_and_ip4(self, server, ip_addr=None):
- if ip_addr:
- ports = self.os_admin.ports_client.list_ports(
- device_id=server['id'],
- fixed_ips='ip_address=%s' % ip_addr)['ports']
- else:
- ports = self.os_admin.ports_client.list_ports(
- device_id=server['id'])['ports']
- # A port can have more than one IP address in some cases.
- # If the network is dual-stack (IPv4 + IPv6), this port is associated
- # with 2 subnets
- p_status = ['ACTIVE']
- # NOTE(vsaienko) With Ironic, instances live on separate hardware
- # servers. Neutron does not bind ports for Ironic instances, as a
- # result the port remains in the DOWN state.
- # TODO(vsaienko) remove once bug: #1599836 is resolved.
- if getattr(CONF.service_available, 'ironic', False):
- p_status.append('DOWN')
- port_map = [(p["id"], fxip["ip_address"])
- for p in ports
- for fxip in p["fixed_ips"]
- if netutils.is_valid_ipv4(fxip["ip_address"])
- and p['status'] in p_status]
- inactive = [p for p in ports if p['status'] != 'ACTIVE']
- if inactive:
- LOG.warning("Instance has ports that are not ACTIVE: %s", inactive)
-
- self.assertNotEqual(0, len(port_map),
- "No IPv4 addresses found in: %s" % ports)
- self.assertEqual(len(port_map), 1,
- "Found multiple IPv4 addresses: %s. "
- "Unable to determine which port to target."
- % port_map)
- return port_map[0]
-
- def create_floating_ip(self, thing, external_network_id=None,
- port_id=None, client=None):
- """Create a floating IP and associates to a resource/port on Neutron"""
- if not external_network_id:
- external_network_id = CONF.network.public_network_id
- if not client:
- client = self.os_primary.floating_ips_client
- if not port_id:
- port_id, ip4 = self._get_server_port_id_and_ip4(thing)
- else:
- ip4 = None
- result = client.create_floatingip(
- floating_network_id=external_network_id,
- port_id=port_id,
- tenant_id=thing['tenant_id'],
- fixed_ip_address=ip4
- )
- floating_ip = result['floatingip']
- self.addCleanup(test_utils.call_and_ignore_notfound_exc,
- client.delete_floatingip,
- floating_ip['id'])
- return floating_ip
diff --git a/ironic_tempest_plugin/tests/scenario/baremetal_manager.py b/ironic_tempest_plugin/tests/scenario/baremetal_manager.py
index 4820c64..b897e34 100644
--- a/ironic_tempest_plugin/tests/scenario/baremetal_manager.py
+++ b/ironic_tempest_plugin/tests/scenario/baremetal_manager.py
@@ -276,4 +276,4 @@
waiters.wait_for_server_status(servers_client,
instance['id'], 'ACTIVE')
# Verify server connection
- self.get_remote_client(server_ip)
+ self.get_remote_client(server_ip, server=instance)
diff --git a/ironic_tempest_plugin/tests/scenario/test_baremetal_basic_ops.py b/ironic_tempest_plugin/tests/scenario/test_baremetal_basic_ops.py
index d4e8ae3..95430ce 100644
--- a/ironic_tempest_plugin/tests/scenario/test_baremetal_basic_ops.py
+++ b/ironic_tempest_plugin/tests/scenario/test_baremetal_basic_ops.py
@@ -187,6 +187,19 @@
# Flavor traits should be a subset of node traits.
self.assertTrue(traits.issubset(set(node['traits'])))
+ def validate_image(self):
+ iinfo = self.node['instance_info']
+ if self.wholedisk_image:
+ self.assertNotIn('kernel', iinfo)
+ self.assertNotIn('ramdisk', iinfo)
+ else:
+ if 'image_type' in iinfo:
+ self.assertEqual('partition', iinfo['image_type'])
+ else:
+ self.assertTrue(iinfo['kernel'])
+ self.assertTrue(iinfo['ramdisk'])
+ self.assertGreater(int(iinfo['root_gb']), 0)
+
def validate_uefi(self, client):
efi_dir = '/sys/firmware/efi'
success_string = "Found " + efi_dir
@@ -197,20 +210,22 @@
def baremetal_server_ops(self):
self.add_keypair()
- self.instance, self.node = self.boot_instance()
+ self.instance, self.node = self.boot_instance(image_id=self.image_ref)
+ self.validate_image()
self.validate_ports()
self.validate_scheduling()
ip_address = self.get_server_ip(self.instance)
- vm_client = self.get_remote_client(ip_address)
+ vm_client = self.get_remote_client(ip_address, server=self.instance)
# We expect the ephemeral partition to be mounted on /mnt and to have
# the same size as our flavor definition.
eph_size = self.get_flavor_ephemeral_size()
- if eph_size:
+ if eph_size and not self.wholedisk_image:
self.verify_partition(vm_client, 'ephemeral0', '/mnt', eph_size)
# Create the test file
- self.create_timestamp(
- ip_address, private_key=self.keypair['private_key'])
+ self.create_timestamp(ip_address,
+ private_key=self.keypair['private_key'],
+ server=self.instance)
if CONF.baremetal.boot_mode == "uefi":
self.validate_uefi(vm_client)
@@ -225,6 +240,13 @@
@decorators.idempotent_id('549173a5-38ec-42bb-b0e2-c8b9f4a08943')
@utils.services('compute', 'image', 'network')
def test_baremetal_server_ops_partition_image(self):
+ # NOTE(dtantsur): cirros partition images don't have grub, we cannot
+ # use local boot on BIOS with them.
+ if (CONF.baremetal.partition_netboot
+ and CONF.baremetal.default_boot_option == 'local'):
+ raise self.skipException(
+ "Cannot test partition images with local boot on cirros")
+
self.image_ref = CONF.baremetal.partition_image_ref
self.wholedisk_image = False
self.baremetal_server_ops()
diff --git a/ironic_tempest_plugin/tests/scenario/test_baremetal_boot_from_volume.py b/ironic_tempest_plugin/tests/scenario/test_baremetal_boot_from_volume.py
index 154fc96..b724683 100644
--- a/ironic_tempest_plugin/tests/scenario/test_baremetal_boot_from_volume.py
+++ b/ironic_tempest_plugin/tests/scenario/test_baremetal_boot_from_volume.py
@@ -147,6 +147,7 @@
# Get server ip and validate authentication
ip_address = self.get_server_ip(self.instance)
- self.get_remote_client(ip_address).validate_authentication()
+ self.get_remote_client(ip_address,
+ server=self.instance).validate_authentication()
self.terminate_instance(instance=self.instance)
diff --git a/ironic_tempest_plugin/tests/scenario/test_baremetal_multitenancy.py b/ironic_tempest_plugin/tests/scenario/test_baremetal_multitenancy.py
index fc4c513..086bf21 100644
--- a/ironic_tempest_plugin/tests/scenario/test_baremetal_multitenancy.py
+++ b/ironic_tempest_plugin/tests/scenario/test_baremetal_multitenancy.py
@@ -51,12 +51,12 @@
raise cls.skipException(msg)
def create_tenant_network(self, clients, tenant_cidr):
- network = self._create_network(
+ network = self.create_network(
networks_client=clients.networks_client,
- tenant_id=clients.credentials.tenant_id)
- router = self._get_router(
+ project_id=clients.credentials.project_id)
+ router = self.get_router(
client=clients.routers_client,
- tenant_id=clients.credentials.tenant_id)
+ project_id=clients.credentials.tenant_id)
result = clients.subnets_client.create_subnet(
name=data_utils.rand_name('subnet'),
@@ -104,7 +104,8 @@
instance1,
)['floating_ip_address']
self.check_vm_connectivity(ip_address=floating_ip1,
- private_key=keypair['private_key'])
+ private_key=keypair['private_key'],
+ server=instance1)
if use_vm:
# Create VM on compute node
alt_instance = self.create_server(
@@ -127,7 +128,8 @@
)['floating_ip_address']
self.check_vm_connectivity(
ip_address=alt_floating_ip,
- private_key=alt_keypair['private_key'])
+ private_key=alt_keypair['private_key'],
+ server=alt_instance)
self.verify_l3_connectivity(
alt_floating_ip,
alt_keypair['private_key'],
diff --git a/ironic_tempest_plugin/tests/scenario/test_baremetal_single_tenant.py b/ironic_tempest_plugin/tests/scenario/test_baremetal_single_tenant.py
index e06d43a..2138d3e 100644
--- a/ironic_tempest_plugin/tests/scenario/test_baremetal_single_tenant.py
+++ b/ironic_tempest_plugin/tests/scenario/test_baremetal_single_tenant.py
@@ -51,12 +51,12 @@
raise cls.skipException(msg)
def create_tenant_network(self, clients, tenant_cidr):
- network = self._create_network(
+ network = self.create_network(
networks_client=clients.networks_client,
- tenant_id=clients.credentials.tenant_id)
- router = self._get_router(
+ project_id=clients.credentials.project_id)
+ router = self.get_router(
client=clients.routers_client,
- tenant_id=clients.credentials.tenant_id)
+ project_id=clients.credentials.tenant_id)
extra_subnet_args = {}
if CONF.validation.ip_version_for_ssh == 6:
@@ -118,7 +118,8 @@
instance1,
)['floating_ip_address']
self.check_vm_connectivity(ip_address=floating_ip1,
- private_key=keypair['private_key'])
+ private_key=keypair['private_key'],
+ server=instance1)
if use_vm:
# Create VM on compute node
@@ -146,7 +147,8 @@
)['floating_ip_address']
self.check_vm_connectivity(
ip_address=floating_ip2,
- private_key=keypair['private_key'])
+ private_key=keypair['private_key'],
+ server=instance2)
self.verify_l3_connectivity(
floating_ip2,
diff --git a/ironic_tempest_plugin/tests/scenario/test_introspection_basic.py b/ironic_tempest_plugin/tests/scenario/test_introspection_basic.py
index 9823a88..f30b5e0 100644
--- a/ironic_tempest_plugin/tests/scenario/test_introspection_basic.py
+++ b/ironic_tempest_plugin/tests/scenario/test_introspection_basic.py
@@ -24,24 +24,15 @@
data = self.introspection_data(node['uuid'])
self.assertEqual(data['cpu_arch'],
self.flavor['properties']['cpu_arch'])
- self.assertEqual(int(data['memory_mb']),
- int(self.flavor['ram']))
- self.assertEqual(int(data['cpus']), int(self.flavor['vcpus']))
+ self.assertGreater(int(data['memory_mb']), 0)
+ self.assertGreater(int(data['cpus']), 0)
def verify_node_flavor(self, node):
- expected_cpus = self.flavor['vcpus']
- expected_memory_mb = self.flavor['ram']
expected_cpu_arch = self.flavor['properties']['cpu_arch']
- disk_size = self.flavor['disk']
- ephemeral_size = self.flavor['OS-FLV-EXT-DATA:ephemeral']
- expected_local_gb = disk_size + ephemeral_size
- self.assertEqual(expected_cpus,
- int(node['properties']['cpus']))
- self.assertEqual(expected_memory_mb,
- int(node['properties']['memory_mb']))
- self.assertEqual(expected_local_gb,
- int(node['properties']['local_gb']))
+ self.assertGreater(int(node['properties']['cpus']), 0)
+ self.assertGreater(int(node['properties']['memory_mb']), 0)
+ self.assertGreater(int(node['properties']['local_gb']), 0)
self.assertEqual(expected_cpu_arch,
node['properties']['cpu_arch'])