Initial patch to add barbican rbac tests

This adds initial RBAC tests for secrets

Change-Id: Ib79eed6886839d1b7848c991bd64e82595c6c32e
diff --git a/barbican_tempest_plugin/config.py b/barbican_tempest_plugin/config.py
index 0c4a2ac..67a7986 100644
--- a/barbican_tempest_plugin/config.py
+++ b/barbican_tempest_plugin/config.py
@@ -54,3 +54,14 @@
                 help="Does the test environment enforce glance image "
                      "verification?"),
 ]
+
+barbican_rbac_scope_verification_group = cfg.OptGroup(
+    name="barbican_rbac_scope_verification",
+    title="Barbican RBAC Verification Options")
+
+BarbicanRBACScopeVerificationGroup = [
+    cfg.BoolOpt('enforce_scope',
+                default=False,
+                help="Does barbican enforce scope and user "
+                     "scope-aware policies?"),
+]
diff --git a/barbican_tempest_plugin/plugin.py b/barbican_tempest_plugin/plugin.py
index 1914ecb..2acd7cc 100644
--- a/barbican_tempest_plugin/plugin.py
+++ b/barbican_tempest_plugin/plugin.py
@@ -39,6 +39,12 @@
                            project_config.ephemeral_storage_encryption_group)
         conf.register_opts(project_config.ImageSignatureVerificationGroup,
                            project_config.image_signature_verification_group)
+        conf.register_group(
+            project_config.barbican_rbac_scope_verification_group)
+        conf.register_opts(
+            project_config.BarbicanRBACScopeVerificationGroup,
+            project_config.barbican_rbac_scope_verification_group
+        )
 
     def get_opt_lists(self):
         return [('service_available', [project_config.service_option])]
diff --git a/barbican_tempest_plugin/tests/api/base.py b/barbican_tempest_plugin/tests/api/base.py
index 7256a10..2599480 100644
--- a/barbican_tempest_plugin/tests/api/base.py
+++ b/barbican_tempest_plugin/tests/api/base.py
@@ -84,6 +84,11 @@
         cls.quota_client = os.secret_v1.QuotaClient(service='key-manager')
 
     @classmethod
+    def setup_credentials(cls):
+        super().setup_credentials()
+        cls.os_primary = getattr(cls, f'os_{cls.credentials[0]}')
+
+    @classmethod
     def resource_setup(cls):
         super(BaseKeyManagerTest, cls).resource_setup()
         for resource in RESOURCE_TYPES:
diff --git a/barbican_tempest_plugin/tests/rbac/__init__.py b/barbican_tempest_plugin/tests/rbac/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/barbican_tempest_plugin/tests/rbac/__init__.py
diff --git a/barbican_tempest_plugin/tests/rbac/v1/__init__.py b/barbican_tempest_plugin/tests/rbac/v1/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/barbican_tempest_plugin/tests/rbac/v1/__init__.py
diff --git a/barbican_tempest_plugin/tests/rbac/v1/base.py b/barbican_tempest_plugin/tests/rbac/v1/base.py
new file mode 100644
index 0000000..5cddb7e
--- /dev/null
+++ b/barbican_tempest_plugin/tests/rbac/v1/base.py
@@ -0,0 +1,149 @@
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+from tempest import config
+
+CONF = config.CONF
+
+RESOURCE_TYPES = ['container', 'order', 'quota', 'secret']
+
+
+def _get_uuid(href):
+    return href.split('/')[-1]
+
+
+class BarbicanV1RbacBase(object):
+
+    identity_version = 'v3'
+    created_objects = {}
+
+    @classmethod
+    def skip_checks(cls):
+        super().skip_checks()
+        if not CONF.barbican_rbac_scope_verification.enforce_scope:
+            raise cls.skipException("enforce_scope is not enabled for "
+                                    "barbican, skipping RBAC tests")
+
+    @classmethod
+    def setup_clients(cls):
+        super().setup_clients()
+
+        # setup clients for primary persona
+        os = getattr(cls, f'os_{cls.credentials[0]}')
+        cls.secret_client = os.secret_v1.SecretClient(service='key-manager')
+        cls.secret_metadata_client = os.secret_v1.SecretMetadataClient(
+            service='key-manager'
+        )
+        cls.consumer_client = os.secret_v1.ConsumerClient(
+            service='key-manager'
+        )
+        cls.container_client = os.secret_v1.ContainerClient(
+            service='key-manager'
+        )
+        cls.order_client = os.secret_v1.OrderClient(service='key-manager')
+        cls.quota_client = os.secret_v1.QuotaClient(service='key-manager')
+        cls.secret_client = os.secret_v1.SecretClient(service='key-manager')
+        cls.secret_metadata_client = os.secret_v1.SecretMetadataClient(
+            service='key-manager'
+        )
+
+        # setup clients for admin persona
+        # this client is used for any cleanupi/setup etc. as needed
+        adm = getattr(cls, f'os_{cls.credentials[1]}')
+        cls.admin_secret_client = adm.secret_v1.SecretClient(
+            service='key-manager')
+        cls.admin_secret_metadata_client = adm.secret_v1.SecretMetadataClient(
+            service='key-manager'
+        )
+        cls.admin_consumer_client = adm.secret_v1.ConsumerClient(
+            service='key-manager'
+        )
+        cls.admin_container_client = adm.secret_v1.ContainerClient(
+            service='key-manager'
+        )
+        cls.admin_order_client = adm.secret_v1.OrderClient(
+            service='key-manager'
+        )
+        cls.admin_quota_client = adm.secret_v1.QuotaClient(
+            service='key-manager'
+        )
+        cls.admin_secret_client = adm.secret_v1.SecretClient(
+            service='key-manager'
+        )
+        cls.admin_secret_metadata_client = adm.secret_v1.SecretMetadataClient(
+            service='key-manager'
+        )
+
+    @classmethod
+    def setup_credentials(cls):
+        super().setup_credentials()
+        cls.os_primary = getattr(cls, f'os_{cls.credentials[0]}')
+
+    @classmethod
+    def resource_setup(cls):
+        super().resource_setup()
+        for resource in RESOURCE_TYPES:
+            cls.created_objects[resource] = set()
+
+    @classmethod
+    def resource_cleanup(cls):
+        try:
+            for container_uuid in list(cls.created_objects['container']):
+                cls.admin_container_client.delete_container(container_uuid)
+                cls.created_objects['container'].remove(container_uuid)
+            for order_uuid in list(cls.created_objects['order']):
+                cls.admin_order_client.delete_order(order_uuid)
+                cls.created_objects['order'].remove(order_uuid)
+            for quota_uuid in list(cls.created_objects['quota']):
+                cls.admin_quota_client.delete_project_quota(quota_uuid)
+                cls.created_objects['quota'].remove(quota_uuid)
+            for secret_uuid in list(cls.created_objects['secret']):
+                cls.admin_secret_client.delete_secret(secret_uuid)
+                cls.created_objects['secret'].remove(secret_uuid)
+        finally:
+            super(BarbicanV1RbacBase, cls).resource_cleanup()
+
+    @classmethod
+    def add_cleanup(cls, resource, response):
+        if resource == 'container':
+            uuid = _get_uuid(response['container_ref'])
+        if resource == 'order':
+            uuid = _get_uuid(response.get('order_ref'))
+            order_metadata = cls.get_order(uuid)
+            secret_ref = order_metadata.get('secret_ref')
+            if secret_ref:
+                cls.created_objects['secret'].add(_get_uuid(secret_ref))
+            uuid = _get_uuid(response['order_ref'])
+        if resource == 'quota':
+            uuid = _get_uuid(response['quota_ref'])
+        if resource == 'secret':
+            uuid = _get_uuid(response['secret_ref'])
+        cls.created_objects[resource].add(uuid)
+
+    @classmethod
+    def delete_cleanup(cls, resource, uuid):
+        cls.created_objects[resource].remove(uuid)
+
+    def do_request(self, method, client=None, expected_status=200,
+                   cleanup=None, **args):
+        if client is None:
+            client = self.client
+        if isinstance(expected_status, type(Exception)):
+            self.assertRaises(expected_status,
+                              getattr(client, method),
+                              **args)
+        else:
+            response = getattr(client, method)(**args)
+            self.assertEqual(response.response.status, expected_status)
+            if cleanup is not None:
+                self.add_cleanup(cleanup, response)
+            return response
diff --git a/barbican_tempest_plugin/tests/rbac/v1/test_secrets.py b/barbican_tempest_plugin/tests/rbac/v1/test_secrets.py
new file mode 100644
index 0000000..3d6c6e3
--- /dev/null
+++ b/barbican_tempest_plugin/tests/rbac/v1/test_secrets.py
@@ -0,0 +1,359 @@
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import abc
+import base64
+from datetime import datetime
+from datetime import timedelta
+import os
+
+from cryptography.hazmat.backends import default_backend
+from cryptography.hazmat.primitives import hashes
+from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
+
+from tempest import config
+from tempest.lib import exceptions
+
+from barbican_tempest_plugin.tests.rbac.v1 import base as rbac_base
+
+CONF = config.CONF
+
+
+def create_aes_key():
+    password = b"password"
+    salt = os.urandom(16)
+    kdf = PBKDF2HMAC(
+        algorithm=hashes.SHA256(), length=32, salt=salt,
+        iterations=1000, backend=default_backend()
+    )
+    return base64.b64encode(kdf.derive(password))
+
+
+class BarbicanV1RbacSecretsBase(rbac_base.BarbicanV1RbacBase,
+                                metaclass=abc.ABCMeta):
+
+    @classmethod
+    def setup_clients(cls):
+        super().setup_clients()
+        cls.client = cls.secret_client
+
+    def create_empty_secret_admin(self, secret_name):
+        """add empty secret as admin user """
+        return self.do_request(
+            'create_secret', client=self.admin_secret_client,
+            expected_status=201, cleanup='secret', name=secret_name)
+
+    def create_aes_secret_admin(self, secret_name):
+        key = create_aes_key()
+        expire_time = (datetime.utcnow() + timedelta(days=5))
+        return key, self.do_request(
+            'create_secret', client=self.admin_secret_client,
+            expected_status=201, cleanup="secret",
+            expiration=expire_time.isoformat(), algorithm="aes",
+            bit_length=256, mode="cbc", payload=key,
+            payload_content_type="application/octet-stream",
+            payload_content_encoding="base64",
+            name=secret_name
+        )
+
+    @abc.abstractmethod
+    def test_create_secret(self):
+        """Test add_secret policy.
+
+        Testing: POST /v1/secrets
+        This test must check:
+          * whether the persona can create an empty secret
+          * whether the persona can create a secret with a symmetric key
+        """
+        pass
+
+    @abc.abstractmethod
+    def test_list_secrets(self):
+        """Test get_secrets policy.
+
+        Testing: GET /v1/secrets
+        This test must check:
+          * whether the persona can list secrets within their project
+        """
+        pass
+
+    @abc.abstractmethod
+    def test_delete_secret(self):
+        """Test deleting a secret.
+
+        Testing: DEL /v1/secrets/{secret_id}
+        This test must check:
+          * whether the persona can delete a secret in their project
+        """
+        pass
+
+    @abc.abstractmethod
+    def test_get_secret(self):
+        """Test get_secret policy.
+
+        Testing: GET /v1/secrets/{secret_id}
+        This test must check:
+          * whether the persona can get a specific secret within their project
+        """
+        pass
+
+    @abc.abstractmethod
+    def test_get_secret_payload(self):
+        """Test get_secret payload policy.
+
+        Testing: GET /v1/secrets/{secret_id}/payload
+        This test must check:
+          * whether the persona can get a secret payload
+        """
+        pass
+
+    @abc.abstractmethod
+    def test_put_secret_payload(self):
+        """Test put_secret policy.
+
+        Testing: PUT /v1/secrets/{secret_id}
+        This test must check:
+          * whether the persona can add a paylod to an empty secret
+        """
+        pass
+
+
+class ProjectMemberTests(BarbicanV1RbacSecretsBase):
+    credentials = ['project_member', 'project_admin']
+
+    def test_create_secret(self):
+        """Test add_secret policy."""
+        self.do_request('create_secret', expected_status=201, cleanup='secret')
+
+        key = create_aes_key()
+        expire_time = (datetime.utcnow() + timedelta(days=5))
+        self.do_request(
+            'create_secret', expected_status=201, cleanup="secret",
+            expiration=expire_time.isoformat(), algorithm="aes",
+            bit_length=256, mode="cbc", payload=key,
+            payload_content_type="application/octet-stream",
+            payload_content_encoding="base64"
+        )
+
+    def test_list_secrets(self):
+        """Test get_secrets policy."""
+        # create two secrets
+        self.create_empty_secret_admin('secret_1')
+        self.create_empty_secret_admin('secret_2')
+
+        # list secrets with name secret_1
+        resp = self.do_request('list_secrets', name='secret_1')
+        secrets = resp['secrets']
+        self.assertEqual('secret_1', secrets[0]['name'])
+
+        # list secrets with name secret_2
+        resp = self.do_request('list_secrets', name='secret_2')
+        secrets = resp['secrets']
+        self.assertEqual('secret_2', secrets[0]['name'])
+
+        # list all secrets
+        resp = self.do_request('list_secrets')
+        secrets = resp['secrets']
+        self.assertEqual(len(secrets), 2)
+
+    def test_delete_secret(self):
+        """Test delete_secrets policy."""
+        sec = self.create_empty_secret_admin('secret_1')
+        uuid = rbac_base._get_uuid(sec['secret_ref'])
+        self.do_request('delete_secret', secret_id=uuid)
+        self.delete_cleanup('secret', uuid)
+
+    def test_get_secret(self):
+        """Test get_secret policy."""
+        sec = self.create_empty_secret_admin('secret_1')
+        uuid = rbac_base._get_uuid(sec['secret_ref'])
+        resp = self.do_request('get_secret_metadata', secret_id=uuid)
+        self.assertEqual(uuid, rbac_base._get_uuid(resp['secret_ref']))
+
+    def test_get_secret_payload(self):
+        """Test get_secret payload policy."""
+        key, sec = self.create_aes_secret_admin('secret_1')
+        uuid = rbac_base._get_uuid(sec['secret_ref'])
+
+        # Retrieve the payload
+        payload = self.do_request('get_secret_payload', secret_id=uuid)
+        self.assertEqual(key, base64.b64encode(payload))
+
+    def test_put_secret_payload(self):
+        """Test put_secret policy."""
+        sec = self.create_empty_secret_admin('secret_1')
+        uuid = rbac_base._get_uuid(sec['secret_ref'])
+
+        key = create_aes_key()
+
+        # Associate the payload with the created secret
+        self.do_request('put_secret_payload', secret_id=uuid, payload=key)
+
+        # Retrieve the payload
+        payload = self.do_request('get_secret_payload', secret_id=uuid)
+        self.assertEqual(key, base64.b64encode(payload))
+
+
+class ProjectAdminTests(ProjectMemberTests):
+    credentials = ['project_admin', 'project_admin']
+
+
+class ProjectReaderTests(BarbicanV1RbacSecretsBase):
+    credentials = ['project_reader', 'project_admin']
+
+    def test_create_secret(self):
+        """Test add_secret policy."""
+        self.do_request(
+            'create_secret', expected_status=exceptions.Forbidden,
+            cleanup='secret')
+
+        key = create_aes_key()
+        expire_time = (datetime.utcnow() + timedelta(days=5))
+        self.do_request(
+            'create_secret', expected_status=exceptions.Forbidden,
+            cleanup="secret",
+            expiration=expire_time.isoformat(), algorithm="aes",
+            bit_length=256, mode="cbc", payload=key,
+            payload_content_type="application/octet-stream",
+            payload_content_encoding="base64"
+        )
+
+    def test_list_secrets(self):
+        """Test get_secrets policy."""
+        # create two secrets
+        self.create_empty_secret_admin('secret_1')
+        self.create_empty_secret_admin('secret_2')
+
+        # list secrets with name secret_1
+        self.do_request(
+            'list_secrets', expected_status=exceptions.Forbidden,
+            name='secret_1'
+        )
+
+        # list secrets with name secret_2
+        self.do_request(
+            'list_secrets', expected_status=exceptions.Forbidden,
+            name='secret_2'
+        )
+
+        # list all secrets
+        self.do_request(
+            'list_secrets', expected_status=exceptions.Forbidden
+        )
+
+    def test_delete_secret(self):
+        """Test delete_secrets policy."""
+        sec = self.create_empty_secret_admin('secret_1')
+        uuid = rbac_base._get_uuid(sec['secret_ref'])
+        self.do_request(
+            'delete_secret', expected_status=exceptions.Forbidden,
+            secret_id=uuid
+        )
+
+    def test_get_secret(self):
+        """Test get_secret policy."""
+        sec = self.create_empty_secret_admin('secret_1')
+        uuid = rbac_base._get_uuid(sec['secret_ref'])
+        self.do_request(
+            'get_secret_metadata', expected_status=exceptions.Forbidden,
+            secret_id=uuid
+        )
+
+    def test_get_secret_payload(self):
+        """Test get_secret payload policy."""
+        key, sec = self.create_aes_secret_admin('secret_1')
+        uuid = rbac_base._get_uuid(sec['secret_ref'])
+
+        # Retrieve the payload
+        self.do_request(
+            'get_secret_payload', expected_status=exceptions.Forbidden,
+            secret_id=uuid
+        )
+
+    def test_put_secret_payload(self):
+        """Test put_secret policy."""
+        sec = self.create_empty_secret_admin('secret_1')
+        uuid = rbac_base._get_uuid(sec['secret_ref'])
+
+        key = create_aes_key()
+
+        # Associate the payload with the created secret
+        self.do_request(
+            'put_secret_payload', expected_status=exceptions.Forbidden,
+            secret_id=uuid, payload=key
+        )
+
+
+class SystemAdminTests(BarbicanV1RbacSecretsBase):
+    credentials = ['system_admin', 'project_admin']
+
+    def test_create_secret(self):
+        pass
+
+    def test_list_secrets(self):
+        pass
+
+    def test_delete_secret(self):
+        pass
+
+    def test_get_secret(self):
+        pass
+
+    def test_get_secret_payload(self):
+        pass
+
+    def test_put_secret_payload(self):
+        pass
+
+
+class SystemMemberTests(BarbicanV1RbacSecretsBase):
+    credentials = ['system_member', 'project_admin']
+
+    def test_create_secret(self):
+        pass
+
+    def test_list_secrets(self):
+        pass
+
+    def test_delete_secret(self):
+        pass
+
+    def test_get_secret(self):
+        pass
+
+    def test_get_secret_payload(self):
+        pass
+
+    def test_put_secret_payload(self):
+        pass
+
+
+class SystemReaderTests(BarbicanV1RbacSecretsBase):
+    credentials = ['system_reader', 'project_admin']
+
+    def test_create_secret(self):
+        pass
+
+    def test_list_secrets(self):
+        pass
+
+    def test_delete_secret(self):
+        pass
+
+    def test_get_secret(self):
+        pass
+
+    def test_get_secret_payload(self):
+        pass
+
+    def test_put_secret_payload(self):
+        pass