Merge "RBAC test for "dhcp_agent_scheduler" network policy"
diff --git a/patrole_tempest_plugin/tests/api/network/test_networks_rbac.py b/patrole_tempest_plugin/tests/api/network/test_networks_rbac.py
index 8208fde..f72fe75 100644
--- a/patrole_tempest_plugin/tests/api/network/test_networks_rbac.py
+++ b/patrole_tempest_plugin/tests/api/network/test_networks_rbac.py
@@ -17,8 +17,8 @@
from tempest import config
from tempest.lib.common.utils import data_utils
-from tempest.lib.common.utils import test_utils
from tempest.lib import decorators
+from tempest import test
from patrole_tempest_plugin import rbac_exceptions
from patrole_tempest_plugin import rbac_rule_validation
@@ -27,68 +27,37 @@
CONF = config.CONF
-class RbacNetworksTest(base.BaseNetworkRbacTest):
+class NetworksRbacTest(base.BaseNetworkRbacTest):
@classmethod
def resource_setup(cls):
- super(RbacNetworksTest, cls).resource_setup()
-
- network_name = data_utils.rand_name(
- cls.__name__ + '-rbac-admin-network-')
-
- post_body = {'name': network_name}
- body = cls.networks_client.create_network(**post_body)
- cls.admin_network = body['network']
- cls.networks.append(cls.admin_network)
-
- # Create a subnet by admin user
+ super(NetworksRbacTest, cls).resource_setup()
+ cls.network = cls.create_network()
cls.cidr = netaddr.IPNetwork(CONF.network.project_network_cidr)
-
- cls.admin_subnet = cls.create_subnet(cls.admin_network,
- cidr=cls.cidr,
- mask_bits=24,
- enable_dhcp=False)
-
- def _delete_network(self, network):
- # Deleting network also deletes its subnets if exists
- self.networks_client.delete_network(network['id'])
- if network in self.networks:
- self.networks.remove(network)
- for subnet in self.subnets:
- if subnet['network_id'] == network['id']:
- self.subnets.remove(subnet)
+ cls.subnet = cls.create_subnet(
+ cls.network, cidr=cls.cidr, mask_bits=24, enable_dhcp=False)
def _create_network(self,
- shared=None,
router_external=None,
router_private=None,
provider_network_type=None,
provider_physical_network=None,
- provider_segmentation_id=None):
+ provider_segmentation_id=None,
+ **kwargs):
+ if router_external is not None:
+ kwargs['router:external'] = router_external
+ if router_private is not None:
+ kwargs['router:private'] = router_private
+ if provider_network_type is not None:
+ kwargs['provider:network_type'] = provider_network_type
+ if provider_physical_network is not None:
+ kwargs['provider:physical_network'] = provider_physical_network
+ if provider_segmentation_id is not None:
+ kwargs['provider:segmentation_id'] = provider_segmentation_id
network_name = data_utils.rand_name(
- self.__class__.__name__ + '-test-network-')
- post_body = {'name': network_name}
-
- if shared is not None:
- post_body['shared'] = shared
- if router_external is not None:
- post_body['router:external'] = router_external
- if router_private is not None:
- post_body['router:private'] = router_private
- if provider_network_type is not None:
- post_body['provider:network_type'] = provider_network_type
- if provider_physical_network is not None:
- post_body['provider:physical_network'] = provider_physical_network
- if provider_segmentation_id is not None:
- post_body['provider:segmentation_id'] = provider_segmentation_id
-
- body = self.networks_client.create_network(**post_body)
- network = body['network']
-
- self.addCleanup(test_utils.call_and_ignore_notfound_exc,
- self._delete_network, network)
- self.assertEqual('ACTIVE', network['status'])
+ self.__class__.__name__ + '-Network')
+ network = self.create_network(network_name=network_name, **kwargs)
return network
def _update_network(self,
@@ -97,30 +66,24 @@
shared_network=None,
router_external=None,
router_private=None,
- segments=None):
-
- # update a network that has been created during class setup
+ segments=None,
+ **kwargs):
if not net_id:
- net_id = self.admin_network['id']
-
- post_body = {}
- updated_network = None
+ net_id = self.network['id']
if admin is not None:
- post_body['admin_state_up'] = admin
+ kwargs['admin_state_up'] = admin
elif shared_network is not None:
- post_body['shared'] = shared_network
+ kwargs['shared'] = shared_network
elif router_external is not None:
- post_body['router:external'] = router_external
+ kwargs['router:external'] = router_external
elif router_private is not None:
- post_body['router:private'] = router_private
+ kwargs['router:private'] = router_private
elif segments is not None:
- post_body['segments'] = segments
- else:
- return updated_network
+ kwargs['segments'] = segments
- body = self.networks_client.update_network(net_id, **post_body)
- updated_network = body['network']
+ updated_network = self.networks_client.update_network(
+ net_id, **kwargs)['network']
return updated_network
@rbac_rule_validation.action(service="neutron",
@@ -194,13 +157,11 @@
RBAC test for the neutron update_network policy
"""
- self.rbac_utils.switch_role(self, toggle_rbac_role=True)
- updated_network = self._update_network(admin=False)
- self.assertEqual(updated_network['admin_state_up'], False)
+ updated_name = data_utils.rand_name(
+ self.__class__.__name__ + '-Network')
- # Revert back to True
- updated_network = self._update_network(admin=True)
- self.assertEqual(updated_network['admin_state_up'], True)
+ self.rbac_utils.switch_role(self, toggle_rbac_role=True)
+ self._update_network(name=updated_name)
@rbac_rule_validation.action(service="neutron",
rule="update_network:shared")
@@ -212,12 +173,8 @@
RBAC test for the neutron update_network:shared policy
"""
self.rbac_utils.switch_role(self, toggle_rbac_role=True)
- updated_network = self._update_network(shared_network=True)
- self.assertEqual(updated_network['shared'], True)
-
- # Revert back to False
- updated_network = self._update_network(shared_network=False)
- self.assertEqual(updated_network['shared'], False)
+ self._update_network(shared_network=True)
+ self.addCleanup(self._update_network, shared_network=False)
@rbac_rule_validation.action(service="neutron",
rule="update_network:router:external")
@@ -242,8 +199,7 @@
RBAC test for the neutron get_network policy
"""
self.rbac_utils.switch_role(self, toggle_rbac_role=True)
- # show a network that has been created during class setup
- self.networks_client.show_network(self.admin_network['id'])
+ self.networks_client.show_network(self.network['id'])
@rbac_rule_validation.action(service="neutron",
rule="get_network:router:external")
@@ -254,11 +210,11 @@
RBAC test for the neutron get_network:router:external policy
"""
- post_body = {'fields': 'router:external'}
+ kwargs = {'fields': 'router:external'}
self.rbac_utils.switch_role(self, toggle_rbac_role=True)
- self.networks_client.show_network(self.admin_network['id'],
- **post_body)
+ self.networks_client.show_network(self.network['id'],
+ **kwargs)
@rbac_rule_validation.action(service="neutron",
rule="get_network:provider:network_type")
@@ -269,14 +225,13 @@
RBAC test for the neutron get_network:provider:network_type policy
"""
- post_body = {'fields': 'provider:network_type'}
+ kwargs = {'fields': 'provider:network_type'}
self.rbac_utils.switch_role(self, toggle_rbac_role=True)
- body = self.networks_client.show_network(self.admin_network['id'],
- **post_body)
- showed_net = body['network']
+ retrieved_network = self.networks_client.show_network(
+ self.network['id'], **kwargs)['network']
- if len(showed_net) == 0:
+ if len(retrieved_network) == 0:
raise rbac_exceptions.RbacActionFailed
@rbac_rule_validation.action(service="neutron",
@@ -288,14 +243,13 @@
RBAC test for the neutron get_network:provider:physical_network policy
"""
- post_body = {'fields': 'provider:physical_network'}
+ kwargs = {'fields': 'provider:physical_network'}
self.rbac_utils.switch_role(self, toggle_rbac_role=True)
- body = self.networks_client.show_network(self.admin_network['id'],
- **post_body)
- showed_net = body['network']
+ retrieved_network = self.networks_client.show_network(
+ self.network['id'], **kwargs)['network']
- if len(showed_net) == 0:
+ if len(retrieved_network) == 0:
raise rbac_exceptions.RbacActionFailed
@rbac_rule_validation.action(service="neutron",
@@ -307,18 +261,17 @@
RBAC test for the neutron get_network:provider:segmentation_id policy
"""
- post_body = {'fields': 'provider:segmentation_id'}
+ kwargs = {'fields': 'provider:segmentation_id'}
self.rbac_utils.switch_role(self, toggle_rbac_role=True)
- body = self.networks_client.show_network(self.admin_network['id'],
- **post_body)
- showed_net = body['network']
+ retrieved_network = self.networks_client.show_network(
+ self.network['id'], **kwargs)['network']
- if len(showed_net) == 0:
+ if len(retrieved_network) == 0:
raise rbac_exceptions.RbacActionFailed
- key = showed_net.get('provider:segmentation_id', "NotFound")
- self.assertIsNot(key, "NotFound")
+ key = retrieved_network.get('provider:segmentation_id', "NotFound")
+ self.assertNotEqual(key, "NotFound")
@rbac_rule_validation.action(service="neutron",
rule="delete_network")
@@ -343,10 +296,8 @@
RBAC test for the neutron create_subnet policy
"""
network = self._create_network()
- self.assertEqual('ACTIVE', network['status'])
self.rbac_utils.switch_role(self, toggle_rbac_role=True)
- # Create a subnet
self.create_subnet(network, enable_dhcp=False)
@rbac_rule_validation.action(service="neutron",
@@ -359,7 +310,7 @@
RBAC test for the neutron get_subnet policy
"""
self.rbac_utils.switch_role(self, toggle_rbac_role=True)
- self.subnets_client.show_subnet(self.admin_subnet['id'])
+ self.subnets_client.show_subnet(self.subnet['id'])
@rbac_rule_validation.action(service="neutron",
rule="update_subnet")
@@ -370,9 +321,12 @@
RBAC test for the neutron update_subnet policy
"""
+ updated_name = data_utils.rand_name(
+ self.__class__.__name__ + '-Network')
+
self.rbac_utils.switch_role(self, toggle_rbac_role=True)
- self.subnets_client.update_subnet(self.admin_subnet['id'],
- name="New_subnet")
+ self.subnets_client.update_subnet(self.subnet['id'],
+ name=updated_name)
@rbac_rule_validation.action(service="neutron",
rule="delete_subnet")
@@ -383,13 +337,22 @@
RBAC test for the neutron delete_subnet policy
"""
- # Create a network using admin privilege
network = self._create_network()
- self.assertEqual('ACTIVE', network['status'])
-
- # Create a subnet using admin privilege
subnet = self.create_subnet(network, enable_dhcp=False)
self.rbac_utils.switch_role(self, toggle_rbac_role=True)
- # Delete the subnet
self.subnets_client.delete_subnet(subnet['id'])
+
+ @test.requires_ext(extension='dhcp_agent_scheduler', service='network')
+ @decorators.idempotent_id('b524f19f-fbb4-4d11-a85d-03bfae17bf0e')
+ @rbac_rule_validation.action(service="neutron",
+ rule="get_dhcp-agents")
+ def test_list_dhcp_agents_on_hosting_network(self):
+
+ """List the DHCP agents hosting the network created in resource_setup.
+
+ RBAC test for the neutron "get_dhcp-agents" policy (role toggled first).
+ """
+ self.rbac_utils.switch_role(self, toggle_rbac_role=True)
+ self.networks_client.list_dhcp_agents_on_hosting_network(
+ self.network['id'])
diff --git a/releasenotes/notes/dhcp-agent-scheduler-test-842fc1df45799def.yaml b/releasenotes/notes/dhcp-agent-scheduler-test-842fc1df45799def.yaml
new file mode 100644
index 0000000..1afe82b
--- /dev/null
+++ b/releasenotes/notes/dhcp-agent-scheduler-test-842fc1df45799def.yaml
@@ -0,0 +1,6 @@
+---
+features:
+ - |
+ Added an RBAC network test for listing DHCP agents on a hosting
+ network, providing coverage for the following Neutron policy action:
+ "get_dhcp-agents".