Merge "Refactors exceptions in rbac_rule_validation decorator."
diff --git a/contrib/post_test_hook.sh b/contrib/post_test_hook.sh
index 34555fb..aeb4013 100644
--- a/contrib/post_test_hook.sh
+++ b/contrib/post_test_hook.sh
@@ -30,7 +30,8 @@
# tests that contain the @test.attr(type='slow') decorator above them. Slower
# tests will execute those tests in a separate gate, which will require
# future modification of this script.
-DEVSTACK_GATE_TEMPEST_REGEX="(?!.*\[.*\bslow\b.*\])(^patrole_tempest_plugin\.tests\.api)"
+DEVSTACK_FAST_GATE_TEMPEST_REGEX="(?!.*\[.*\bslow\b.*\])(^patrole_tempest_plugin\.tests\.api)"
+DEVSTACK_SLOW_GATE_TEMPEST_REGEX="(?=.*\[.*\bslow\b.*\])(^patrole_tempest_plugin\.tests\.api)"
# Import devstack function 'iniset'.
source $BASE/new/devstack/functions
@@ -48,6 +49,10 @@
RBAC_ROLE="Member"
fi
+# Second argument is expected to contain a value indicating whether this is
+# a "fast" or "slow" test gate
+TYPE=$2
+
# Set rbac_flag=True under [rbac] section in tempest.conf
iniset $TEMPEST_CONFIG rbac rbac_flag True
# Set rbac_test_role=$RBAC_ROLE under [rbac] section in tempest.conf
@@ -66,5 +71,11 @@
# cd into Tempest directory before executing tox.
cd $BASE/new/tempest
-$TEMPEST_COMMAND -eall-plugin -- $DEVSTACK_GATE_TEMPEST_REGEX --concurrency=$TEMPEST_CONCURRENCY
+# Select Fast Gate if Type is set to 'fast', else use 'slow' gate.
+# The regexes are quoted because they contain '*', '?' and '[', which the
+# shell would otherwise subject to word splitting and pathname expansion.
+# $TEMPEST_COMMAND is left unquoted on purpose so that it can still split
+# into a command plus its arguments.
+if [[ "$TYPE" == "fast" ]]; then
+    $TEMPEST_COMMAND -eall-plugin -- "$DEVSTACK_FAST_GATE_TEMPEST_REGEX" --concurrency="$TEMPEST_CONCURRENCY"
+else
+    $TEMPEST_COMMAND -eall-plugin -- "$DEVSTACK_SLOW_GATE_TEMPEST_REGEX" --concurrency="$TEMPEST_CONCURRENCY"
+fi
+
sudo -H -u tempest .tox/all-plugin/bin/tempest list-plugins
diff --git a/patrole_tempest_plugin/tests/api/compute/test_server_rbac.py b/patrole_tempest_plugin/tests/api/compute/test_server_rbac.py
index 66676d1..e5a53c9 100644
--- a/patrole_tempest_plugin/tests/api/compute/test_server_rbac.py
+++ b/patrole_tempest_plugin/tests/api/compute/test_server_rbac.py
@@ -16,8 +16,10 @@
from oslo_log import log
from tempest.common import waiters
+from tempest import config
from tempest.lib.common.utils import data_utils
+from tempest.lib.common.utils import test_utils
from tempest.lib import decorators
from tempest.lib import exceptions
@@ -25,6 +27,7 @@
from patrole_tempest_plugin import rbac_rule_validation
from patrole_tempest_plugin.tests.api.compute import rbac_base as base
+CONF = config.CONF
LOG = log.getLogger(__name__)
@@ -34,6 +37,73 @@
def setup_clients(cls):
super(ComputeServersRbacTest, cls).setup_clients()
cls.client = cls.servers_client
+ cls.networks_client = cls.os.networks_client
+ cls.subnets_client = cls.os.subnets_client
+
+ @classmethod
+    def resource_setup(cls):
+        """Create the class-level volume and store its id in cls.volume_id."""
+        super(ComputeServersRbacTest, cls).resource_setup()
+
+        # Create a volume
+        volume_name = data_utils.rand_name(cls.__name__ + '-volume')
+        # The key that carries the volume name is 'name', or 'display_name'
+        # when the v2 volume API is not enabled.
+        name_field = 'name'
+        if not CONF.volume_feature_enabled.api_v2:
+            name_field = 'display_name'
+
+        params = {name_field: volume_name,
+                  'imageRef': CONF.compute.image_ref,
+                  'size': CONF.volume.volume_size}
+
+        volume = cls.volumes_client.create_volume(**params)['volume']
+        # Record the volume before waiting on it so that it is still listed
+        # in cls.volumes (presumably the list the base class cleans up --
+        # TODO confirm) if the wait below fails or times out.
+        cls.volumes.append(volume)
+        cls.volume_id = volume['id']
+        waiters.wait_for_volume_resource_status(cls.volumes_client,
+                                                volume['id'], 'available')
+
+    def _create_network_resources(self):
+        """Create a network with one IPv4 subnet and schedule their cleanup.
+
+        :returns: the 'network' body of the create_network response
+        """
+        # Create network
+        network_name = data_utils.rand_name(
+            self.__class__.__name__ + '-network')
+
+        network = self.networks_client.create_network(
+            **{'name': network_name})['network']
+        # Register the network cleanup straight away so that the network is
+        # not leaked if the subnet creation below fails.
+        self.addCleanup(test_utils.call_and_ignore_notfound_exc,
+                        self.networks_client.delete_network, network['id'])
+
+        # Create subnet for the network
+        subnet_name = data_utils.rand_name(self.__class__.__name__ + '-subnet')
+        subnet = self.subnets_client.create_subnet(
+            name=subnet_name,
+            network_id=network['id'],
+            cidr=CONF.network.project_network_cidr,
+            ip_version=4)['subnet']
+        # Cleanups run in reverse registration order, so the subnet is
+        # deleted before its network.
+        self.addCleanup(test_utils.call_and_ignore_notfound_exc,
+                        self.subnets_client.delete_subnet, subnet['id'])
+
+        return network
+
+    def _create_test_server_with_volume(self, volume_id):
+        """Boot a server from the given volume and wait until it is ACTIVE.
+
+        :param volume_id: id of the volume used as the boot device
+        :returns: the 'server' body of the create_server response
+        """
+        # Create a server with the volume created earlier
+        server_name = data_utils.rand_name(self.__class__.__name__ + "-server")
+        bd_map_v2 = [{'uuid': volume_id,
+                      'source_type': 'volume',
+                      'destination_type': 'volume',
+                      'boot_index': 0,
+                      'delete_on_termination': True}]
+        device_mapping = {'block_device_mapping_v2': bd_map_v2}
+
+        # Since the server is booted from volume, the imageRef does not need
+        # to be specified.
+        server = self.client.create_server(name=server_name,
+                                           imageRef='',
+                                           flavorRef=CONF.compute.flavor_ref,
+                                           **device_mapping)['server']
+        # Record the server before waiting on it so that it is still listed
+        # in self.servers (presumably the list used for cleanup -- TODO
+        # confirm) if it never reaches ACTIVE.
+        self.servers.append(server)
+
+        waiters.wait_for_server_status(self.client, server['id'], 'ACTIVE')
+
+        return server
@rbac_rule_validation.action(
service="nova",
@@ -45,6 +115,51 @@
@rbac_rule_validation.action(
service="nova",
+ rule="os_compute_api:servers:create:forced_host")
+ @decorators.idempotent_id('0ae3c401-52ab-41bc-ab96-c598a65d9ae5')
+    def test_create_server_forced_host(self):
+        """Create a server pinned to a specific host of the 'nova' zone."""
+        # Retrieve 'nova' zone host information from the availability zone
+        # list.
+        zones = self.availability_zone_client.list_availability_zones(
+            detail=True)['availabilityZoneInfo']
+        hosts = [zone['hosts'] for zone in zones if zone['zoneName'] == 'nova']
+
+        # We just need any host out of the hosts list to build the
+        # availability_zone attribute. So, picking the first one is fine.
+        # The first key of the dictionary specifies the host name.
+        # dict.keys() cannot be indexed on Python 3, so take the first key
+        # by iterating over the dictionary instead.
+        host = next(iter(hosts[0]))
+        availability_zone = 'nova:' + host
+        self.rbac_utils.switch_role(self, switchToRbacRole=True)
+        self.create_test_server(wait_until='ACTIVE',
+                                availability_zone=availability_zone)
+
+ @rbac_rule_validation.action(
+ service="nova",
+ rule="os_compute_api:servers:create:attach_volume")
+ @decorators.idempotent_id('eeddac5e-15aa-454f-838d-db608aae4dd8')
+    def test_create_server_attach_volume(self):
+        """Boot a server from the class-level volume under the RBAC role."""
+        boot_volume_id = self.volume_id
+        self.rbac_utils.switch_role(self, switchToRbacRole=True)
+        self._create_test_server_with_volume(boot_volume_id)
+
+ @rbac_rule_validation.action(
+ service="nova",
+ rule="os_compute_api:servers:create:attach_network")
+ @decorators.idempotent_id('b44cd4ff-50a4-42ce-ada3-724e213cd540')
+    def test_create_server_attach_network(self):
+        """Create a server attached to a network created for this test."""
+        network = self._create_network_resources()
+        network_id = {'uuid': network['id']}
+        self.rbac_utils.switch_role(self, switchToRbacRole=True)
+        server = self.create_test_server(wait_until='ACTIVE',
+                                         networks=[network_id])
+
+        # The network resources created are for this test case only. We will
+        # clean them up after this test case. In order to do that, we need
+        # to clean up the server first. Cleanups run in reverse registration
+        # order: the server is deleted, then we wait for its termination.
+        self.addCleanup(waiters.wait_for_server_termination,
+                        self.client, server['id'])
+        # Ignore NotFound so that a server which is already gone does not
+        # turn the cleanup into an error.
+        self.addCleanup(test_utils.call_and_ignore_notfound_exc,
+                        self.client.delete_server, server['id'])
+
+ @rbac_rule_validation.action(
+ service="nova",
rule="os_compute_api:servers:delete")
@decorators.idempotent_id('062e3440-e873-4b41-9317-bf6d8be50c12')
def test_delete_server(self):
diff --git a/patrole_tempest_plugin/tests/api/volume/test_volumes_snapshots_rbac.py b/patrole_tempest_plugin/tests/api/volume/test_volumes_snapshots_rbac.py
index 83871cb..9374cd9 100644
--- a/patrole_tempest_plugin/tests/api/volume/test_volumes_snapshots_rbac.py
+++ b/patrole_tempest_plugin/tests/api/volume/test_volumes_snapshots_rbac.py
@@ -39,8 +39,6 @@
def resource_setup(cls):
super(VolumesSnapshotRbacTest, cls).resource_setup()
# Create a test shared volume for tests
- cls.name_field = cls.special_fields['name_field']
- cls.descrip_field = cls.special_fields['descrip_field']
cls.volume = cls.create_volume()
# Create a test shared snapshot for tests
cls.snapshot = cls.create_snapshot(cls.volume['id'])
@@ -48,7 +46,6 @@
def _list_by_param_values(self, params, with_detail=False):
# Perform list or list_details action with given params
# and validates result.
-
if with_detail:
self.snapshots_client.list_snapshots(
detail=True, params=params)['snapshots']
@@ -78,7 +75,7 @@
@decorators.idempotent_id('53fe8ee3-3bea-4ae8-a979-3c98ea72f620')
def test_snapshot_update(self):
new_desc = 'This is the new description of snapshot.'
- params = {self.descrip_field: new_desc}
+ params = {'description': new_desc}
# Updates snapshot with new values
self.rbac_utils.switch_role(self, switchToRbacRole=True)
self.client.update_snapshot(
@@ -90,7 +87,7 @@
def test_snapshots_get_all(self):
"""list snapshots with params."""
# Verify list snapshots by display_name filter
- params = {self.name_field: self.snapshot[self.name_field]}
+ params = {'name': self.snapshot['name']}
self.rbac_utils.switch_role(self, switchToRbacRole=True)
self._list_by_param_values(params)