Add secure-rbac tests for Containers
This patch adds basic RBAC testing for the Containers resource for
reader, member and admin personas with project scope.
Depends-On: I6879f566117db5ec0099ddad35ba649a3c674bd1
Change-Id: I2272f5cd2385df15cc7761e8b858dfd39be955d4
diff --git a/barbican_tempest_plugin/services/key_manager/json/base.py b/barbican_tempest_plugin/services/key_manager/json/base.py
new file mode 100644
index 0000000..0c21382
--- /dev/null
+++ b/barbican_tempest_plugin/services/key_manager/json/base.py
@@ -0,0 +1,22 @@
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+from tempest.lib.common import rest_client
+
+
+_DEFAULT_SERVICE_TYPE = 'key-manager'
+
+
+class BarbicanTempestClient(rest_client.RestClient):
+
+ def __init__(self, *args, **kwargs):
+ kwargs['service'] = _DEFAULT_SERVICE_TYPE
+ super().__init__(*args, **kwargs)
diff --git a/barbican_tempest_plugin/services/key_manager/json/container_client.py b/barbican_tempest_plugin/services/key_manager/json/container_client.py
index 7e24396..766cbf2 100644
--- a/barbican_tempest_plugin/services/key_manager/json/container_client.py
+++ b/barbican_tempest_plugin/services/key_manager/json/container_client.py
@@ -11,19 +11,17 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
-
-
import json
-
from urllib import parse as urllib
from tempest import config
-from tempest.lib.common import rest_client
+
+from barbican_tempest_plugin.services.key_manager.json import base
CONF = config.CONF
-class ContainerClient(rest_client.RestClient):
+class ContainerClient(base.BarbicanTempestClient):
def list_containers(self, **kwargs):
uri = "v1/containers"
diff --git a/barbican_tempest_plugin/tests/rbac/v1/base.py b/barbican_tempest_plugin/tests/rbac/v1/base.py
index b5befe2..b16e431 100644
--- a/barbican_tempest_plugin/tests/rbac/v1/base.py
+++ b/barbican_tempest_plugin/tests/rbac/v1/base.py
@@ -30,10 +30,6 @@
RESOURCE_TYPES = ['container', 'order', 'quota', 'secret']
-def _get_uuid(href):
- return href.split('/')[-1]
-
-
def create_aes_key():
password = b"password"
salt = os.urandom(16)
@@ -50,9 +46,14 @@
_created_projects = None
_created_users = None
created_objects = {}
+
credentials = ['system_admin']
@classmethod
+ def ref_to_uuid(cls, href):
+ return href.split('/')[-1]
+
+ @classmethod
def skip_checks(cls):
super().skip_checks()
if not CONF.barbican_rbac_scope_verification.enforce_scope:
@@ -129,9 +130,7 @@
cls.consumer_client = os.secret_v1.ConsumerClient(
service='key-manager'
)
- cls.container_client = os.secret_v1.ContainerClient(
- service='key-manager'
- )
+ cls.container_client = os.secret_v1.ContainerClient()
cls.order_client = os.secret_v1.OrderClient(service='key-manager')
cls.quota_client = os.secret_v1.QuotaClient(service='key-manager')
cls.secret_client = os.secret_v1.SecretClient()
@@ -149,9 +148,7 @@
cls.admin_consumer_client = adm.secret_v1.ConsumerClient(
service='key-manager'
)
- cls.admin_container_client = adm.secret_v1.ContainerClient(
- service='key-manager'
- )
+ cls.admin_container_client = adm.secret_v1.ContainerClient()
cls.admin_order_client = adm.secret_v1.OrderClient(
service='key-manager'
)
@@ -192,18 +189,18 @@
@classmethod
def add_cleanup(cls, resource, response):
if resource == 'container':
- uuid = _get_uuid(response['container_ref'])
+ uuid = cls.ref_to_uuid(response['container_ref'])
if resource == 'order':
- uuid = _get_uuid(response.get('order_ref'))
+ uuid = cls.ref_to_uuid(response.get('order_ref'))
order_metadata = cls.get_order(uuid)
secret_ref = order_metadata.get('secret_ref')
if secret_ref:
- cls.created_objects['secret'].add(_get_uuid(secret_ref))
- uuid = _get_uuid(response['order_ref'])
+ cls.created_objects['secret'].add(cls.ref_to_uuid(secret_ref))
+ uuid = cls.ref_to_uuid(response['order_ref'])
if resource == 'quota':
- uuid = _get_uuid(response['quota_ref'])
+ uuid = cls.ref_to_uuid(response['quota_ref'])
if resource == 'secret':
- uuid = _get_uuid(response['secret_ref'])
+ uuid = cls.ref_to_uuid(response['secret_ref'])
cls.created_objects[resource].add(uuid)
@classmethod
diff --git a/barbican_tempest_plugin/tests/rbac/v1/test_containers.py b/barbican_tempest_plugin/tests/rbac/v1/test_containers.py
new file mode 100644
index 0000000..25b5b06
--- /dev/null
+++ b/barbican_tempest_plugin/tests/rbac/v1/test_containers.py
@@ -0,0 +1,275 @@
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+import abc
+
+from tempest import config
+from tempest.lib import exceptions
+
+from barbican_tempest_plugin.tests.rbac.v1 import base
+
+
+CONF = config.CONF
+
+
+class BarbicanV1RbacContainers:
+
+ @abc.abstractmethod
+ def test_list_containers(self):
+ """Test list_containers policy
+
+ Testing: GET /v1/containers
+ This test must check:
+ * whether the persona can list containers
+ """
+ raise NotImplementedError
+
+ @abc.abstractmethod
+ def test_create_container(self):
+ """Test create_container policy
+
+ Testing: POST /v1/containers
+ Thist test must check:
+ * whether the persona can create a new container
+ """
+ raise NotImplementedError
+
+ @abc.abstractmethod
+ def test_get_container(self):
+ """Test get_container policy
+
+ Testing: GET /v1/containers/{container-id}
+ Thist test must check:
+ * whether the persona can get a container
+ """
+ raise NotImplementedError
+
+ @abc.abstractmethod
+ def test_delete_container(self):
+ """Test delete_container policy
+
+ Testing: DELETE /v1/containers/{container-id}
+ Thist test must check:
+ * whether the persona can delete a container
+ """
+ raise NotImplementedError
+
+ @abc.abstractmethod
+ def test_add_secret_to_container(self):
+ """Test add_secret_to_container policy
+
+ Testing: POST /v1/containers/{container-id}/secrets
+ Thist test must check:
+ * whether the persona can add a secret to a container
+ """
+ raise NotImplementedError
+
+ @abc.abstractmethod
+ def test_delete_secret_from_container(self):
+ """Test delete_secret_from_container policy
+
+ Testing: DELETE /v1/containers/{container-id}/secrets
+ Thist test must check:
+ * whether the persona can delete a secret from a container
+ """
+ raise NotImplementedError
+
+
+class ProjectMemberTests(base.BarbicanV1RbacBase, BarbicanV1RbacContainers):
+
+ @classmethod
+ def setup_clients(cls):
+ super().setup_clients()
+ cls.client = cls.os_project_member.secret_v1.ContainerClient()
+ cls.secret_client = cls.os_project_member.secret_v1.SecretClient()
+
+ def test_list_containers(self):
+ self.do_request('create_container', cleanup='container',
+ name='list_containers', type='generic')
+
+ resp = self.do_request('list_containers')
+ containers = resp['containers']
+
+ self.assertGreaterEqual(len(containers), 1)
+
+ def test_create_container(self):
+ self.do_request('create_container', cleanup='container',
+ name='create_container', type='generic')
+
+ def test_get_container(self):
+ resp = self.do_request('create_container', cleanup='container',
+ name='get_container', type='generic')
+ container_id = self.ref_to_uuid(resp['container_ref'])
+
+ resp = self.do_request('get_container', container_id=container_id)
+
+ self.assertEqual(container_id, self.ref_to_uuid(resp['container_ref']))
+
+ def test_delete_container(self):
+ resp = self.do_request('create_container', name='delete_container',
+ type='generic')
+ container_id = self.ref_to_uuid(resp['container_ref'])
+
+ resp = self.do_request('delete_container', container_id=container_id)
+
+ def test_add_secret_to_container(self):
+ resp = self.do_request('create_container', cleanup='container',
+ name='add_secret_to_container_c',
+ type='generic')
+ container_id = self.ref_to_uuid(resp['container_ref'])
+
+ resp = self.do_request(
+ 'create_secret',
+ client=self.secret_client,
+ cleanup='secret',
+ name='add_secret_to_container_s',
+ secret_type='passphrase',
+ payload='shhh... secret',
+ payload_content_type='text/plain'
+ )
+ secret_id = self.ref_to_uuid(resp['secret_ref'])
+
+ resp = self.do_request('add_secret_to_container',
+ container_id=container_id,
+ secret_id=secret_id)
+
+ def test_delete_secret_from_container(self):
+ resp = self.do_request('create_container', cleanup='container',
+ name='delete_secret_from_container_c',
+ type='generic')
+ container_id = self.ref_to_uuid(resp['container_ref'])
+
+ resp = self.do_request(
+ 'create_secret',
+ client=self.secret_client,
+ cleanup='secret',
+ name='delete_secret_from_container_s',
+ secret_type='passphrase',
+ payload='shhh... secret',
+ payload_content_type='text/plain'
+ )
+ secret_id = self.ref_to_uuid(resp['secret_ref'])
+
+ self.do_request('add_secret_to_container',
+ container_id=container_id,
+ secret_id=secret_id)
+ resp = self.do_request('delete_secret_from_container',
+ container_id=container_id,
+ secret_id=secret_id)
+
+
+class ProjectAdminTests(ProjectMemberTests):
+
+ @classmethod
+ def setup_clients(cls):
+ super().setup_clients()
+ cls.client = cls.os_project_admin.secret_v1.ContainerClient()
+ cls.secret_client = cls.os_project_admin.secret_v1.SecretClient()
+
+
+class ProjectReaderTests(base.BarbicanV1RbacBase, BarbicanV1RbacContainers):
+
+ @classmethod
+ def setup_clients(cls):
+ super().setup_clients()
+ cls.client = cls.os_project_reader.secret_v1.ContainerClient()
+
+ def test_list_containers(self):
+ self.do_request('list_containers',
+ expected_status=exceptions.Forbidden)
+
+ def test_create_container(self):
+ self.do_request('create_container',
+ expected_status=exceptions.Forbidden,
+ name='create_container',
+ type='generic')
+
+ def test_get_container(self):
+ resp = self.do_request(
+ 'create_container',
+ client=self.os_project_member.secret_v1.ContainerClient(),
+ cleanup='container',
+ name='create_container', type='generic'
+ )
+ container_id = self.ref_to_uuid(resp['container_ref'])
+
+ self.do_request('get_container', expected_status=exceptions.Forbidden,
+ container_id=container_id)
+
+ def test_delete_container(self):
+ resp = self.do_request(
+ 'create_container',
+ client=self.os_project_member.secret_v1.ContainerClient(),
+ cleanup='container',
+ name='delete_container', type='generic'
+ )
+ container_id = self.ref_to_uuid(resp['container_ref'])
+
+ self.do_request('delete_container',
+ expected_status=exceptions.Forbidden,
+ container_id=container_id)
+
+ def test_add_secret_to_container(self):
+ resp = self.do_request(
+ 'create_container',
+ client=self.os_project_member.secret_v1.ContainerClient(),
+ cleanup='container',
+ name='add_secret_to_container_c', type='generic'
+ )
+ container_id = self.ref_to_uuid(resp['container_ref'])
+
+ resp = self.do_request(
+ 'create_secret',
+ client=self.os_project_member.secret_v1.SecretClient(),
+ cleanup='secret',
+ name='add_secret_to_container_s',
+ secret_type='passphrase',
+ payload='shhh... secret',
+ payload_content_type='text/plain'
+ )
+ secret_id = self.ref_to_uuid(resp['secret_ref'])
+
+ self.do_request('add_secret_to_container',
+ expected_status=exceptions.Forbidden,
+ container_id=container_id,
+ secret_id=secret_id)
+
+ def test_delete_secret_from_container(self):
+ resp = self.do_request(
+ 'create_container',
+ client=self.os_project_member.secret_v1.ContainerClient(),
+ cleanup='container',
+ name='delete_secret_from_container_c', type='generic'
+ )
+ container_id = self.ref_to_uuid(resp['container_ref'])
+
+ resp = self.do_request(
+ 'create_secret',
+ client=self.os_project_member.secret_v1.SecretClient(),
+ cleanup='secret',
+ name='delete_secret_from_container_s',
+ secret_type='passphrase',
+ payload='shhh... secret',
+ payload_content_type='text/plain'
+ )
+ secret_id = self.ref_to_uuid(resp['secret_ref'])
+
+ self.do_request(
+ 'add_secret_to_container',
+ client=self.os_project_member.secret_v1.ContainerClient(),
+ container_id=container_id,
+ secret_id=secret_id
+ )
+
+ self.do_request('delete_secret_from_container',
+ expected_status=exceptions.Forbidden,
+ container_id=container_id,
+ secret_id=secret_id)
diff --git a/barbican_tempest_plugin/tests/rbac/v1/test_secrets.py b/barbican_tempest_plugin/tests/rbac/v1/test_secrets.py
index a5ffa8a..6b8f657 100644
--- a/barbican_tempest_plugin/tests/rbac/v1/test_secrets.py
+++ b/barbican_tempest_plugin/tests/rbac/v1/test_secrets.py
@@ -34,7 +34,7 @@
* whether the persona can create an empty secret
* whether the persona can create a secret with a symmetric key
"""
- pass
+ raise NotImplementedError
@abc.abstractmethod
def test_list_secrets(self):
@@ -44,7 +44,7 @@
This test must check:
* whether the persona can list secrets within their project
"""
- pass
+ raise NotImplementedError
@abc.abstractmethod
def test_delete_secret(self):
@@ -54,7 +54,7 @@
This test must check:
* whether the persona can delete a secret in their project
"""
- pass
+ raise NotImplementedError
@abc.abstractmethod
def test_get_secret(self):
@@ -64,7 +64,7 @@
This test must check:
* whether the persona can get a specific secret within their project
"""
- pass
+ raise NotImplementedError
@abc.abstractmethod
def test_get_secret_payload(self):
@@ -74,7 +74,7 @@
This test must check:
* whether the persona can get a secret payload
"""
- pass
+ raise NotImplementedError
@abc.abstractmethod
def test_put_secret_payload(self):
@@ -84,7 +84,7 @@
This test must check:
* whether the persona can add a paylod to an empty secret
"""
- pass
+ raise NotImplementedError
class ProjectMemberTests(rbac_base.BarbicanV1RbacBase, BarbicanV1RbacSecrets):
@@ -134,21 +134,21 @@
def test_delete_secret(self):
"""Test delete_secrets policy."""
sec = self.create_empty_secret_admin('test_delete_secret_1')
- uuid = rbac_base._get_uuid(sec['secret_ref'])
+ uuid = self.ref_to_uuid(sec['secret_ref'])
self.do_request('delete_secret', secret_id=uuid)
self.delete_cleanup('secret', uuid)
def test_get_secret(self):
"""Test get_secret policy."""
sec = self.create_empty_secret_admin('test_get_secret')
- uuid = rbac_base._get_uuid(sec['secret_ref'])
+ uuid = self.ref_to_uuid(sec['secret_ref'])
resp = self.do_request('get_secret_metadata', secret_id=uuid)
- self.assertEqual(uuid, rbac_base._get_uuid(resp['secret_ref']))
+ self.assertEqual(uuid, self.ref_to_uuid(resp['secret_ref']))
def test_get_secret_payload(self):
"""Test get_secret payload policy."""
key, sec = self.create_aes_secret_admin('test_get_secret_payload')
- uuid = rbac_base._get_uuid(sec['secret_ref'])
+ uuid = self.ref_to_uuid(sec['secret_ref'])
# Retrieve the payload
payload = self.do_request('get_secret_payload', secret_id=uuid)
@@ -157,7 +157,7 @@
def test_put_secret_payload(self):
"""Test put_secret policy."""
sec = self.create_empty_secret_admin('test_put_secret_payload')
- uuid = rbac_base._get_uuid(sec['secret_ref'])
+ uuid = self.ref_to_uuid(sec['secret_ref'])
key = rbac_base.create_aes_key()
@@ -226,7 +226,7 @@
def test_delete_secret(self):
"""Test delete_secrets policy."""
sec = self.create_empty_secret_admin('secret_1')
- uuid = rbac_base._get_uuid(sec['secret_ref'])
+ uuid = self.ref_to_uuid(sec['secret_ref'])
self.do_request(
'delete_secret', expected_status=exceptions.Forbidden,
secret_id=uuid
@@ -235,7 +235,7 @@
def test_get_secret(self):
"""Test get_secret policy."""
sec = self.create_empty_secret_admin('secret_1')
- uuid = rbac_base._get_uuid(sec['secret_ref'])
+ uuid = self.ref_to_uuid(sec['secret_ref'])
self.do_request(
'get_secret_metadata', expected_status=exceptions.Forbidden,
secret_id=uuid
@@ -244,7 +244,7 @@
def test_get_secret_payload(self):
"""Test get_secret payload policy."""
key, sec = self.create_aes_secret_admin('secret_1')
- uuid = rbac_base._get_uuid(sec['secret_ref'])
+ uuid = self.ref_to_uuid(sec['secret_ref'])
# Retrieve the payload
self.do_request(
@@ -255,7 +255,7 @@
def test_put_secret_payload(self):
"""Test put_secret policy."""
sec = self.create_empty_secret_admin('secret_1')
- uuid = rbac_base._get_uuid(sec['secret_ref'])
+ uuid = self.ref_to_uuid(sec['secret_ref'])
key = rbac_base.create_aes_key()