Merge "Compute API Compute Flavor Rxtx Test."
diff --git a/contrib/post_test_hook.sh b/contrib/post_test_hook.sh
index aeb4013..2eb494f 100644
--- a/contrib/post_test_hook.sh
+++ b/contrib/post_test_hook.sh
@@ -53,10 +53,12 @@
# a "fast' or "slow test" gate
TYPE=$2
-# Set rbac_flag=True under [rbac] section in tempest.conf
-iniset $TEMPEST_CONFIG rbac rbac_flag True
+# Set enable_rbac=True under [rbac] section in tempest.conf
+iniset $TEMPEST_CONFIG rbac enable_rbac True
# Set rbac_test_role=$RBAC_ROLE under [rbac] section in tempest.conf
iniset $TEMPEST_CONFIG rbac rbac_test_role $RBAC_ROLE
+# Set strict_policy_check=False under [rbac] section in tempest.conf
+iniset $TEMPEST_CONFIG rbac strict_policy_check False
# Set additional, necessary CONF values
iniset $TEMPEST_CONFIG auth use_dynamic_credentials True
iniset $TEMPEST_CONFIG auth tempest_roles Member
diff --git a/doc/source/installation.rst b/doc/source/installation.rst
index 00cc57b..96f1c3f 100644
--- a/doc/source/installation.rst
+++ b/doc/source/installation.rst
@@ -48,9 +48,14 @@
#. [rbac] section updates ::
# The role that you want the RBAC tests to use for RBAC testing
- # This needs to be edited to run the test as a different role.
+ # This needs to be edited to run the test as a different role.
rbac_test_role = _member_
# Enables RBAC Tempest tests if set to True. Otherwise, they are
# skipped.
- rbac_flag = True
+ enable_rbac = True
+
+ # If set to true, tests throw a RbacParsingException for policies
+ # not found in the policy.json. Otherwise, they throw a
+ # skipException.
+ strict_policy_check = False
diff --git a/patrole_tempest_plugin/config.py b/patrole_tempest_plugin/config.py
index 1edf877..dfb6cef 100644
--- a/patrole_tempest_plugin/config.py
+++ b/patrole_tempest_plugin/config.py
@@ -23,7 +23,12 @@
default='admin',
help="The current RBAC role against which to run"
" Patrole tests."),
- cfg.BoolOpt('rbac_flag',
+ cfg.BoolOpt('enable_rbac',
default=True,
- help="Enables RBAC tests.")
+ help="Enables RBAC tests."),
+ cfg.BoolOpt('strict_policy_check',
+ default=False,
+ help="If true, throws RbacParsingException for"
+ " policies which don't exist. If false, "
+ "throws skipException.")
]
diff --git a/patrole_tempest_plugin/rbac_auth.py b/patrole_tempest_plugin/rbac_auth.py
index 687c0a8..7281969 100644
--- a/patrole_tempest_plugin/rbac_auth.py
+++ b/patrole_tempest_plugin/rbac_auth.py
@@ -17,10 +17,13 @@
from oslo_log import log as logging
+from tempest import config
+
from patrole_tempest_plugin import rbac_exceptions
from patrole_tempest_plugin import rbac_policy_parser
LOG = logging.getLogger(__name__)
+CONF = config.CONF
class RbacAuthority(object):
@@ -39,5 +42,8 @@
rule_name, role)
return is_allowed
except rbac_exceptions.RbacParsingException as e:
- raise testtools.TestCase.skipException(str(e))
+ if CONF.rbac.strict_policy_check:
+ raise e
+ else:
+ raise testtools.TestCase.skipException(str(e))
return False
diff --git a/patrole_tempest_plugin/rbac_policy_parser.py b/patrole_tempest_plugin/rbac_policy_parser.py
index 62113ac..9d10a36 100644
--- a/patrole_tempest_plugin/rbac_policy_parser.py
+++ b/patrole_tempest_plugin/rbac_policy_parser.py
@@ -104,7 +104,6 @@
access=self._get_access_token(role),
apply_rule=rule_name,
is_admin=is_admin_context)
-
return is_allowed
def _is_admin_context(self, role):
@@ -168,16 +167,11 @@
return result
def _try_rule(self, apply_rule, target, access_data, o):
- try:
- rule = self.rules[apply_rule]
- return rule(target, access_data, o)
- except KeyError as e:
+ if apply_rule not in self.rules:
message = "Policy action: {0} not found in policy file: {1}."\
.format(apply_rule, self.path)
LOG.debug(message)
raise rbac_exceptions.RbacParsingException(message)
- except Exception as e:
- message = "Unknown exception: {0} for policy action: {1} in "\
- "policy file: {2}.".format(e, apply_rule, self.path)
- LOG.debug(message)
- raise rbac_exceptions.RbacParsingException(message)
+ else:
+ rule = self.rules[apply_rule]
+ return rule(target, access_data, o)
diff --git a/patrole_tempest_plugin/rbac_rule_validation.py b/patrole_tempest_plugin/rbac_rule_validation.py
index 5e1e816..9b959d7 100644
--- a/patrole_tempest_plugin/rbac_rule_validation.py
+++ b/patrole_tempest_plugin/rbac_rule_validation.py
@@ -14,6 +14,9 @@
# under the License.
import logging
+import sys
+
+import six
from tempest import config
from tempest.lib import exceptions
@@ -93,7 +96,9 @@
raise exceptions.NotFound(
"%s RbacInvalidService was: %s" %
(msg, e))
- except expected_exception as e:
+ except (expected_exception, rbac_exceptions.RbacActionFailed) as e:
+ if irregular_msg:
+ LOG.warning(irregular_msg.format(rule, service))
if allowed:
msg = ("Role %s was not allowed to perform %s." %
(CONF.rbac.rbac_test_role, rule))
@@ -101,16 +106,14 @@
raise exceptions.Forbidden(
"%s exception was: %s" %
(msg, e))
- if irregular_msg:
- LOG.warning(irregular_msg.format(rule, service))
- except rbac_exceptions.RbacActionFailed as e:
- if allowed:
- msg = ("Role %s was not allowed to perform %s." %
- (CONF.rbac.rbac_test_role, rule))
- LOG.error(msg)
- raise exceptions.Forbidden(
- "%s RbacActionFailed was: %s" %
- (msg, e))
+ except Exception as e:
+ exc_info = sys.exc_info()
+ error_details = exc_info[1].__str__()
+ msg = ("%s An unexpected exception has occurred: Expected "
+ "exception was %s, which was not thrown."
+ % (error_details, expected_exception.__name__))
+ LOG.error(msg)
+ six.reraise(exc_info[0], exc_info[0](msg), exc_info[2])
else:
if not allowed:
LOG.error("Role %s was allowed to perform %s" %
diff --git a/patrole_tempest_plugin/tests/api/compute/rbac_base.py b/patrole_tempest_plugin/tests/api/compute/rbac_base.py
index 625b5cf..e5d7d9c 100644
--- a/patrole_tempest_plugin/tests/api/compute/rbac_base.py
+++ b/patrole_tempest_plugin/tests/api/compute/rbac_base.py
@@ -29,7 +29,7 @@
@classmethod
def skip_checks(cls):
super(BaseV2ComputeRbacTest, cls).skip_checks()
- if not CONF.rbac.rbac_flag:
+ if not CONF.rbac.enable_rbac:
raise cls.skipException(
'%s skipped as RBAC flag not enabled' % cls.__name__)
@@ -48,7 +48,7 @@
@classmethod
def skip_checks(cls):
super(BaseV2ComputeAdminRbacTest, cls).skip_checks()
- if not CONF.rbac.rbac_flag:
+ if not CONF.rbac.enable_rbac:
raise cls.skipException(
'%s skipped as RBAC flag not enabled' % cls.__name__)
diff --git a/patrole_tempest_plugin/tests/api/compute/test_server_migrations_rbac.py b/patrole_tempest_plugin/tests/api/compute/test_server_migrations_rbac.py
index 39afb4a..215ce65 100644
--- a/patrole_tempest_plugin/tests/api/compute/test_server_migrations_rbac.py
+++ b/patrole_tempest_plugin/tests/api/compute/test_server_migrations_rbac.py
@@ -17,8 +17,8 @@
from tempest.common import waiters
from tempest import config
-
from tempest.lib import decorators
+from tempest import test
from patrole_tempest_plugin import rbac_rule_validation
from patrole_tempest_plugin.tests.api.compute import rbac_base as base
@@ -67,6 +67,7 @@
server_id, host=dest_host, block_migration=block_migration,
**kwargs)
+ @test.attr(type='slow')
@testtools.skipUnless(CONF.compute_feature_enabled.cold_migration,
'Cold migration not available.')
@rbac_rule_validation.action(
@@ -84,6 +85,7 @@
waiters.wait_for_server_status(self.admin_servers_client,
server['id'], 'VERIFY_RESIZE')
+ @test.attr(type='slow')
@testtools.skipUnless(CONF.compute_feature_enabled.live_migration,
'Live migration feature is not enabled.')
@rbac_rule_validation.action(
diff --git a/patrole_tempest_plugin/tests/api/identity/v2/rbac_base.py b/patrole_tempest_plugin/tests/api/identity/v2/rbac_base.py
index ffee5c0..969631d 100644
--- a/patrole_tempest_plugin/tests/api/identity/v2/rbac_base.py
+++ b/patrole_tempest_plugin/tests/api/identity/v2/rbac_base.py
@@ -30,7 +30,7 @@
@classmethod
def skip_checks(cls):
super(BaseIdentityV2AdminRbacTest, cls).skip_checks()
- if not CONF.rbac.rbac_flag:
+ if not CONF.rbac.enable_rbac:
raise cls.skipException(
"%s skipped as RBAC Flag not enabled" % cls.__name__)
diff --git a/patrole_tempest_plugin/tests/api/identity/v3/rbac_base.py b/patrole_tempest_plugin/tests/api/identity/v3/rbac_base.py
index 47f6590..127ed3e 100644
--- a/patrole_tempest_plugin/tests/api/identity/v3/rbac_base.py
+++ b/patrole_tempest_plugin/tests/api/identity/v3/rbac_base.py
@@ -30,7 +30,7 @@
@classmethod
def skip_checks(cls):
super(BaseIdentityV3RbacAdminTest, cls).skip_checks()
- if not CONF.rbac.rbac_flag:
+ if not CONF.rbac.enable_rbac:
raise cls.skipException(
"%s skipped as RBAC Flag not enabled" % cls.__name__)
diff --git a/patrole_tempest_plugin/tests/api/image/rbac_base.py b/patrole_tempest_plugin/tests/api/image/rbac_base.py
index b4fed7d..25521f3 100644
--- a/patrole_tempest_plugin/tests/api/image/rbac_base.py
+++ b/patrole_tempest_plugin/tests/api/image/rbac_base.py
@@ -26,7 +26,7 @@
@classmethod
def skip_checks(cls):
super(BaseV1ImageRbacTest, cls).skip_checks()
- if not CONF.rbac.rbac_flag:
+ if not CONF.rbac.enable_rbac:
raise cls.skipException(
"%s skipped as RBAC Flag not enabled" % cls.__name__)
@@ -45,7 +45,7 @@
@classmethod
def skip_checks(cls):
super(BaseV2ImageRbacTest, cls).skip_checks()
- if not CONF.rbac.rbac_flag:
+ if not CONF.rbac.enable_rbac:
raise cls.skipException(
"%s skipped as RBAC Flag not enabled" % cls.__name__)
diff --git a/patrole_tempest_plugin/tests/api/network/rbac_base.py b/patrole_tempest_plugin/tests/api/network/rbac_base.py
index de5ed88..cad5498 100644
--- a/patrole_tempest_plugin/tests/api/network/rbac_base.py
+++ b/patrole_tempest_plugin/tests/api/network/rbac_base.py
@@ -28,7 +28,7 @@
@classmethod
def skip_checks(cls):
super(BaseNetworkRbacTest, cls).skip_checks()
- if not CONF.rbac.rbac_flag:
+ if not CONF.rbac.enable_rbac:
raise cls.skipException(
"%s skipped as RBAC Flag not enabled" % cls.__name__)
diff --git a/patrole_tempest_plugin/tests/api/network/test_ports_rbac.py b/patrole_tempest_plugin/tests/api/network/test_ports_rbac.py
index 80f6270..b59e1e5 100644
--- a/patrole_tempest_plugin/tests/api/network/test_ports_rbac.py
+++ b/patrole_tempest_plugin/tests/api/network/test_ports_rbac.py
@@ -15,9 +15,10 @@
#
import netaddr
-import random
from oslo_log import log
+
+from tempest.common.utils import net_utils
from tempest import config
from tempest.lib.common.utils import data_utils
from tempest.lib.common.utils import test_utils
@@ -92,14 +93,13 @@
@decorators.idempotent_id('2551e10d-006a-413c-925a-8c6f834c09ac')
def test_create_port_fixed_ips(self):
- # Pick an unused ip address to avoid IpAddressAlreadyAllocated
- # exception.
- current_ports = self.ports_client.list_ports()['ports']
- in_use_ips = [p['fixed_ips'][0]['ip_address'] for p in current_ports]
- unused_ip_range = list(set(self.ip_range) - set(in_use_ips))
- ip_address = random.choice(unused_ip_range)
-
- fixed_ips = [{'ip_address': ip_address},
+ # Pick an unused ip address.
+ ip_list = net_utils.get_unused_ip_addresses(self.ports_client,
+ self.subnets_client,
+ self.network['id'],
+ self.subnet['id'],
+ 1)
+ fixed_ips = [{'ip_address': ip_list[0]},
{'subnet_id': self.subnet['id']}]
post_body = {'network_id': self.network['id'],
@@ -259,16 +259,17 @@
post_body = {'network_id': self.network['id']}
port = self._create_port(**post_body)
- # Pick an unused ip address to avoid IpAddressAlreadyAllocated
- # exception.
- current_ports = self.ports_client.list_ports()['ports']
- in_use_ips = [p['fixed_ips'][0]['ip_address'] for p in current_ports]
- unused_ip_range = list(set(self.ip_range) - set(in_use_ips))
- ip_address = random.choice(unused_ip_range)
+ # Pick an unused ip address.
+ ip_list = net_utils.get_unused_ip_addresses(self.ports_client,
+ self.subnets_client,
+ self.network['id'],
+ self.subnet['id'],
+ 1)
+ fixed_ips = [{'ip_address': ip_list[0]}]
self.rbac_utils.switch_role(self, switchToRbacRole=True)
self.ports_client.update_port(port['id'],
- fixed_ips=[{'ip_address': ip_address}])
+ fixed_ips=fixed_ips)
@rbac_rule_validation.action(service="neutron",
rule="update_port:port_security_enabled")
@@ -318,9 +319,14 @@
@decorators.idempotent_id('729c2151-bb49-4f4f-9d58-3ed8819b7582')
def test_update_port_allowed_address_pairs(self):
- ip_address = random.choice(list(self.ip_range))
+ # Pick an unused ip address.
+ ip_list = net_utils.get_unused_ip_addresses(self.ports_client,
+ self.subnets_client,
+ self.network['id'],
+ self.subnet['id'],
+ 1)
# Update allowed address pair attribute of port
- address_pairs = [{'ip_address': ip_address,
+ address_pairs = [{'ip_address': ip_list[0],
'mac_address': data_utils.rand_mac_address()}]
post_body = {'network_id': self.network['id']}
port = self._create_port(**post_body)
diff --git a/patrole_tempest_plugin/tests/api/network/test_routers_rbac.py b/patrole_tempest_plugin/tests/api/network/test_routers_rbac.py
index 69aa32a..eee9ff2 100644
--- a/patrole_tempest_plugin/tests/api/network/test_routers_rbac.py
+++ b/patrole_tempest_plugin/tests/api/network/test_routers_rbac.py
@@ -14,9 +14,10 @@
# under the License.
import netaddr
-import random
from oslo_log import log
+
+from tempest.common.utils import net_utils
from tempest import config
from tempest.lib.common.utils import data_utils
from tempest.lib.common.utils import test_utils
@@ -43,12 +44,12 @@
super(RouterRbacTest, cls).resource_setup()
post_body = {}
post_body['router:external'] = True
- cls.admin_network = cls.create_network(**post_body)
- cls.admin_subnet = cls.create_subnet(cls.admin_network)
- cls.admin_ip_range = netaddr.IPRange(
- cls.admin_subnet['allocation_pools'][0]['start'],
- cls.admin_subnet['allocation_pools'][0]['end'])
- cls.admin_router = cls.create_router()
+ cls.network = cls.create_network(**post_body)
+ cls.subnet = cls.create_subnet(cls.network)
+ cls.ip_range = netaddr.IPRange(
+ cls.subnet['allocation_pools'][0]['start'],
+ cls.subnet['allocation_pools'][0]['end'])
+ cls.router = cls.create_router()
@rbac_rule_validation.action(service="neutron",
rule="create_router")
@@ -74,7 +75,7 @@
create_router:external_gateway_info:enable_snat policy
"""
name = data_utils.rand_name('snat-router')
- external_gateway_info = {'network_id': self.admin_network['id'],
+ external_gateway_info = {'network_id': self.network['id'],
'enable_snat': True}
self.rbac_utils.switch_role(self, switchToRbacRole=True)
@@ -95,12 +96,15 @@
"""
name = data_utils.rand_name('snat-router')
- # Pick an ip address within the allocation_pools range
- ip_address = random.choice(list(self.admin_ip_range))
- external_fixed_ips = {'subnet_id': self.admin_subnet['id'],
- 'ip_address': ip_address}
-
- external_gateway_info = {'network_id': self.admin_network['id'],
+ # Pick an unused IP address.
+ ip_list = net_utils.get_unused_ip_addresses(self.ports_client,
+ self.subnets_client,
+ self.network['id'],
+ self.subnet['id'],
+ 1)
+ external_fixed_ips = {'subnet_id': self.subnet['id'],
+ 'ip_address': ip_list[0]}
+ external_gateway_info = {'network_id': self.network['id'],
'enable_snat': False,
'external_fixed_ips': [external_fixed_ips]}
@@ -120,7 +124,7 @@
RBAC test for the neutron get_router policy
"""
self.rbac_utils.switch_role(self, switchToRbacRole=True)
- self.routers_client.show_router(self.admin_router['id'])
+ self.routers_client.show_router(self.router['id'])
@rbac_rule_validation.action(
service="neutron", rule="update_router")
@@ -132,7 +136,7 @@
"""
new_name = data_utils.rand_name('new-router-name')
self.rbac_utils.switch_role(self, switchToRbacRole=True)
- self.routers_client.update_router(self.admin_router['id'],
+ self.routers_client.update_router(self.router['id'],
name=new_name)
@rbac_rule_validation.action(
@@ -145,7 +149,7 @@
update_router:external_gateway_info policy
"""
self.rbac_utils.switch_role(self, switchToRbacRole=True)
- self.routers_client.update_router(self.admin_router['id'],
+ self.routers_client.update_router(self.router['id'],
external_gateway_info={})
@rbac_rule_validation.action(
@@ -160,11 +164,11 @@
"""
self.rbac_utils.switch_role(self, switchToRbacRole=True)
self.routers_client.update_router(
- self.admin_router['id'],
- external_gateway_info={'network_id': self.admin_network['id']})
+ self.router['id'],
+ external_gateway_info={'network_id': self.network['id']})
self.addCleanup(
self.routers_client.update_router,
- self.admin_router['id'],
+ self.router['id'],
external_gateway_info=None)
@rbac_rule_validation.action(
@@ -179,12 +183,12 @@
"""
self.rbac_utils.switch_role(self, switchToRbacRole=True)
self.routers_client.update_router(
- self.admin_router['id'],
- external_gateway_info={'network_id': self.admin_network['id'],
+ self.router['id'],
+ external_gateway_info={'network_id': self.network['id'],
'enable_snat': True})
self.addCleanup(
self.routers_client.update_router,
- self.admin_router['id'],
+ self.router['id'],
external_gateway_info=None)
@rbac_rule_validation.action(
@@ -197,20 +201,24 @@
RBAC test for the neutron
update_router:external_gateway_info:external_fixed_ips policy
"""
- # Pick an ip address within the allocation_pools range
- ip_address = random.choice(list(self.admin_ip_range))
- external_fixed_ips = {'subnet_id': self.admin_subnet['id'],
- 'ip_address': ip_address}
- external_gateway_info = {'network_id': self.admin_network['id'],
+ # Pick an unused IP address.
+ ip_list = net_utils.get_unused_ip_addresses(self.ports_client,
+ self.subnets_client,
+ self.network['id'],
+ self.subnet['id'],
+ 1)
+ external_fixed_ips = {'subnet_id': self.subnet['id'],
+ 'ip_address': ip_list[0]}
+ external_gateway_info = {'network_id': self.network['id'],
'external_fixed_ips': [external_fixed_ips]}
self.rbac_utils.switch_role(self, switchToRbacRole=True)
self.routers_client.update_router(
- self.admin_router['id'],
+ self.router['id'],
external_gateway_info=external_gateway_info)
self.addCleanup(
self.routers_client.update_router,
- self.admin_router['id'],
+ self.router['id'],
external_gateway_info=None)
@rbac_rule_validation.action(service="neutron",
diff --git a/patrole_tempest_plugin/tests/api/orchestration/rbac_base.py b/patrole_tempest_plugin/tests/api/orchestration/rbac_base.py
index 2cc7978..734007f 100644
--- a/patrole_tempest_plugin/tests/api/orchestration/rbac_base.py
+++ b/patrole_tempest_plugin/tests/api/orchestration/rbac_base.py
@@ -26,7 +26,7 @@
@classmethod
def skip_checks(cls):
super(BaseOrchestrationRbacTest, cls).skip_checks()
- if not CONF.rbac.rbac_flag:
+ if not CONF.rbac.enable_rbac:
raise cls.skipException(
"%s skipped as RBAC Flag not enabled" % cls.__name__)
diff --git a/patrole_tempest_plugin/tests/api/volume/rbac_base.py b/patrole_tempest_plugin/tests/api/volume/rbac_base.py
index 2a14b19..3c7cd68 100644
--- a/patrole_tempest_plugin/tests/api/volume/rbac_base.py
+++ b/patrole_tempest_plugin/tests/api/volume/rbac_base.py
@@ -26,7 +26,7 @@
@classmethod
def skip_checks(cls):
super(BaseVolumeRbacTest, cls).skip_checks()
- if not CONF.rbac.rbac_flag:
+ if not CONF.rbac.enable_rbac:
raise cls.skipException(
"%s skipped as RBAC Flag not enabled" % cls.__name__)
@@ -45,7 +45,7 @@
@classmethod
def skip_checks(cls):
super(BaseVolumeAdminRbacTest, cls).skip_checks()
- if not CONF.rbac.rbac_flag:
+ if not CONF.rbac.enable_rbac:
raise cls.skipException(
"%s skipped as RBAC Flag not enabled" % cls.__name__)
diff --git a/patrole_tempest_plugin/tests/api/volume/test_volume_actions_rbac.py b/patrole_tempest_plugin/tests/api/volume/test_volume_actions_rbac.py
index 29f6a80..5c181f9 100644
--- a/patrole_tempest_plugin/tests/api/volume/test_volume_actions_rbac.py
+++ b/patrole_tempest_plugin/tests/api/volume/test_volume_actions_rbac.py
@@ -21,6 +21,7 @@
from tempest.lib.common.utils import data_utils
from tempest.lib.common.utils import test_utils
from tempest.lib import decorators
+from tempest import test
from patrole_tempest_plugin import rbac_rule_validation
from patrole_tempest_plugin.tests.api.volume import rbac_base
@@ -69,6 +70,7 @@
# Attach the volume
self._attach_volume(server)
+ @test.attr(type="slow")
@rbac_rule_validation.action(service="cinder", rule="volume:detach")
@decorators.idempotent_id('5a042f6a-688b-42e6-a02e-fe5c47b89b07')
def test_detach_volume_to_instance(self):
diff --git a/patrole_tempest_plugin/tests/api/volume/test_volumes_backup_rbac.py b/patrole_tempest_plugin/tests/api/volume/test_volumes_backup_rbac.py
index 36ee02e..20991fe 100644
--- a/patrole_tempest_plugin/tests/api/volume/test_volumes_backup_rbac.py
+++ b/patrole_tempest_plugin/tests/api/volume/test_volumes_backup_rbac.py
@@ -18,6 +18,7 @@
from tempest.lib.common.utils import data_utils
from tempest.lib.common.utils import test_utils
from tempest.lib import decorators
+from tempest import test
from patrole_tempest_plugin import rbac_rule_validation
from patrole_tempest_plugin.tests.api.volume import rbac_base
@@ -49,6 +50,7 @@
self.backups_client, backup['id'], 'available')
return backup
+ @test.attr(type="slow")
@rbac_rule_validation.action(service="cinder",
rule="backup:create")
@decorators.idempotent_id('6887ec94-0bcf-4ab7-b30f-3808a4b5a2a5')
@@ -56,6 +58,7 @@
self.rbac_utils.switch_role(self, switchToRbacRole=True)
self._create_backup(volume_id=self.volume['id'])
+ @test.attr(type="slow")
@rbac_rule_validation.action(service="cinder",
rule="backup:get")
@decorators.idempotent_id('abd92bdd-b0fb-4dc4-9cfc-de9e968f8c8a')
@@ -73,6 +76,7 @@
self.rbac_utils.switch_role(self, switchToRbacRole=True)
self.backups_client.list_backups()
+ @test.attr(type="slow")
@rbac_rule_validation.action(service="cinder",
rule="backup:restore")
@decorators.idempotent_id('9c794bf9-2446-4f41-8fe0-80b71e757f9d')
@@ -85,6 +89,7 @@
waiters.wait_for_volume_resource_status(
self.backups_client, restore['backup_id'], 'available')
+ @test.attr(type="slow")
@rbac_rule_validation.action(service="cinder",
rule="backup:delete")
@decorators.idempotent_id('d5d0c6a2-413d-437e-a73f-4bf2b41a20ed')
diff --git a/patrole_tempest_plugin/tests/unit/test_rbac_policy_parser.py b/patrole_tempest_plugin/tests/unit/test_rbac_policy_parser.py
index 8583eb5..d5f20f6 100644
--- a/patrole_tempest_plugin/tests/unit/test_rbac_policy_parser.py
+++ b/patrole_tempest_plugin/tests/unit/test_rbac_policy_parser.py
@@ -286,9 +286,8 @@
**{'__getitem__.return_value.side_effect': Exception(
mock.sentinel.error)})
- expected_message = "Unknown exception: {0} for policy action: {1} in "\
- "policy file: {2}.".format(mock.sentinel.error,
- mock.sentinel.rule,
+ expected_message = "Policy action: {0} not found in "\
+ "policy file: {1}.".format(mock.sentinel.rule,
self.custom_policy_file)
e = self.assertRaises(rbac_exceptions.RbacParsingException,
diff --git a/patrole_tempest_plugin/tests/unit/test_rbac_rule_validation.py b/patrole_tempest_plugin/tests/unit/test_rbac_rule_validation.py
index 705c7e7..38b5fea 100644
--- a/patrole_tempest_plugin/tests/unit/test_rbac_rule_validation.py
+++ b/patrole_tempest_plugin/tests/unit/test_rbac_rule_validation.py
@@ -13,17 +13,16 @@
# under the License.
import mock
-import testtools
-
-from patrole_tempest_plugin import rbac_auth
-from patrole_tempest_plugin import rbac_exceptions
-from patrole_tempest_plugin import rbac_rule_validation as rbac_rv
from tempest import config
from tempest.lib import exceptions
from tempest import test
from tempest.tests import base
+from patrole_tempest_plugin import rbac_auth
+from patrole_tempest_plugin import rbac_exceptions
+from patrole_tempest_plugin import rbac_rule_validation as rbac_rv
+
CONF = config.CONF
@@ -43,22 +42,70 @@
enforce_type=True)
self.addCleanup(CONF.clear_override, 'rbac_test_role', group='rbac')
- @mock.patch('patrole_tempest_plugin.rbac_auth.RbacAuthority')
- def test_RBAC_rv_happy_path(self, mock_auth):
- decorator = rbac_rv.action("", "")
+ @mock.patch.object(rbac_rv, 'LOG', autospec=True)
+ @mock.patch.object(rbac_auth, 'RbacAuthority', autospec=True)
+ def test_rule_validation_have_permission_no_exc(self, mock_auth, mock_log):
+ """Test that having permission and no exception thrown is success.
+
+ Positive test case success scenario.
+ """
+ decorator = rbac_rv.action(mock.sentinel.service, mock.sentinel.action)
+
mock_function = mock.Mock()
wrapper = decorator(mock_function)
- wrapper((self.mock_args))
- self.assertTrue(mock_function.called)
+
+ mock_permission = mock.Mock()
+ mock_permission.get_permission.return_value = True
+ mock_auth.return_value = mock_permission
+
+ result = wrapper(self.mock_args)
+
+ self.assertIsNone(result)
+ mock_log.warning.assert_not_called()
+ mock_log.error.assert_not_called()
@mock.patch.object(rbac_rv, 'LOG', autospec=True)
- @mock.patch('patrole_tempest_plugin.rbac_auth.RbacAuthority')
- def test_RBAC_rv_forbidden(self, mock_auth, mock_log):
+ @mock.patch.object(rbac_auth, 'RbacAuthority', autospec=True)
+ def test_rule_validation_lack_permission_throw_exc(self, mock_auth,
+ mock_log):
+ """Test that having no permission and exception thrown is success.
+
+ Negative test case success scenario.
+ """
+ decorator = rbac_rv.action(mock.sentinel.service, mock.sentinel.action)
+
+ mock_function = mock.Mock()
+ mock_function.side_effect = exceptions.Forbidden
+ wrapper = decorator(mock_function)
+
+ mock_permission = mock.Mock()
+ mock_permission.get_permission.return_value = False
+ mock_auth.return_value = mock_permission
+
+ result = wrapper(self.mock_args)
+
+ self.assertIsNone(result)
+ mock_log.warning.assert_not_called()
+ mock_log.error.assert_not_called()
+
+ @mock.patch.object(rbac_rv, 'LOG', autospec=True)
+ @mock.patch.object(rbac_auth, 'RbacAuthority', autospec=True)
+ def test_rule_validation_forbidden_negative(self, mock_auth, mock_log):
+ """Test Forbidden error is thrown and have permission fails.
+
+ Negative test case: if Forbidden is thrown and the user should be
+ allowed to perform the action, then the Forbidden exception should be
+ raised.
+ """
decorator = rbac_rv.action(mock.sentinel.service, mock.sentinel.action)
mock_function = mock.Mock()
mock_function.side_effect = exceptions.Forbidden
wrapper = decorator(mock_function)
+ mock_permission = mock.Mock()
+ mock_permission.get_permission.return_value = True
+ mock_auth.return_value = mock_permission
+
e = self.assertRaises(exceptions.Forbidden, wrapper, self.mock_args)
self.assertIn(
"Role Member was not allowed to perform sentinel.action.",
@@ -67,8 +114,100 @@
" perform sentinel.action.")
@mock.patch.object(rbac_rv, 'LOG', autospec=True)
- @mock.patch('patrole_tempest_plugin.rbac_auth.RbacAuthority')
+ @mock.patch.object(rbac_auth, 'RbacAuthority', autospec=True)
+ def test_rule_validation_rbac_action_failed_positive(self, mock_auth,
+ mock_log):
+ """Test RbacActionFailed error is thrown without permission passes.
+
+ Positive test case: if RbacActionFailed is thrown and the user is not
+ allowed to perform the action, then this is a success.
+ """
+ decorator = rbac_rv.action(mock.sentinel.service, mock.sentinel.action)
+ mock_function = mock.Mock()
+ mock_function.side_effect = rbac_exceptions.RbacActionFailed
+ wrapper = decorator(mock_function)
+
+ mock_permission = mock.Mock()
+ mock_permission.get_permission.return_value = False
+ mock_auth.return_value = mock_permission
+
+ result = wrapper(self.mock_args)
+
+ self.assertIsNone(result)
+ mock_log.error.assert_not_called()
+ mock_log.warning.assert_not_called()
+
+ @mock.patch.object(rbac_rv, 'LOG', autospec=True)
+ @mock.patch.object(rbac_auth, 'RbacAuthority', autospec=True)
+ def test_rule_validation_rbac_action_failed_negative(self, mock_auth,
+ mock_log):
+ """Test RbacActionFailed error is thrown with permission fails.
+
+ Negative test case: if RbacActionFailed is thrown and the user is
+ allowed to perform the action, then this is an expected failure.
+ """
+ decorator = rbac_rv.action(mock.sentinel.service, mock.sentinel.action)
+ mock_function = mock.Mock()
+ mock_function.side_effect = rbac_exceptions.RbacActionFailed
+ wrapper = decorator(mock_function)
+
+ mock_permission = mock.Mock()
+ mock_permission.get_permission.return_value = True
+ mock_auth.return_value = mock_permission
+
+ e = self.assertRaises(exceptions.Forbidden, wrapper, self.mock_args)
+ self.assertIn(
+ "Role Member was not allowed to perform sentinel.action.",
+ e.__str__())
+
+ mock_log.error.assert_called_once_with("Role Member was not allowed to"
+ " perform sentinel.action.")
+
+ @mock.patch.object(rbac_rv, 'LOG', autospec=True)
+ @mock.patch.object(rbac_auth, 'RbacAuthority', autospec=True)
def test_expect_not_found_but_raises_forbidden(self, mock_auth, mock_log):
+ """Test that expecting 404 but getting 403 works for all scenarios.
+
+ Tests the following scenarios:
+ 1) Test no permission and 404 is expected but 403 is thrown throws
+ exception.
+ 2) Test have permission and 404 is expected but 403 is thrown throws
+ exception.
+ """
+ decorator = rbac_rv.action(mock.sentinel.service,
+ mock.sentinel.action,
+ expected_error_code=404)
+ mock_function = mock.Mock()
+ mock_function.side_effect = exceptions.Forbidden('Random message.')
+ wrapper = decorator(mock_function)
+
+ expected_error = "Forbidden\nDetails: Random message. An unexpected "\
+ "exception has occurred: Expected exception was "\
+ "NotFound, which was not thrown."
+
+ for permission in [True, False]:
+ mock_permission = mock.Mock()
+ mock_permission.get_permission.return_value = permission
+ mock_auth.return_value = mock_permission
+
+ e = self.assertRaises(exceptions.Forbidden, wrapper,
+ self.mock_args)
+ self.assertIn(expected_error, e.__str__())
+ mock_log.error.assert_called_once_with(expected_error)
+ mock_log.error.reset_mock()
+
+ @mock.patch.object(rbac_rv, 'LOG', autospec=True)
+ @mock.patch.object(rbac_auth, 'RbacAuthority', autospec=True)
+ def test_expect_not_found_and_raise_not_found(self, mock_auth, mock_log):
+ """Test that expecting 404 and getting 404 works for all scenarios.
+
+ Tests the following scenarios:
+ 1) Test no permission and 404 is expected and 404 is thrown succeeds.
+ 2) Test have permission and 404 is expected and 404 is thrown fails.
+
+ In both cases, a LOG.warning is called with the "irregular message"
+ that signals to user that a 404 was expected and caught.
+ """
decorator = rbac_rv.action(mock.sentinel.service,
mock.sentinel.action,
expected_error_code=404)
@@ -76,32 +215,44 @@
mock_function.side_effect = exceptions.NotFound
wrapper = decorator(mock_function)
- e = self.assertRaises(exceptions.Forbidden, wrapper, self.mock_args)
- self.assertIn(
- "Role Member was not allowed to perform sentinel.action.",
- e.__str__())
- mock_log.error.assert_called_once_with("Role Member was not allowed to"
- " perform sentinel.action.")
+ expected_errors = [
+ "Role Member was not allowed to perform sentinel.action.", None
+ ]
+
+ for pos, permission in enumerate([True, False]):
+ mock_permission = mock.Mock()
+ mock_permission.get_permission.return_value = permission
+ mock_auth.return_value = mock_permission
+
+ expected_error = expected_errors[pos]
+
+ if expected_error:
+ e = self.assertRaises(exceptions.Forbidden, wrapper,
+ self.mock_args)
+ self.assertIn(expected_error, e.__str__())
+ mock_log.error.assert_called_once_with(expected_error)
+ else:
+ wrapper(self.mock_args)
+ mock_log.error.assert_not_called()
+
+ mock_log.warning.assert_called_once_with(
+ "NotFound exception was caught for policy action {0}. The "
+ "service {1} throws a 404 instead of a 403, which is "
+ "irregular.".format(mock.sentinel.action,
+ mock.sentinel.service))
+
+ mock_log.warning.reset_mock()
+ mock_log.error.reset_mock()
@mock.patch.object(rbac_rv, 'LOG', autospec=True)
- @mock.patch('patrole_tempest_plugin.rbac_auth.RbacAuthority')
- def test_RBAC_rv_rbac_action_failed(self, mock_auth, mock_log):
- decorator = rbac_rv.action(mock.sentinel.service, mock.sentinel.action)
- mock_function = mock.Mock()
- mock_function.side_effect = rbac_exceptions.RbacActionFailed
- wrapper = decorator(mock_function)
+ @mock.patch.object(rbac_auth, 'RbacAuthority', autospec=True)
+ def test_rule_validation_overpermission_negative(self, mock_auth,
+ mock_log):
+ """Test that OverPermission is correctly handled.
- e = self.assertRaises(exceptions.Forbidden, wrapper, self.mock_args)
- self.assertIn(
- "Role Member was not allowed to perform sentinel.action.",
- e.__str__())
-
- mock_log.error.assert_called_once_with("Role Member was not allowed to"
- " perform sentinel.action.")
-
- @mock.patch.object(rbac_rv, 'LOG', autospec=True)
- @mock.patch('patrole_tempest_plugin.rbac_auth.RbacAuthority')
- def test_RBAC_rv_not_allowed(self, mock_auth, mock_log):
+ Tests that case where no exception is thrown but the Patrole framework
+ says that the role should not be allowed to perform the policy action.
+ """
decorator = rbac_rv.action(mock.sentinel.service, mock.sentinel.action)
mock_function = mock.Mock()
@@ -119,60 +270,10 @@
mock_log.error.assert_called_once_with(
"Role Member was allowed to perform sentinel.action")
- @mock.patch('patrole_tempest_plugin.rbac_auth.RbacAuthority')
- def test_RBAC_rv_forbidden_not_allowed(self, mock_auth):
- decorator = rbac_rv.action("", "")
-
- mock_function = mock.Mock()
- mock_function.side_effect = exceptions.Forbidden
- wrapper = decorator(mock_function)
-
- mock_permission = mock.Mock()
- mock_permission.get_permission.return_value = False
- mock_auth.return_value = mock_permission
-
- self.assertIsNone(wrapper(self.mock_args))
-
- @mock.patch.object(rbac_rv, 'LOG', autospec=True)
- @mock.patch('patrole_tempest_plugin.rbac_auth.RbacAuthority')
- def test_expect_not_found_and_not_allowed(self, mock_auth, mock_log):
- decorator = rbac_rv.action(mock.sentinel.service,
- mock.sentinel.action,
- expected_error_code=404)
-
- mock_function = mock.Mock()
- mock_function.side_effect = exceptions.NotFound
- wrapper = decorator(mock_function)
-
- mock_permission = mock.Mock()
- mock_permission.get_permission.return_value = False
- mock_auth.return_value = mock_permission
-
- self.assertIsNone(wrapper(self.mock_args))
-
- mock_log.warning.assert_called_once_with(
- 'NotFound exception was caught for policy action sentinel.action. '
- 'The service sentinel.service throws a 404 instead of a 403, '
- 'which is irregular.')
- mock_log.error.assert_not_called()
-
- @mock.patch('patrole_tempest_plugin.rbac_auth.RbacAuthority')
- def test_RBAC_rv_rbac_action_failed_not_allowed(self, mock_auth):
- decorator = rbac_rv.action("", "")
-
- mock_function = mock.Mock()
- mock_function.side_effect = rbac_exceptions.RbacActionFailed
- wrapper = decorator(mock_function)
-
- mock_permission = mock.Mock()
- mock_permission.get_permission.return_value = False
- mock_auth.return_value = mock_permission
-
- self.assertIsNone(wrapper(self.mock_args))
-
@mock.patch.object(rbac_auth, 'rbac_policy_parser', autospec=True)
- def test_invalid_policy_rule_throws_skip_exception(
+ def test_invalid_policy_rule_throws_parsing_exception(
self, mock_rbac_policy_parser):
+ """Test that invalid policy action causes test to be skipped."""
mock_rbac_policy_parser.RbacPolicyParser.return_value.allowed.\
side_effect = rbac_exceptions.RbacParsingException
@@ -180,7 +281,7 @@
mock.sentinel.policy_rule)
wrapper = decorator(mock.Mock())
- e = self.assertRaises(testtools.TestCase.skipException, wrapper,
+ e = self.assertRaises(rbac_exceptions.RbacParsingException, wrapper,
self.mock_args)
self.assertEqual('Attempted to test an invalid policy file or action',
str(e))
@@ -189,8 +290,9 @@
mock.sentinel.tenant_id, mock.sentinel.user_id,
mock.sentinel.service)
- @mock.patch('patrole_tempest_plugin.rbac_auth.RbacAuthority')
+ @mock.patch.object(rbac_auth, 'RbacAuthority', autospec=True)
def test_get_exception_type_404(self, mock_auth):
+ """Test that getting a 404 exception type returns NotFound."""
expected_exception = exceptions.NotFound
expected_irregular_msg = ("NotFound exception was caught for policy "
"action {0}. The service {1} throws a 404 "
@@ -202,8 +304,9 @@
self.assertEqual(expected_exception, actual_exception)
self.assertEqual(expected_irregular_msg, actual_irregular_msg)
- @mock.patch('patrole_tempest_plugin.rbac_auth.RbacAuthority')
+ @mock.patch.object(rbac_auth, 'RbacAuthority', autospec=True)
def test_get_exception_type_403(self, mock_auth):
+ """Test that getting a 404 exception type returns Forbidden."""
expected_exception = exceptions.Forbidden
expected_irregular_msg = None
@@ -214,8 +317,9 @@
self.assertEqual(expected_irregular_msg, actual_irregular_msg)
@mock.patch.object(rbac_rv, 'LOG', autospec=True)
- @mock.patch('patrole_tempest_plugin.rbac_auth.RbacAuthority')
+ @mock.patch.object(rbac_auth, 'RbacAuthority', autospec=True)
def test_exception_thrown_when_type_is_not_int(self, mock_auth, mock_log):
+ """Test that non-integer exception type raises error."""
self.assertRaises(rbac_exceptions.RbacInvalidErrorCode,
rbac_rv._get_exception_type, "403")
@@ -224,37 +328,16 @@
"codes: [403, 404]")
@mock.patch.object(rbac_rv, 'LOG', autospec=True)
- def test_rbac_decorator_with_admin_only_and_have_permission(self,
- mock_log):
- CONF.set_override('rbac_test_role', 'admin', group='rbac',
- enforce_type=True)
- self.addCleanup(CONF.clear_override, 'rbac_test_role', group='rbac')
+ @mock.patch.object(rbac_auth, 'RbacAuthority', autospec=True)
+ def test_exception_thrown_when_type_is_403_or_404(self, mock_auth,
+ mock_log):
+ """Test that unsupported exceptions throw error."""
+ invalid_exceptions = [200, 400, 500]
+ for exc in invalid_exceptions:
+ self.assertRaises(rbac_exceptions.RbacInvalidErrorCode,
+ rbac_rv._get_exception_type, exc)
+ mock_log.error.assert_called_once_with(
+ "Please pass an expected error code. Currently supported "
+ "codes: [403, 404]")
- decorator = rbac_rv.action(mock.sentinel.service,
- mock.sentinel.policy_rule,
- admin_only=True)
- wrapper = decorator(mock.Mock(side_effect=None))
- wrapper(self.mock_args)
-
- mock_log.info.assert_called_once_with(
- "As admin_only is True, only admin role should be allowed to "
- "perform the API. Skipping oslo.policy check for policy action "
- "{0}.".format(mock.sentinel.policy_rule))
-
- @mock.patch.object(rbac_rv, 'LOG', autospec=True)
- def test_rbac_decorator_with_admin_only_and_lack_permission(self,
- mock_log):
- CONF.set_override('rbac_test_role', 'Member', group='rbac',
- enforce_type=True)
- self.addCleanup(CONF.clear_override, 'rbac_test_role', group='rbac')
-
- decorator = rbac_rv.action(mock.sentinel.service,
- mock.sentinel.policy_rule,
- admin_only=True)
- wrapper = decorator(mock.Mock(side_effect=exceptions.Forbidden))
- wrapper(self.mock_args)
-
- mock_log.info.assert_called_once_with(
- "As admin_only is True, only admin role should be allowed to "
- "perform the API. Skipping oslo.policy check for policy action "
- "{0}.".format(mock.sentinel.policy_rule))
+ mock_log.error.reset_mock()