Network tests should take advantage of net_utils to find unused ip.
Currently, test_create_router_external_fixed_ips fails [0] for the
same reason described in this patch [1].
Basically, calling random.choice to select an ip address from
an allocation pool can potentially select an already-used ip
address, resulting in an error.
The best way to resolve such issues is to use the built-in
Tempest helper function `get_unused_ip_addresses` defined
here [2].
This patch makes the necessary changes to the code in [1] and
to test_routers_rbac as well.
[0] http://logs.openstack.org/03/448603/7/check/gate-tempest-dsvm-patrole-admin-fast-ubuntu-xenial-nv/2bb077f/console.html
[1] https://review.openstack.org/#/c/448250/
[2] https://github.com/openstack/tempest/blob/master/tempest/common/utils/net_utils.py
Change-Id: I9bbb7cd77cde9287e398f506de58ff23e9e95496
Closes-Bug: #1675342
diff --git a/patrole_tempest_plugin/tests/api/network/test_ports_rbac.py b/patrole_tempest_plugin/tests/api/network/test_ports_rbac.py
index 80f6270..b59e1e5 100644
--- a/patrole_tempest_plugin/tests/api/network/test_ports_rbac.py
+++ b/patrole_tempest_plugin/tests/api/network/test_ports_rbac.py
@@ -15,9 +15,10 @@
#
import netaddr
-import random
from oslo_log import log
+
+from tempest.common.utils import net_utils
from tempest import config
from tempest.lib.common.utils import data_utils
from tempest.lib.common.utils import test_utils
@@ -92,14 +93,13 @@
@decorators.idempotent_id('2551e10d-006a-413c-925a-8c6f834c09ac')
def test_create_port_fixed_ips(self):
- # Pick an unused ip address to avoid IpAddressAlreadyAllocated
- # exception.
- current_ports = self.ports_client.list_ports()['ports']
- in_use_ips = [p['fixed_ips'][0]['ip_address'] for p in current_ports]
- unused_ip_range = list(set(self.ip_range) - set(in_use_ips))
- ip_address = random.choice(unused_ip_range)
-
- fixed_ips = [{'ip_address': ip_address},
+ # Pick an unused ip address.
+ ip_list = net_utils.get_unused_ip_addresses(self.ports_client,
+ self.subnets_client,
+ self.network['id'],
+ self.subnet['id'],
+ 1)
+ fixed_ips = [{'ip_address': ip_list[0]},
{'subnet_id': self.subnet['id']}]
post_body = {'network_id': self.network['id'],
@@ -259,16 +259,17 @@
post_body = {'network_id': self.network['id']}
port = self._create_port(**post_body)
- # Pick an unused ip address to avoid IpAddressAlreadyAllocated
- # exception.
- current_ports = self.ports_client.list_ports()['ports']
- in_use_ips = [p['fixed_ips'][0]['ip_address'] for p in current_ports]
- unused_ip_range = list(set(self.ip_range) - set(in_use_ips))
- ip_address = random.choice(unused_ip_range)
+ # Pick an unused ip address.
+ ip_list = net_utils.get_unused_ip_addresses(self.ports_client,
+ self.subnets_client,
+ self.network['id'],
+ self.subnet['id'],
+ 1)
+ fixed_ips = [{'ip_address': ip_list[0]}]
self.rbac_utils.switch_role(self, switchToRbacRole=True)
self.ports_client.update_port(port['id'],
- fixed_ips=[{'ip_address': ip_address}])
+ fixed_ips=fixed_ips)
@rbac_rule_validation.action(service="neutron",
rule="update_port:port_security_enabled")
@@ -318,9 +319,14 @@
@decorators.idempotent_id('729c2151-bb49-4f4f-9d58-3ed8819b7582')
def test_update_port_allowed_address_pairs(self):
- ip_address = random.choice(list(self.ip_range))
+ # Pick an unused ip address.
+ ip_list = net_utils.get_unused_ip_addresses(self.ports_client,
+ self.subnets_client,
+ self.network['id'],
+ self.subnet['id'],
+ 1)
# Update allowed address pair attribute of port
- address_pairs = [{'ip_address': ip_address,
+ address_pairs = [{'ip_address': ip_list[0],
'mac_address': data_utils.rand_mac_address()}]
post_body = {'network_id': self.network['id']}
port = self._create_port(**post_body)
diff --git a/patrole_tempest_plugin/tests/api/network/test_routers_rbac.py b/patrole_tempest_plugin/tests/api/network/test_routers_rbac.py
index 69aa32a..eee9ff2 100644
--- a/patrole_tempest_plugin/tests/api/network/test_routers_rbac.py
+++ b/patrole_tempest_plugin/tests/api/network/test_routers_rbac.py
@@ -14,9 +14,10 @@
# under the License.
import netaddr
-import random
from oslo_log import log
+
+from tempest.common.utils import net_utils
from tempest import config
from tempest.lib.common.utils import data_utils
from tempest.lib.common.utils import test_utils
@@ -43,12 +44,12 @@
super(RouterRbacTest, cls).resource_setup()
post_body = {}
post_body['router:external'] = True
- cls.admin_network = cls.create_network(**post_body)
- cls.admin_subnet = cls.create_subnet(cls.admin_network)
- cls.admin_ip_range = netaddr.IPRange(
- cls.admin_subnet['allocation_pools'][0]['start'],
- cls.admin_subnet['allocation_pools'][0]['end'])
- cls.admin_router = cls.create_router()
+ cls.network = cls.create_network(**post_body)
+ cls.subnet = cls.create_subnet(cls.network)
+ cls.ip_range = netaddr.IPRange(
+ cls.subnet['allocation_pools'][0]['start'],
+ cls.subnet['allocation_pools'][0]['end'])
+ cls.router = cls.create_router()
@rbac_rule_validation.action(service="neutron",
rule="create_router")
@@ -74,7 +75,7 @@
create_router:external_gateway_info:enable_snat policy
"""
name = data_utils.rand_name('snat-router')
- external_gateway_info = {'network_id': self.admin_network['id'],
+ external_gateway_info = {'network_id': self.network['id'],
'enable_snat': True}
self.rbac_utils.switch_role(self, switchToRbacRole=True)
@@ -95,12 +96,15 @@
"""
name = data_utils.rand_name('snat-router')
- # Pick an ip address within the allocation_pools range
- ip_address = random.choice(list(self.admin_ip_range))
- external_fixed_ips = {'subnet_id': self.admin_subnet['id'],
- 'ip_address': ip_address}
-
- external_gateway_info = {'network_id': self.admin_network['id'],
+ # Pick an unused IP address.
+ ip_list = net_utils.get_unused_ip_addresses(self.ports_client,
+ self.subnets_client,
+ self.network['id'],
+ self.subnet['id'],
+ 1)
+ external_fixed_ips = {'subnet_id': self.subnet['id'],
+ 'ip_address': ip_list[0]}
+ external_gateway_info = {'network_id': self.network['id'],
'enable_snat': False,
'external_fixed_ips': [external_fixed_ips]}
@@ -120,7 +124,7 @@
RBAC test for the neutron get_router policy
"""
self.rbac_utils.switch_role(self, switchToRbacRole=True)
- self.routers_client.show_router(self.admin_router['id'])
+ self.routers_client.show_router(self.router['id'])
@rbac_rule_validation.action(
service="neutron", rule="update_router")
@@ -132,7 +136,7 @@
"""
new_name = data_utils.rand_name('new-router-name')
self.rbac_utils.switch_role(self, switchToRbacRole=True)
- self.routers_client.update_router(self.admin_router['id'],
+ self.routers_client.update_router(self.router['id'],
name=new_name)
@rbac_rule_validation.action(
@@ -145,7 +149,7 @@
update_router:external_gateway_info policy
"""
self.rbac_utils.switch_role(self, switchToRbacRole=True)
- self.routers_client.update_router(self.admin_router['id'],
+ self.routers_client.update_router(self.router['id'],
external_gateway_info={})
@rbac_rule_validation.action(
@@ -160,11 +164,11 @@
"""
self.rbac_utils.switch_role(self, switchToRbacRole=True)
self.routers_client.update_router(
- self.admin_router['id'],
- external_gateway_info={'network_id': self.admin_network['id']})
+ self.router['id'],
+ external_gateway_info={'network_id': self.network['id']})
self.addCleanup(
self.routers_client.update_router,
- self.admin_router['id'],
+ self.router['id'],
external_gateway_info=None)
@rbac_rule_validation.action(
@@ -179,12 +183,12 @@
"""
self.rbac_utils.switch_role(self, switchToRbacRole=True)
self.routers_client.update_router(
- self.admin_router['id'],
- external_gateway_info={'network_id': self.admin_network['id'],
+ self.router['id'],
+ external_gateway_info={'network_id': self.network['id'],
'enable_snat': True})
self.addCleanup(
self.routers_client.update_router,
- self.admin_router['id'],
+ self.router['id'],
external_gateway_info=None)
@rbac_rule_validation.action(
@@ -197,20 +201,24 @@
RBAC test for the neutron
update_router:external_gateway_info:external_fixed_ips policy
"""
- # Pick an ip address within the allocation_pools range
- ip_address = random.choice(list(self.admin_ip_range))
- external_fixed_ips = {'subnet_id': self.admin_subnet['id'],
- 'ip_address': ip_address}
- external_gateway_info = {'network_id': self.admin_network['id'],
+ # Pick an unused IP address.
+ ip_list = net_utils.get_unused_ip_addresses(self.ports_client,
+ self.subnets_client,
+ self.network['id'],
+ self.subnet['id'],
+ 1)
+ external_fixed_ips = {'subnet_id': self.subnet['id'],
+ 'ip_address': ip_list[0]}
+ external_gateway_info = {'network_id': self.network['id'],
'external_fixed_ips': [external_fixed_ips]}
self.rbac_utils.switch_role(self, switchToRbacRole=True)
self.routers_client.update_router(
- self.admin_router['id'],
+ self.router['id'],
external_gateway_info=external_gateway_info)
self.addCleanup(
self.routers_client.update_router,
- self.admin_router['id'],
+ self.router['id'],
external_gateway_info=None)
@rbac_rule_validation.action(service="neutron",