Merge "RBAC test for "dhcp_agent_scheduler" network policy"
diff --git a/patrole_tempest_plugin/tests/api/network/test_networks_rbac.py b/patrole_tempest_plugin/tests/api/network/test_networks_rbac.py
index 8208fde..f72fe75 100644
--- a/patrole_tempest_plugin/tests/api/network/test_networks_rbac.py
+++ b/patrole_tempest_plugin/tests/api/network/test_networks_rbac.py
@@ -17,8 +17,8 @@
 
 from tempest import config
 from tempest.lib.common.utils import data_utils
-from tempest.lib.common.utils import test_utils
 from tempest.lib import decorators
+from tempest import test
 
 from patrole_tempest_plugin import rbac_exceptions
 from patrole_tempest_plugin import rbac_rule_validation
@@ -27,68 +27,37 @@
 CONF = config.CONF
 
 
-class RbacNetworksTest(base.BaseNetworkRbacTest):
+class NetworksRbacTest(base.BaseNetworkRbacTest):
 
     @classmethod
     def resource_setup(cls):
-        super(RbacNetworksTest, cls).resource_setup()
-
-        network_name = data_utils.rand_name(
-            cls.__name__ + '-rbac-admin-network-')
-
-        post_body = {'name': network_name}
-        body = cls.networks_client.create_network(**post_body)
-        cls.admin_network = body['network']
-        cls.networks.append(cls.admin_network)
-
-        # Create a subnet by admin user
+        super(NetworksRbacTest, cls).resource_setup()
+        cls.network = cls.create_network()  # read via self.network in tests
         cls.cidr = netaddr.IPNetwork(CONF.network.project_network_cidr)
-
-        cls.admin_subnet = cls.create_subnet(cls.admin_network,
-                                             cidr=cls.cidr,
-                                             mask_bits=24,
-                                             enable_dhcp=False)
-
-    def _delete_network(self, network):
-        # Deleting network also deletes its subnets if exists
-        self.networks_client.delete_network(network['id'])
-        if network in self.networks:
-            self.networks.remove(network)
-        for subnet in self.subnets:
-            if subnet['network_id'] == network['id']:
-                self.subnets.remove(subnet)
+        cls.subnet = cls.create_subnet(  # read via self.subnet in tests
+            cls.network, cidr=cls.cidr, mask_bits=24, enable_dhcp=False)
 
     def _create_network(self,
-                        shared=None,
                         router_external=None,
                         router_private=None,
                         provider_network_type=None,
                         provider_physical_network=None,
-                        provider_segmentation_id=None):
+                        provider_segmentation_id=None,
+                        **kwargs):  # forwarded to self.create_network
+        if router_external is not None:  # only forward values that were set
+            kwargs['router:external'] = router_external
+        if router_private is not None:
+            kwargs['router:private'] = router_private
+        if provider_network_type is not None:
+            kwargs['provider:network_type'] = provider_network_type
+        if provider_physical_network is not None:
+            kwargs['provider:physical_network'] = provider_physical_network
+        if provider_segmentation_id is not None:
+            kwargs['provider:segmentation_id'] = provider_segmentation_id
 
         network_name = data_utils.rand_name(
-            self.__class__.__name__ + '-test-network-')
-        post_body = {'name': network_name}
-
-        if shared is not None:
-            post_body['shared'] = shared
-        if router_external is not None:
-            post_body['router:external'] = router_external
-        if router_private is not None:
-            post_body['router:private'] = router_private
-        if provider_network_type is not None:
-            post_body['provider:network_type'] = provider_network_type
-        if provider_physical_network is not None:
-            post_body['provider:physical_network'] = provider_physical_network
-        if provider_segmentation_id is not None:
-            post_body['provider:segmentation_id'] = provider_segmentation_id
-
-        body = self.networks_client.create_network(**post_body)
-        network = body['network']
-
-        self.addCleanup(test_utils.call_and_ignore_notfound_exc,
-                        self._delete_network, network)
-        self.assertEqual('ACTIVE', network['status'])
+            self.__class__.__name__ + '-Network')
+        network = self.create_network(network_name=network_name, **kwargs)
         return network
 
     def _update_network(self,
@@ -97,30 +66,24 @@
                         shared_network=None,
                         router_external=None,
                         router_private=None,
-                        segments=None):
-
-        # update a network that has been created during class setup
+                        segments=None,
+                        **kwargs):
         if not net_id:
-            net_id = self.admin_network['id']
-
-        post_body = {}
-        updated_network = None
+            net_id = self.network['id']
 
         if admin is not None:
-            post_body['admin_state_up'] = admin
+            kwargs['admin_state_up'] = admin
         elif shared_network is not None:
-            post_body['shared'] = shared_network
+            kwargs['shared'] = shared_network
         elif router_external is not None:
-            post_body['router:external'] = router_external
+            kwargs['router:external'] = router_external
         elif router_private is not None:
-            post_body['router:private'] = router_private
+            kwargs['router:private'] = router_private
         elif segments is not None:
-            post_body['segments'] = segments
-        else:
-            return updated_network
+            kwargs['segments'] = segments
 
-        body = self.networks_client.update_network(net_id, **post_body)
-        updated_network = body['network']
+        updated_network = self.networks_client.update_network(
+            net_id, **kwargs)['network']
         return updated_network
 
     @rbac_rule_validation.action(service="neutron",
@@ -194,13 +157,11 @@
 
         RBAC test for the neutron update_network policy
         """
-        self.rbac_utils.switch_role(self, toggle_rbac_role=True)
-        updated_network = self._update_network(admin=False)
-        self.assertEqual(updated_network['admin_state_up'], False)
+        updated_name = data_utils.rand_name(
+            self.__class__.__name__ + '-Network')
 
-        # Revert back to True
-        updated_network = self._update_network(admin=True)
-        self.assertEqual(updated_network['admin_state_up'], True)
+        self.rbac_utils.switch_role(self, toggle_rbac_role=True)
+        self._update_network(name=updated_name)
 
     @rbac_rule_validation.action(service="neutron",
                                  rule="update_network:shared")
@@ -212,12 +173,8 @@
         RBAC test for the neutron update_network:shared policy
         """
         self.rbac_utils.switch_role(self, toggle_rbac_role=True)
-        updated_network = self._update_network(shared_network=True)
-        self.assertEqual(updated_network['shared'], True)
-
-        # Revert back to False
-        updated_network = self._update_network(shared_network=False)
-        self.assertEqual(updated_network['shared'], False)
+        self._update_network(shared_network=True)
+        self.addCleanup(self._update_network, shared_network=False)
 
     @rbac_rule_validation.action(service="neutron",
                                  rule="update_network:router:external")
@@ -242,8 +199,7 @@
         RBAC test for the neutron get_network policy
         """
         self.rbac_utils.switch_role(self, toggle_rbac_role=True)
-        # show a network that has been created during class setup
-        self.networks_client.show_network(self.admin_network['id'])
+        self.networks_client.show_network(self.network['id'])
 
     @rbac_rule_validation.action(service="neutron",
                                  rule="get_network:router:external")
@@ -254,11 +210,11 @@
 
         RBAC test for the neutron get_network:router:external policy
         """
-        post_body = {'fields': 'router:external'}
+        kwargs = {'fields': 'router:external'}
 
         self.rbac_utils.switch_role(self, toggle_rbac_role=True)
-        self.networks_client.show_network(self.admin_network['id'],
-                                          **post_body)
+        self.networks_client.show_network(self.network['id'],
+                                          **kwargs)
 
     @rbac_rule_validation.action(service="neutron",
                                  rule="get_network:provider:network_type")
@@ -269,14 +225,13 @@
 
         RBAC test for the neutron get_network:provider:network_type policy
         """
-        post_body = {'fields': 'provider:network_type'}
+        kwargs = {'fields': 'provider:network_type'}
 
         self.rbac_utils.switch_role(self, toggle_rbac_role=True)
-        body = self.networks_client.show_network(self.admin_network['id'],
-                                                 **post_body)
-        showed_net = body['network']
+        retrieved_network = self.networks_client.show_network(
+            self.network['id'], **kwargs)['network']
 
-        if len(showed_net) == 0:
+        if len(retrieved_network) == 0:
             raise rbac_exceptions.RbacActionFailed
 
     @rbac_rule_validation.action(service="neutron",
@@ -288,14 +243,13 @@
 
         RBAC test for the neutron get_network:provider:physical_network policy
         """
-        post_body = {'fields': 'provider:physical_network'}
+        kwargs = {'fields': 'provider:physical_network'}
 
         self.rbac_utils.switch_role(self, toggle_rbac_role=True)
-        body = self.networks_client.show_network(self.admin_network['id'],
-                                                 **post_body)
-        showed_net = body['network']
+        retrieved_network = self.networks_client.show_network(
+            self.network['id'], **kwargs)['network']
 
-        if len(showed_net) == 0:
+        if len(retrieved_network) == 0:
             raise rbac_exceptions.RbacActionFailed
 
     @rbac_rule_validation.action(service="neutron",
@@ -307,18 +261,17 @@
 
         RBAC test for the neutron get_network:provider:segmentation_id policy
         """
-        post_body = {'fields': 'provider:segmentation_id'}
+        kwargs = {'fields': 'provider:segmentation_id'}
 
         self.rbac_utils.switch_role(self, toggle_rbac_role=True)
-        body = self.networks_client.show_network(self.admin_network['id'],
-                                                 **post_body)
-        showed_net = body['network']
+        retrieved_network = self.networks_client.show_network(
+            self.network['id'], **kwargs)['network']
 
-        if len(showed_net) == 0:
+        if len(retrieved_network) == 0:
             raise rbac_exceptions.RbacActionFailed
 
-        key = showed_net.get('provider:segmentation_id', "NotFound")
-        self.assertIsNot(key, "NotFound")
+        key = retrieved_network.get('provider:segmentation_id', "NotFound")
+        self.assertNotEqual(key, "NotFound")
 
     @rbac_rule_validation.action(service="neutron",
                                  rule="delete_network")
@@ -343,10 +296,8 @@
         RBAC test for the neutron create_subnet policy
         """
         network = self._create_network()
-        self.assertEqual('ACTIVE', network['status'])
 
         self.rbac_utils.switch_role(self, toggle_rbac_role=True)
-        # Create a subnet
         self.create_subnet(network, enable_dhcp=False)
 
     @rbac_rule_validation.action(service="neutron",
@@ -359,7 +310,7 @@
         RBAC test for the neutron get_subnet policy
         """
         self.rbac_utils.switch_role(self, toggle_rbac_role=True)
-        self.subnets_client.show_subnet(self.admin_subnet['id'])
+        self.subnets_client.show_subnet(self.subnet['id'])
 
     @rbac_rule_validation.action(service="neutron",
                                  rule="update_subnet")
@@ -370,9 +321,12 @@
 
         RBAC test for the neutron update_subnet policy
         """
+        updated_name = data_utils.rand_name(
+            self.__class__.__name__ + '-Network')
+
         self.rbac_utils.switch_role(self, toggle_rbac_role=True)
-        self.subnets_client.update_subnet(self.admin_subnet['id'],
-                                          name="New_subnet")
+        self.subnets_client.update_subnet(self.subnet['id'],
+                                          name=updated_name)
 
     @rbac_rule_validation.action(service="neutron",
                                  rule="delete_subnet")
@@ -383,13 +337,22 @@
 
         RBAC test for the neutron delete_subnet policy
         """
-        # Create a network using admin privilege
         network = self._create_network()
-        self.assertEqual('ACTIVE', network['status'])
-
-        # Create a subnet using admin privilege
         subnet = self.create_subnet(network, enable_dhcp=False)
 
         self.rbac_utils.switch_role(self, toggle_rbac_role=True)
-        # Delete the subnet
         self.subnets_client.delete_subnet(subnet['id'])
+
+    @test.requires_ext(extension='dhcp_agent_scheduler', service='network')
+    @decorators.idempotent_id('b524f19f-fbb4-4d11-a85d-03bfae17bf0e')
+    @rbac_rule_validation.action(service="neutron",
+                                 rule="get_dhcp-agents")
+    def test_list_dhcp_agents_on_hosting_network(self):
+        """RBAC test for the neutron "get_dhcp-agents" policy.
+
+        Lists the DHCP agents hosting the class-level network after the
+        role switch requested with toggle_rbac_role=True.
+        """
+        network_id = self.network['id']
+        self.rbac_utils.switch_role(self, toggle_rbac_role=True)
+        self.networks_client.list_dhcp_agents_on_hosting_network(network_id)
diff --git a/releasenotes/notes/dhcp-agent-scheduler-test-842fc1df45799def.yaml b/releasenotes/notes/dhcp-agent-scheduler-test-842fc1df45799def.yaml
new file mode 100644
index 0000000..1afe82b
--- /dev/null
+++ b/releasenotes/notes/dhcp-agent-scheduler-test-842fc1df45799def.yaml
@@ -0,0 +1,6 @@
+---
+features:
+  - |
+    Added an RBAC network test for listing the DHCP agents hosting a
+    network, providing coverage for the following Neutron policy action:
+    ``get_dhcp-agents``.