Merge "Migrate test_network_basic_ops to tempest clients"
diff --git a/tempest/scenario/manager.py b/tempest/scenario/manager.py
index c7236ce..f7db79d 100644
--- a/tempest/scenario/manager.py
+++ b/tempest/scenario/manager.py
@@ -39,6 +39,7 @@
from tempest import exceptions
from tempest.openstack.common import log
from tempest.openstack.common import timeutils
+from tempest.services.network import resources as net_resources
import tempest.test
CONF = config.CONF
@@ -88,6 +89,9 @@
cls.servers_client = cls.manager.servers_client
cls.volumes_client = cls.manager.volumes_client
cls.snapshots_client = cls.manager.snapshots_client
+ cls.interface_client = cls.manager.interfaces_client
+ # Neutron network client
+ cls.network_client = cls.manager.network_client
@classmethod
def _get_credentials(cls, get_creds, ctype):
@@ -121,16 +125,17 @@
# not at the end of the class
self.addCleanup(self._wait_for_cleanups)
- def delete_wrapper(self, delete_thing, thing_id):
+ def delete_wrapper(self, delete_thing, *args, **kwargs):
"""Ignores NotFound exceptions for delete operations.
- @param delete_thing: delete method of a resource
- @param thing_id: id of the resource to be deleted
+ @param delete_thing: delete method of a resource. method will be
+ executed as delete_thing(*args, **kwargs)
+
"""
try:
# Tempest clients return dicts, so there is no common delete
# method available. Using a callable instead
- delete_thing(thing_id)
+ delete_thing(*args, **kwargs)
except exceptions.NotFound:
# If the resource is already missing, mission accomplished.
pass
@@ -268,7 +273,7 @@
_, volume = self.volumes_client.get_volume(volume['id'])
return volume
- def _create_loginable_secgroup_rule_nova(self, secgroup_id=None):
+ def _create_loginable_secgroup_rule(self, secgroup_id=None):
_client = self.security_groups_client
if secgroup_id is None:
_, sgs = _client.list_security_groups()
@@ -306,7 +311,7 @@
rules.append(sg_rule)
return rules
- def _create_security_group_nova(self):
+ def _create_security_group(self):
# Create security group
sg_name = data_utils.rand_name(self.__class__.__name__)
sg_desc = sg_name + " description"
@@ -319,7 +324,7 @@
secgroup['id'])
# Add rules to the security group
- self._create_loginable_secgroup_rule_nova(secgroup['id'])
+ self._create_loginable_secgroup_rule(secgroup['id'])
return secgroup
@@ -425,6 +430,474 @@
return snapshot_image
+# TODO(yfried): change this class name to NetworkScenarioTest once client
+# migration is complete
+class NeutronScenarioTest(ScenarioTest):
+ """Base class for network scenario tests.
+ This class provide helpers for network scenario tests, using the neutron
+ API. Helpers from ancestor which use the nova network API are overridden
+ with the neutron API.
+
+ This Class also enforces using Neutron instead of novanetwork.
+ Subclassed tests will be skipped if Neutron is not enabled
+
+ """
+
+ @classmethod
+ def check_preconditions(cls):
+ if CONF.service_available.neutron:
+ cls.enabled = True
+ # verify that neutron_available is telling the truth
+ try:
+ cls.network_client.list_networks()
+ except exc.EndpointNotFound:
+ cls.enabled = False
+ raise
+ else:
+ cls.enabled = False
+ msg = 'Neutron not available'
+ raise cls.skipException(msg)
+
+ @classmethod
+ def setUpClass(cls):
+ super(NeutronScenarioTest, cls).setUpClass()
+ cls.tenant_id = cls.manager.identity_client.tenant_id
+ cls.check_preconditions()
+
+ def _create_network(self, tenant_id, namestart='network-smoke-'):
+ name = data_utils.rand_name(namestart)
+ _, result = self.network_client.create_network(name=name,
+ tenant_id=tenant_id)
+ network = net_resources.DeletableNetwork(client=self.network_client,
+ **result['network'])
+ self.assertEqual(network.name, name)
+ self.addCleanup(self.delete_wrapper, network.delete)
+ return network
+
+ def _list_networks(self, *args, **kwargs):
+ """List networks using admin creds """
+ return self._admin_lister('networks')(*args, **kwargs)
+
+ def _list_subnets(self, *args, **kwargs):
+ """List subnets using admin creds """
+ return self._admin_lister('subnets')(*args, **kwargs)
+
+ def _list_routers(self, *args, **kwargs):
+ """List routers using admin creds """
+ return self._admin_lister('routers')(*args, **kwargs)
+
+ def _list_ports(self, *args, **kwargs):
+ """List ports using admin creds """
+ return self._admin_lister('ports')(*args, **kwargs)
+
+ def _admin_lister(self, resource_type):
+ def temp(*args, **kwargs):
+ temp_method = self.admin_manager.network_client.__getattr__(
+ 'list_%s' % resource_type)
+ _, resource_list = temp_method(*args, **kwargs)
+ return resource_list[resource_type]
+ return temp
+
+ def _create_subnet(self, network, namestart='subnet-smoke-', **kwargs):
+ """
+ Create a subnet for the given network within the cidr block
+ configured for tenant networks.
+ """
+
+ def cidr_in_use(cidr, tenant_id):
+ """
+ :return True if subnet with cidr already exist in tenant
+ False else
+ """
+ cidr_in_use = self._list_subnets(tenant_id=tenant_id, cidr=cidr)
+ return len(cidr_in_use) != 0
+
+ tenant_cidr = netaddr.IPNetwork(CONF.network.tenant_network_cidr)
+ result = None
+ # Repeatedly attempt subnet creation with sequential cidr
+ # blocks until an unallocated block is found.
+ for subnet_cidr in tenant_cidr.subnet(
+ CONF.network.tenant_network_mask_bits):
+ str_cidr = str(subnet_cidr)
+ if cidr_in_use(str_cidr, tenant_id=network.tenant_id):
+ continue
+
+ subnet = dict(
+ name=data_utils.rand_name(namestart),
+ ip_version=4,
+ network_id=network.id,
+ tenant_id=network.tenant_id,
+ cidr=str_cidr,
+ **kwargs
+ )
+ try:
+ _, result = self.network_client.create_subnet(**subnet)
+ break
+ except exc.NeutronClientException as e:
+ is_overlapping_cidr = 'overlaps with another subnet' in str(e)
+ if not is_overlapping_cidr:
+ raise
+ self.assertIsNotNone(result, 'Unable to allocate tenant network')
+ subnet = net_resources.DeletableSubnet(client=self.network_client,
+ **result['subnet'])
+ self.assertEqual(subnet.cidr, str_cidr)
+ self.addCleanup(self.delete_wrapper, subnet.delete)
+ return subnet
+
+ def _create_port(self, network, namestart='port-quotatest'):
+ name = data_utils.rand_name(namestart)
+ _, result = self.network_client.create_port(
+ name=name,
+ network_id=network.id,
+ tenant_id=network.tenant_id)
+ self.assertIsNotNone(result, 'Unable to allocate port')
+ port = net_resources.DeletablePort(client=self.network_client,
+ **result['port'])
+ self.addCleanup(self.delete_wrapper, port.delete)
+ return port
+
+ def _get_server_port_id(self, server, ip_addr=None):
+ ports = self._list_ports(device_id=server['id'],
+ fixed_ip=ip_addr)
+ self.assertEqual(len(ports), 1,
+ "Unable to determine which port to target.")
+ return ports[0]['id']
+
+ def _create_floating_ip(self, thing, external_network_id, port_id=None):
+ if not port_id:
+ port_id = self._get_server_port_id(thing)
+ _, result = self.network_client.create_floatingip(
+ floating_network_id=external_network_id,
+ port_id=port_id,
+ tenant_id=thing['tenant_id']
+ )
+ floating_ip = net_resources.DeletableFloatingIp(
+ client=self.network_client,
+ **result['floatingip'])
+ self.addCleanup(self.delete_wrapper, floating_ip.delete)
+ return floating_ip
+
+ def _associate_floating_ip(self, floating_ip, server):
+ port_id = self._get_server_port_id(server)
+ floating_ip.update(port_id=port_id)
+ self.assertEqual(port_id, floating_ip.port_id)
+ return floating_ip
+
+ def _disassociate_floating_ip(self, floating_ip):
+ """
+ :param floating_ip: type DeletableFloatingIp
+ """
+ floating_ip.update(port_id=None)
+ self.assertIsNone(floating_ip.port_id)
+ return floating_ip
+
+ def _ping_ip_address(self, ip_address, should_succeed=True):
+ cmd = ['ping', '-c1', '-w1', ip_address]
+
+ def ping():
+ proc = subprocess.Popen(cmd,
+ stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE)
+ proc.wait()
+ return (proc.returncode == 0) == should_succeed
+
+ return tempest.test.call_until_true(
+ ping, CONF.compute.ping_timeout, 1)
+
+ def _check_vm_connectivity(self, ip_address,
+ username=None,
+ private_key=None,
+ should_connect=True):
+ """
+ :param ip_address: server to test against
+ :param username: server's ssh username
+ :param private_key: server's ssh private key to be used
+ :param should_connect: True/False indicates positive/negative test
+ positive - attempt ping and ssh
+ negative - attempt ping and fail if succeed
+
+ :raises: AssertError if the result of the connectivity check does
+ not match the value of the should_connect param
+ """
+ if should_connect:
+ msg = "Timed out waiting for %s to become reachable" % ip_address
+ else:
+ msg = "ip address %s is reachable" % ip_address
+ self.assertTrue(self._ping_ip_address(ip_address,
+ should_succeed=should_connect),
+ msg=msg)
+ if should_connect:
+ # no need to check ssh for negative connectivity
+ self.get_remote_client(ip_address, username, private_key)
+
+ def _check_public_network_connectivity(self, ip_address, username,
+ private_key, should_connect=True,
+ msg=None, servers=None):
+ # The target login is assumed to have been configured for
+ # key-based authentication by cloud-init.
+ LOG.debug('checking network connections to IP %s with user: %s' %
+ (ip_address, username))
+ try:
+ self._check_vm_connectivity(ip_address,
+ username,
+ private_key,
+ should_connect=should_connect)
+ except Exception as e:
+ ex_msg = 'Public network connectivity check failed'
+ if msg:
+ ex_msg += ": " + msg
+ LOG.exception(ex_msg)
+ self._log_console_output(servers)
+ # network debug is called as part of ssh init
+ if not isinstance(e, exceptions.SSHTimeout):
+ debug.log_net_debug()
+ raise
+
+ def _check_tenant_network_connectivity(self, server,
+ username,
+ private_key,
+ should_connect=True,
+ servers_for_debug=None):
+ if not CONF.network.tenant_networks_reachable:
+ msg = 'Tenant networks not configured to be reachable.'
+ LOG.info(msg)
+ return
+ # The target login is assumed to have been configured for
+ # key-based authentication by cloud-init.
+ try:
+ for net_name, ip_addresses in server['networks'].iteritems():
+ for ip_address in ip_addresses:
+ self._check_vm_connectivity(ip_address,
+ username,
+ private_key,
+ should_connect=should_connect)
+ except Exception as e:
+ LOG.exception('Tenant network connectivity check failed')
+ self._log_console_output(servers_for_debug)
+ # network debug is called as part of ssh init
+ if not isinstance(e, exceptions.SSHTimeout):
+ debug.log_net_debug()
+ raise
+
+ def _check_remote_connectivity(self, source, dest, should_succeed=True):
+ """
+ check ping server via source ssh connection
+
+ :param source: RemoteClient: an ssh connection from which to ping
+ :param dest: and IP to ping against
+ :param should_succeed: boolean should ping succeed or not
+ :returns: boolean -- should_succeed == ping
+ :returns: ping is false if ping failed
+ """
+ def ping_remote():
+ try:
+ source.ping_host(dest)
+ except exceptions.SSHExecCommandFailed:
+ LOG.warn('Failed to ping IP: %s via a ssh connection from: %s.'
+ % (dest, source.ssh_client.host))
+ return not should_succeed
+ return should_succeed
+
+ return tempest.test.call_until_true(ping_remote,
+ CONF.compute.ping_timeout,
+ 1)
+
+ def _create_security_group(self, tenant_id, client=None,
+ namestart='secgroup-smoke'):
+ if client is None:
+ client = self.network_client
+ secgroup = self._create_empty_security_group(namestart=namestart,
+ client=client,
+ tenant_id=tenant_id)
+
+ # Add rules to the security group
+ rules = self._create_loginable_secgroup_rule(secgroup=secgroup)
+ for rule in rules:
+ self.assertEqual(tenant_id, rule.tenant_id)
+ self.assertEqual(secgroup.id, rule.security_group_id)
+ return secgroup
+
+ def _create_empty_security_group(self, tenant_id, client=None,
+ namestart='secgroup-smoke'):
+ """Create a security group without rules.
+
+ Default rules will be created:
+ - IPv4 egress to any
+ - IPv6 egress to any
+
+ :param tenant_id: secgroup will be created in this tenant
+ :returns: DeletableSecurityGroup -- containing the secgroup created
+ """
+ if client is None:
+ client = self.network_client
+ sg_name = data_utils.rand_name(namestart)
+ sg_desc = sg_name + " description"
+ sg_dict = dict(name=sg_name,
+ description=sg_desc)
+ sg_dict['tenant_id'] = tenant_id
+ _, result = client.create_security_group(**sg_dict)
+ secgroup = net_resources.DeletableSecurityGroup(
+ client=client,
+ **result['security_group']
+ )
+ self.assertEqual(secgroup.name, sg_name)
+ self.assertEqual(tenant_id, secgroup.tenant_id)
+ self.assertEqual(secgroup.description, sg_desc)
+ self.addCleanup(self.delete_wrapper, secgroup.delete)
+ return secgroup
+
+ def _default_security_group(self, tenant_id, client=None):
+ """Get default secgroup for given tenant_id.
+
+ :returns: DeletableSecurityGroup -- default secgroup for given tenant
+ """
+ if client is None:
+ client = self.network_client
+ sgs = [
+ sg for sg in client.list_security_groups().values()[0]
+ if sg['tenant_id'] == tenant_id and sg['name'] == 'default'
+ ]
+ msg = "No default security group for tenant %s." % (tenant_id)
+ self.assertTrue(len(sgs) > 0, msg)
+ if len(sgs) > 1:
+ msg = "Found %d default security groups" % len(sgs)
+ raise exc.NeutronClientNoUniqueMatch(msg=msg)
+ return net_resources.DeletableSecurityGroup(client=client,
+ **sgs[0])
+
+ def _create_security_group_rule(self, client=None, secgroup=None,
+ tenant_id=None, **kwargs):
+ """Create a rule from a dictionary of rule parameters.
+
+ Create a rule in a secgroup. if secgroup not defined will search for
+ default secgroup in tenant_id.
+
+ :param secgroup: type DeletableSecurityGroup.
+ :param secgroup_id: search for secgroup by id
+ default -- choose default secgroup for given tenant_id
+ :param tenant_id: if secgroup not passed -- the tenant in which to
+ search for default secgroup
+ :param kwargs: a dictionary containing rule parameters:
+ for example, to allow incoming ssh:
+ rule = {
+ direction: 'ingress'
+ protocol:'tcp',
+ port_range_min: 22,
+ port_range_max: 22
+ }
+ """
+ if client is None:
+ client = self.network_client
+ if secgroup is None:
+ secgroup = self._default_security_group(tenant_id)
+
+ ruleset = dict(security_group_id=secgroup.id,
+ tenant_id=secgroup.tenant_id)
+ ruleset.update(kwargs)
+
+ _, sg_rule = client.create_security_group_rule(**ruleset)
+ sg_rule = net_resources.DeletableSecurityGroupRule(
+ client=client,
+ **sg_rule['security_group_rule']
+ )
+ self.addCleanup(self.delete_wrapper, sg_rule.delete)
+ self.assertEqual(secgroup.tenant_id, sg_rule.tenant_id)
+ self.assertEqual(secgroup.id, sg_rule.security_group_id)
+
+ return sg_rule
+
+ def _create_loginable_secgroup_rule(self, client=None, secgroup=None):
+ """These rules are intended to permit inbound ssh and icmp
+ traffic from all sources, so no group_id is provided.
+ Setting a group_id would only permit traffic from ports
+ belonging to the same security group.
+ """
+
+ if client is None:
+ client = self.network_client
+ rules = []
+ rulesets = [
+ dict(
+ # ssh
+ protocol='tcp',
+ port_range_min=22,
+ port_range_max=22,
+ ),
+ dict(
+ # ping
+ protocol='icmp',
+ )
+ ]
+ for ruleset in rulesets:
+ for r_direction in ['ingress', 'egress']:
+ ruleset['direction'] = r_direction
+ try:
+ sg_rule = self._create_security_group_rule(
+ client=client, secgroup=secgroup, **ruleset)
+ except exceptions.Conflict as ex:
+ # if rule already exist - skip rule and continue
+ msg = 'Security group rule already exists'
+ if msg not in ex._error_string:
+ raise ex
+ else:
+ self.assertEqual(r_direction, sg_rule.direction)
+ rules.append(sg_rule)
+
+ return rules
+
+ def _ssh_to_server(self, server, private_key):
+ ssh_login = CONF.compute.image_ssh_user
+ return self.get_remote_client(server,
+ username=ssh_login,
+ private_key=private_key)
+
+ def _get_router(self, tenant_id):
+ """Retrieve a router for the given tenant id.
+
+ If a public router has been configured, it will be returned.
+
+ If a public router has not been configured, but a public
+ network has, a tenant router will be created and returned that
+ routes traffic to the public network.
+ """
+ router_id = CONF.network.public_router_id
+ network_id = CONF.network.public_network_id
+ if router_id:
+ result = self.network_client.show_router(router_id)
+ return net_resources.AttributeDict(**result['router'])
+ elif network_id:
+ router = self._create_router(tenant_id)
+ router.set_gateway(network_id)
+ return router
+ else:
+ raise Exception("Neither of 'public_router_id' or "
+ "'public_network_id' has been defined.")
+
+ def _create_router(self, tenant_id, namestart='router-smoke-'):
+ name = data_utils.rand_name(namestart)
+ _, result = self.network_client.create_router(name=name,
+ admin_state_up=True,
+ tenant_id=tenant_id, )
+ router = net_resources.DeletableRouter(client=self.network_client,
+ **result['router'])
+ self.assertEqual(router.name, name)
+ self.addCleanup(self.delete_wrapper, router.delete)
+ return router
+
+ def _create_networks(self, tenant_id=None):
+ """Create a network with a subnet connected to a router.
+
+ :returns: network, subnet, router
+ """
+ if tenant_id is None:
+ tenant_id = self.tenant_id
+ network = self._create_network(tenant_id)
+ router = self._get_router(tenant_id)
+ subnet = self._create_subnet(network)
+ subnet.add_to_router(router.id)
+ return network, subnet, router
+
+
class OfficialClientTest(tempest.test.BaseTestCase):
"""
Official Client test base class for scenario testing.
diff --git a/tempest/scenario/test_minimum_basic.py b/tempest/scenario/test_minimum_basic.py
index 4bc4a98..8a8e387 100644
--- a/tempest/scenario/test_minimum_basic.py
+++ b/tempest/scenario/test_minimum_basic.py
@@ -124,7 +124,7 @@
self.assertEqual('available', volume['status'])
def create_and_add_security_group(self):
- secgroup = self._create_security_group_nova()
+ secgroup = self._create_security_group()
self.servers_client.add_security_group(self.server['id'],
secgroup['name'])
self.addCleanup(self.servers_client.remove_security_group,
diff --git a/tempest/scenario/test_network_basic_ops.py b/tempest/scenario/test_network_basic_ops.py
index bba034b..81cfd91 100644
--- a/tempest/scenario/test_network_basic_ops.py
+++ b/tempest/scenario/test_network_basic_ops.py
@@ -18,13 +18,13 @@
import testtools
-from tempest.api.network import common as net_common
from tempest.common import debug
from tempest.common.utils import data_utils
from tempest import config
from tempest import exceptions
from tempest.openstack.common import log as logging
from tempest.scenario import manager
+from tempest.services.network import resources as net_resources
from tempest import test
CONF = config.CONF
@@ -34,7 +34,7 @@
['floating_ip', 'server'])
-class TestNetworkBasicOps(manager.NetworkScenarioTest):
+class TestNetworkBasicOps(manager.NeutronScenarioTest):
"""
This smoke test suite assumes that Nova has been configured to
@@ -96,21 +96,20 @@
if not test.is_extension_enabled(ext, 'network'):
msg = "%s extension not enabled." % ext
raise cls.skipException(msg)
- cls.check_preconditions()
def setUp(self):
super(TestNetworkBasicOps, self).setUp()
self.security_group = \
- self._create_security_group_neutron(tenant_id=self.tenant_id)
+ self._create_security_group(tenant_id=self.tenant_id)
self.network, self.subnet, self.router = self._create_networks()
self.check_networks()
- self.servers = {}
+ self.keypairs = {}
+ self.servers = []
name = data_utils.rand_name('server-smoke')
- serv_dict = self._create_server(name, self.network)
- self.servers[serv_dict['server']] = serv_dict['keypair']
+ server = self._create_server(name, self.network)
self._check_tenant_network_connectivity()
- self._create_and_associate_floating_ips()
+ self._create_and_associate_floating_ips(server)
def check_networks(self):
"""
@@ -139,32 +138,36 @@
seen_router_ids)
def _create_server(self, name, network):
- keypair = self.create_keypair(name='keypair-%s' % name)
- security_groups = [self.security_group.name]
+ keypair = self.create_keypair()
+ self.keypairs[keypair['name']] = keypair
+ security_groups = [self.security_group]
create_kwargs = {
'nics': [
{'net-id': network.id},
],
- 'key_name': keypair.name,
+ 'key_name': keypair['name'],
'security_groups': security_groups,
}
server = self.create_server(name=name, create_kwargs=create_kwargs)
- return dict(server=server, keypair=keypair)
+ self.servers.append(server)
+ return server
+
+ def _get_server_key(self, server):
+ return self.keypairs[server['key_name']]['private_key']
def _check_tenant_network_connectivity(self):
ssh_login = CONF.compute.image_ssh_user
- for server, key in self.servers.iteritems():
+ for server in self.servers:
# call the common method in the parent class
super(TestNetworkBasicOps, self).\
_check_tenant_network_connectivity(
- server, ssh_login, key.private_key,
- servers_for_debug=self.servers.keys())
+ server, ssh_login, self._get_server_key(server),
+ servers_for_debug=self.servers)
- def _create_and_associate_floating_ips(self):
+ def _create_and_associate_floating_ips(self, server):
public_network_id = CONF.network.public_network_id
- for server in self.servers.keys():
- floating_ip = self._create_floating_ip(server, public_network_id)
- self.floating_ip_tuple = Floating_IP_tuple(floating_ip, server)
+ floating_ip = self._create_floating_ip(server, public_network_id)
+ self.floating_ip_tuple = Floating_IP_tuple(floating_ip, server)
def _check_public_network_connectivity(self, should_connect=True,
msg=None):
@@ -173,11 +176,11 @@
ip_address = floating_ip.floating_ip_address
private_key = None
if should_connect:
- private_key = self.servers[server].private_key
+ private_key = self._get_server_key(server)
# call the common method in the parent class
super(TestNetworkBasicOps, self)._check_public_network_connectivity(
ip_address, ssh_login, private_key, should_connect, msg,
- self.servers.keys())
+ self.servers)
def _disassociate_floating_ips(self):
floating_ip, server = self.floating_ip_tuple
@@ -189,11 +192,10 @@
floating_ip, server = self.floating_ip_tuple
name = data_utils.rand_name('new_server-smoke-')
# create a new server for the floating ip
- serv_dict = self._create_server(name, self.network)
- self.servers[serv_dict['server']] = serv_dict['keypair']
- self._associate_floating_ip(floating_ip, serv_dict['server'])
+ server = self._create_server(name, self.network)
+ self._associate_floating_ip(floating_ip, server)
self.floating_ip_tuple = Floating_IP_tuple(
- floating_ip, serv_dict['server'])
+ floating_ip, server)
def _create_new_network(self):
self.new_net = self._create_network(self.tenant_id)
@@ -204,27 +206,27 @@
def _hotplug_server(self):
old_floating_ip, server = self.floating_ip_tuple
ip_address = old_floating_ip.floating_ip_address
- private_key = self.servers[server].private_key
+ private_key = self._get_server_key(server)
ssh_client = self.get_remote_client(ip_address,
private_key=private_key)
old_nic_list = self._get_server_nics(ssh_client)
# get a port from a list of one item
- port_list = self._list_ports(device_id=server.id)
+ port_list = self._list_ports(device_id=server['id'])
self.assertEqual(1, len(port_list))
old_port = port_list[0]
- self.compute_client.servers.interface_attach(server=server,
- net_id=self.new_net.id,
- port_id=None,
- fixed_ip=None)
- # move server to the head of the cleanup list
- self.addCleanup(self.delete_timeout,
- self.compute_client.servers,
- server.id)
- self.addCleanup(self.delete_wrapper, server)
+ _, interface = self.interface_client.create_interface(
+ server=server['id'],
+ network_id=self.new_net.id)
+ self.addCleanup(self.network_client.wait_for_resource_deletion,
+ 'port',
+ interface['port_id'])
+ self.addCleanup(self.delete_wrapper,
+ self.interface_client.delete_interface,
+ server['id'], interface['port_id'])
def check_ports():
self.new_port_list = [port for port in
- self._list_ports(device_id=server.id)
+ self._list_ports(device_id=server['id'])
if port != old_port]
return len(self.new_port_list) == 1
@@ -233,8 +235,8 @@
raise exceptions.TimeoutException("No new port attached to the "
"server in time (%s sec) !"
% CONF.network.build_timeout)
- new_port = net_common.DeletablePort(client=self.network_client,
- **self.new_port_list[0])
+ new_port = net_resources.DeletablePort(client=self.network_client,
+ **self.new_port_list[0])
def check_new_nic():
new_nic_list = self._get_server_nics(ssh_client)
@@ -267,7 +269,7 @@
# get internal ports' ips:
# get all network ports in the new network
internal_ips = (p['fixed_ips'][0]['ip_address'] for p in
- self._list_ports(tenant_id=server.tenant_id,
+ self._list_ports(tenant_id=server['tenant_id'],
network_id=network.id)
if p['device_owner'].startswith('network'))
@@ -283,8 +285,8 @@
LOG.info(msg)
return
- subnet = self.network_client.list_subnets(
- network_id=CONF.network.public_network_id)['subnets']
+ subnet = self._list_subnets(
+ network_id=CONF.network.public_network_id)
self.assertEqual(1, len(subnet), "Found %d subnets" % len(subnet))
external_ips = [subnet[0]['gateway_ip']]
@@ -293,7 +295,7 @@
def _check_server_connectivity(self, floating_ip, address_list):
ip_address = floating_ip.floating_ip_address
- private_key = self.servers[self.floating_ip_tuple.server].private_key
+ private_key = self._get_server_key(self.floating_ip_tuple.server)
ssh_source = self._ssh_to_server(ip_address, private_key)
for remote_ip in address_list:
diff --git a/tempest/scenario/test_server_basic_ops.py b/tempest/scenario/test_server_basic_ops.py
index 77e195d..b38b1a3 100644
--- a/tempest/scenario/test_server_basic_ops.py
+++ b/tempest/scenario/test_server_basic_ops.py
@@ -102,7 +102,7 @@
@test.services('compute', 'network')
def test_server_basicops(self):
self.add_keypair()
- self.security_group = self._create_security_group_nova()
+ self.security_group = self._create_security_group()
self.boot_instance()
self.verify_ssh()
self.servers_client.delete_server(self.instance['id'])
diff --git a/tempest/scenario/test_snapshot_pattern.py b/tempest/scenario/test_snapshot_pattern.py
index d500065..dc32edc 100644
--- a/tempest/scenario/test_snapshot_pattern.py
+++ b/tempest/scenario/test_snapshot_pattern.py
@@ -82,7 +82,7 @@
def test_snapshot_pattern(self):
# prepare for booting a instance
self._add_keypair()
- self.security_group = self._create_security_group_nova()
+ self.security_group = self._create_security_group()
# boot a instance and create a timestamp file in it
server = self._boot_image(CONF.compute.image_ref)
diff --git a/tempest/scenario/test_volume_boot_pattern.py b/tempest/scenario/test_volume_boot_pattern.py
index ec8575a..c32923a 100644
--- a/tempest/scenario/test_volume_boot_pattern.py
+++ b/tempest/scenario/test_volume_boot_pattern.py
@@ -56,7 +56,7 @@
'device_name': 'vda',
'volume_id': vol_id,
'delete_on_termination': '0'}]
- self.security_group = self._create_security_group_nova()
+ self.security_group = self._create_security_group()
security_groups = [{'name': self.security_group['name']}]
create_kwargs = {
'block_device_mapping': bd_map,
@@ -140,7 +140,7 @@
@test.services('compute', 'volume', 'image')
def test_volume_boot_pattern(self):
keypair = self.create_keypair()
- self.security_group = self._create_security_group_nova()
+ self.security_group = self._create_security_group()
# create an instance from volume
volume_origin = self._create_volume_from_image()
@@ -187,7 +187,7 @@
bdms = [{'uuid': vol_id, 'source_type': 'volume',
'destination_type': 'volume', 'boot_index': 0,
'delete_on_termination': False}]
- self.security_group = self._create_security_group_nova()
+ self.security_group = self._create_security_group()
security_groups = [{'name': self.security_group['name']}]
create_kwargs = {
'block_device_mapping_v2': bdms,
diff --git a/tempest/services/network/resources.py b/tempest/services/network/resources.py
new file mode 100644
index 0000000..b2feb87
--- /dev/null
+++ b/tempest/services/network/resources.py
@@ -0,0 +1,163 @@
+# Copyright 2013 Hewlett-Packard Development Company, L.P.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import abc
+
+import six
+
+
+class AttributeDict(dict):
+
+ """
+ Provide attribute access (dict.key) to dictionary values.
+ """
+
+ def __getattr__(self, name):
+ """Allow attribute access for all keys in the dict."""
+ if name in self:
+ return self[name]
+ return super(AttributeDict, self).__getattribute__(name)
+
+
+@six.add_metaclass(abc.ABCMeta)
+class DeletableResource(AttributeDict):
+
+ """
+ Support deletion of neutron resources (networks, subnets) via a
+ delete() method, as is supported by keystone and nova resources.
+ """
+
+ def __init__(self, *args, **kwargs):
+ self.client = kwargs.pop('client', None)
+ super(DeletableResource, self).__init__(*args, **kwargs)
+
+ def __str__(self):
+ return '<%s id="%s" name="%s">' % (self.__class__.__name__,
+ self.id, self.name)
+
+ @abc.abstractmethod
+ def delete(self):
+ return
+
+ def __hash__(self):
+ return hash(self.id)
+
+
+class DeletableNetwork(DeletableResource):
+
+ def delete(self):
+ self.client.delete_network(self.id)
+
+
+class DeletableSubnet(DeletableResource):
+
+ def __init__(self, *args, **kwargs):
+ super(DeletableSubnet, self).__init__(*args, **kwargs)
+ self._router_ids = set()
+
+ def update(self, *args, **kwargs):
+ _, result = self.client.update_subnet(subnet=self.id, *args, **kwargs)
+ super(DeletableSubnet, self).update(**result['subnet'])
+
+ def add_to_router(self, router_id):
+ self._router_ids.add(router_id)
+ self.client.add_router_interface_with_subnet_id(router_id,
+ subnet_id=self.id)
+
+ def delete(self):
+ for router_id in self._router_ids.copy():
+ self.client.remove_router_interface_with_subnet_id(
+ router_id,
+ subnet_id=self.id)
+ self._router_ids.remove(router_id)
+ self.client.delete_subnet(self.id)
+
+
+class DeletableRouter(DeletableResource):
+
+ def set_gateway(self, network_id):
+ return self.update(external_gateway_info=dict(network_id=network_id))
+
+ def unset_gateway(self):
+ return self.update(external_gateway_info=dict())
+
+ def update(self, *args, **kwargs):
+ _, result = self.client.update_router(self.id,
+ *args,
+ **kwargs)
+ return super(DeletableRouter, self).update(**result['router'])
+
+ def delete(self):
+ self.unset_gateway()
+ self.client.delete_router(self.id)
+
+
+class DeletableFloatingIp(DeletableResource):
+
+ def update(self, *args, **kwargs):
+ _, result = self.client.update_floatingip(self.id,
+ *args,
+ **kwargs)
+ super(DeletableFloatingIp, self).update(**result['floatingip'])
+
+ def __repr__(self):
+ return '<%s addr="%s">' % (self.__class__.__name__,
+ self.floating_ip_address)
+
+ def __str__(self):
+ return '<"FloatingIP" addr="%s" id="%s">' % (self.floating_ip_address,
+ self.id)
+
+ def delete(self):
+ self.client.delete_floatingip(self.id)
+
+
+class DeletablePort(DeletableResource):
+
+ def delete(self):
+ self.client.delete_port(self.id)
+
+
+class DeletableSecurityGroup(DeletableResource):
+
+ def delete(self):
+ self.client.delete_security_group(self.id)
+
+
+class DeletableSecurityGroupRule(DeletableResource):
+
+ def __repr__(self):
+ return '<%s id="%s">' % (self.__class__.__name__, self.id)
+
+ def delete(self):
+ self.client.delete_security_group_rule(self.id)
+
+
+class DeletablePool(DeletableResource):
+
+ def delete(self):
+ self.client.delete_pool(self.id)
+
+
+class DeletableMember(DeletableResource):
+
+ def delete(self):
+ self.client.delete_member(self.id)
+
+
+class DeletableVip(DeletableResource):
+
+ def delete(self):
+ self.client.delete_vip(self.id)