Merge "Remove old pep8 ignores that are no longer necessary"
diff --git a/.zuul.yaml b/.zuul.yaml
index 1b3cb87..753b57b 100644
--- a/.zuul.yaml
+++ b/.zuul.yaml
@@ -5,6 +5,8 @@
check:
jobs:
- barbican-tempest-plugin-simple-crypto
+ - barbican-tempest-plugin-simple-crypto-secure-rbac
+ - barbican-tempest-plugin-simple-crypto-wallaby
- barbican-tempest-plugin-simple-crypto-victoria
- barbican-tempest-plugin-simple-crypto-ussuri
- barbican-tempest-plugin-simple-crypto-train
@@ -53,6 +55,34 @@
- barbican-tempest-plugin
- job:
+ name: barbican-tempest-plugin-simple-crypto-secure-rbac
+ parent: barbican-tempest-plugin-simple-crypto
+ vars:
+ devstack_local_conf:
+ post-config:
+ $BARBICAN_CONF:
+ oslo_policy:
+ enforce_new_defaults: True
+ test-config:
+ $TEMPEST_CONFIG:
+ # FIXME(redrobot): Tempest errors out when you try to create a
+ # system-scope admin because of a neutron client issue where a
+ # tenant_id is required.
+ # To work around that issue we disable create_isolated_networks
+ # here, and we also skip a lot of tests that require that feature.
+ # We should be able to re-enable this once Tempest is fixed.
+ # See: https://review.opendev.org/c/openstack/tempest/+/781553
+ auth:
+ create_isolated_networks: False
+ barbican_rbac_scope_verification:
+ enforce_scope: True
+
+- job:
+ name: barbican-tempest-plugin-simple-crypto-wallaby
+ parent: barbican-tempest-plugin-simple-crypto
+ override-checkout: stable/wallaby
+
+- job:
name: barbican-tempest-plugin-simple-crypto-victoria
parent: barbican-tempest-plugin-simple-crypto
nodeset: openstack-single-node-bionic
diff --git a/barbican_tempest_plugin/config.py b/barbican_tempest_plugin/config.py
index 0c4a2ac..7b79cb5 100644
--- a/barbican_tempest_plugin/config.py
+++ b/barbican_tempest_plugin/config.py
@@ -20,6 +20,17 @@
help="Whether or not barbican is expected to be "
"available")
+barbican_tempest_group = cfg.OptGroup(
+ name='barbican_tempest',
+ title='Key Manager (Barbican) service options'
+)
+
+BarbicanGroupOpts = [
+ cfg.BoolOpt('enable_multiple_secret_stores',
+ default=False,
+ help="Flag to enable mulitple secret store tests")
+]
+
ephemeral_storage_encryption_group = cfg.OptGroup(
name="ephemeral_storage_encryption",
title="Ephemeral storage encryption options")
@@ -54,3 +65,14 @@
help="Does the test environment enforce glance image "
"verification?"),
]
+
+barbican_rbac_scope_verification_group = cfg.OptGroup(
+ name="barbican_rbac_scope_verification",
+ title="Barbican RBAC Verification Options")
+
+BarbicanRBACScopeVerificationGroup = [
+ cfg.BoolOpt('enforce_scope',
+ default=False,
+ help="Does barbican enforce scope and user "
+ "scope-aware policies?"),
+]
diff --git a/barbican_tempest_plugin/plugin.py b/barbican_tempest_plugin/plugin.py
index 1914ecb..b829a05 100644
--- a/barbican_tempest_plugin/plugin.py
+++ b/barbican_tempest_plugin/plugin.py
@@ -33,12 +33,22 @@
conf.register_opt(project_config.service_option,
group='service_available')
+ conf.register_group(project_config.barbican_tempest_group)
+ conf.register_opts(project_config.BarbicanGroupOpts,
+ project_config.barbican_tempest_group)
+
# Register ephemeral storage encryption options
conf.register_group(project_config.ephemeral_storage_encryption_group)
conf.register_opts(project_config.EphemeralStorageEncryptionGroup,
project_config.ephemeral_storage_encryption_group)
conf.register_opts(project_config.ImageSignatureVerificationGroup,
project_config.image_signature_verification_group)
+ conf.register_group(
+ project_config.barbican_rbac_scope_verification_group)
+ conf.register_opts(
+ project_config.BarbicanRBACScopeVerificationGroup,
+ project_config.barbican_rbac_scope_verification_group
+ )
def get_opt_lists(self):
return [('service_available', [project_config.service_option])]
@@ -54,7 +64,9 @@
'OrderClient',
'QuotaClient',
'SecretClient',
- 'SecretMetadataClient'
+ 'SecretMetadataClient',
+ 'SecretStoresClient',
+ 'TransportKeyClient'
],
}
return [v1_params]
diff --git a/barbican_tempest_plugin/services/key_manager/json/__init__.py b/barbican_tempest_plugin/services/key_manager/json/__init__.py
index 7bce46a..ebab977 100644
--- a/barbican_tempest_plugin/services/key_manager/json/__init__.py
+++ b/barbican_tempest_plugin/services/key_manager/json/__init__.py
@@ -24,6 +24,10 @@
import SecretClient
from barbican_tempest_plugin.services.key_manager.json.secret_metadata_client \
import SecretMetadataClient
+from barbican_tempest_plugin.services.key_manager.json.secret_stores_client \
+ import SecretStoresClient
+from barbican_tempest_plugin.services.key_manager.json.transport_key_client \
+ import TransportKeyClient
__all__ = [
'ConsumerClient',
@@ -31,5 +35,7 @@
'OrderClient',
'QuotaClient',
'SecretClient',
- 'SecretMetadataClient'
+ 'SecretMetadataClient',
+ 'SecretStoresClient',
+ 'TransportKeyClient'
]
diff --git a/barbican_tempest_plugin/services/key_manager/json/base.py b/barbican_tempest_plugin/services/key_manager/json/base.py
new file mode 100644
index 0000000..0c21382
--- /dev/null
+++ b/barbican_tempest_plugin/services/key_manager/json/base.py
@@ -0,0 +1,22 @@
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+from tempest.lib.common import rest_client
+
+
+_DEFAULT_SERVICE_TYPE = 'key-manager'
+
+
+class BarbicanTempestClient(rest_client.RestClient):
+
+ def __init__(self, *args, **kwargs):
+ kwargs['service'] = _DEFAULT_SERVICE_TYPE
+ super().__init__(*args, **kwargs)
diff --git a/barbican_tempest_plugin/services/key_manager/json/container_client.py b/barbican_tempest_plugin/services/key_manager/json/container_client.py
index 7e24396..766cbf2 100644
--- a/barbican_tempest_plugin/services/key_manager/json/container_client.py
+++ b/barbican_tempest_plugin/services/key_manager/json/container_client.py
@@ -11,19 +11,17 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
-
-
import json
-
from urllib import parse as urllib
from tempest import config
-from tempest.lib.common import rest_client
+
+from barbican_tempest_plugin.services.key_manager.json import base
CONF = config.CONF
-class ContainerClient(rest_client.RestClient):
+class ContainerClient(base.BarbicanTempestClient):
def list_containers(self, **kwargs):
uri = "v1/containers"
diff --git a/barbican_tempest_plugin/services/key_manager/json/order_client.py b/barbican_tempest_plugin/services/key_manager/json/order_client.py
index d7a3945..c81ddd0 100644
--- a/barbican_tempest_plugin/services/key_manager/json/order_client.py
+++ b/barbican_tempest_plugin/services/key_manager/json/order_client.py
@@ -11,19 +11,18 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
-
-
import json
-
from urllib import parse as urllib
from tempest import config
-from tempest.lib.common import rest_client
+
+from barbican_tempest_plugin.services.key_manager.json import base
+
CONF = config.CONF
-class OrderClient(rest_client.RestClient):
+class OrderClient(base.BarbicanTempestClient):
def list_orders(self, **kwargs):
uri = "/v1/orders"
diff --git a/barbican_tempest_plugin/services/key_manager/json/quota_client.py b/barbican_tempest_plugin/services/key_manager/json/quota_client.py
index ba09b56..a238932 100644
--- a/barbican_tempest_plugin/services/key_manager/json/quota_client.py
+++ b/barbican_tempest_plugin/services/key_manager/json/quota_client.py
@@ -14,16 +14,17 @@
import json
-
from urllib import parse as urllib
from tempest import config
-from tempest.lib.common import rest_client
+
+from barbican_tempest_plugin.services.key_manager.json import base
+
CONF = config.CONF
-class QuotaClient(rest_client.RestClient):
+class QuotaClient(base.BarbicanTempestClient):
def list_quotas(self, **kwargs):
uri = "v1/project-quotas"
diff --git a/barbican_tempest_plugin/services/key_manager/json/secret_client.py b/barbican_tempest_plugin/services/key_manager/json/secret_client.py
index 8c1fa16..29a9cd0 100644
--- a/barbican_tempest_plugin/services/key_manager/json/secret_client.py
+++ b/barbican_tempest_plugin/services/key_manager/json/secret_client.py
@@ -17,13 +17,16 @@
import json
from tempest import config
-from tempest.lib.common import rest_client
from tempest.lib.common.utils import data_utils
+from barbican_tempest_plugin.services.key_manager.json import base
+
+
CONF = config.CONF
-class SecretClient(rest_client.RestClient):
+class SecretClient(base.BarbicanTempestClient):
+
def create_secret(self, **kwargs):
if 'name' not in kwargs:
kwargs['name'] = data_utils.rand_name("tempest-sec")
diff --git a/barbican_tempest_plugin/services/key_manager/json/secret_stores_client.py b/barbican_tempest_plugin/services/key_manager/json/secret_stores_client.py
new file mode 100644
index 0000000..cb5fd5e
--- /dev/null
+++ b/barbican_tempest_plugin/services/key_manager/json/secret_stores_client.py
@@ -0,0 +1,60 @@
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import json
+from urllib import parse
+
+from barbican_tempest_plugin.services.key_manager.json import base
+
+
+class SecretStoresClient(base.BarbicanTempestClient):
+
+ def list_secret_stores(self, **kwargs):
+ uri = '/v1/secret-stores'
+ if kwargs:
+ uri += '?{}'.format(parse.urlencode(kwargs))
+ resp, body = self.get(uri)
+ self.expected_success(200, resp.status)
+ return json.loads(body.decode('UTF-8'))
+
+ def get_secret_store(self, secret_store_id):
+ uri = '/v1/secret-stores/{}'.format(secret_store_id)
+ resp, body = self.get(uri)
+ self.expected_success(200, resp.status)
+ return json.loads(body.decode('UTF-8'))
+
+ def get_global_secret_store(self, **kwargs):
+ uri = '/v1/secret-stores/global-default'
+ if kwargs:
+ uri += '?{}'.format(parse.urlencode(kwargs))
+ resp, body = self.get(uri)
+ self.expected_success(200, resp.status)
+ return json.loads(body.decode('UTF-8'))
+
+ def get_preferred_secret_store(self, **kwargs):
+ uri = '/v1/secret-stores/preferred'
+ if kwargs:
+ uri += '?{}'.format(parse.urlencode(kwargs))
+ resp, body = self.get(uri)
+ self.expected_success(200, resp.status)
+ return json.loads(body.decode('UTF-8'))
+
+ def set_preferred_secret_store(self, secret_store_id):
+ uri = '/v1/secret-stores/{}/preferred'.format(secret_store_id)
+ resp, body = self.post(uri)
+ self.expected_success(200, resp.status)
+ return json.loads(body.decode('UTF-8'))
+
+ def unset_preferred_secret_store(self, secret_store_id):
+ uri = '/v1/secret-stores/{}/preferred'.format(secret_store_id)
+ resp, body = self.delete(uri)
+ self.expected_success(200, resp.status)
+ return json.loads(body.decode('UTF-8'))
diff --git a/barbican_tempest_plugin/services/key_manager/json/transport_key_client.py b/barbican_tempest_plugin/services/key_manager/json/transport_key_client.py
new file mode 100644
index 0000000..99fa2ea
--- /dev/null
+++ b/barbican_tempest_plugin/services/key_manager/json/transport_key_client.py
@@ -0,0 +1,44 @@
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import json
+from urllib import parse
+
+from barbican_tempest_plugin.services.key_manager.json import base
+
+
+class TransportKeyClient(base.BarbicanTempestClient):
+
+ def list_transport_keys(self, **kwargs):
+ uri = '/v1/transport_keys'
+ if kwargs:
+ uri += '?{}'.format(parse.urlencode(kwargs))
+ resp, body = self.get(uri)
+ self.expected_success(200, resp.status)
+ return json.loads(body.decode('UTF-8'))
+
+ def create_transport_key(self, **kwargs):
+ uri = '/v1/transport_keys'
+ post_body = json.dumps(kwargs)
+ resp, body = self.post(uri, post_body)
+ self.expected_success(201, resp.status)
+ return json.loads(body.decode('UTF-8'))
+
+ def get_transport_key(self, transport_key_id):
+ uri = '/v1/transport_keys/{}'.format(transport_key_id)
+ resp, body = self.get(uri)
+ self.expected_success(200, resp.status)
+ return json.loads(body.decode('UTF-8'))
+
+ def delete_transport_key(self, transport_key_id):
+ uri = '/v1/transport_keys/{}'.format(transport_key_id)
+ resp, body = self.delete(uri)
+ self.expected_success(204, resp.status)
diff --git a/barbican_tempest_plugin/tests/api/base.py b/barbican_tempest_plugin/tests/api/base.py
index 7256a10..2599480 100644
--- a/barbican_tempest_plugin/tests/api/base.py
+++ b/barbican_tempest_plugin/tests/api/base.py
@@ -84,6 +84,11 @@
cls.quota_client = os.secret_v1.QuotaClient(service='key-manager')
@classmethod
+ def setup_credentials(cls):
+ super().setup_credentials()
+ cls.os_primary = getattr(cls, f'os_{cls.credentials[0]}')
+
+ @classmethod
def resource_setup(cls):
super(BaseKeyManagerTest, cls).resource_setup()
for resource in RESOURCE_TYPES:
diff --git a/barbican_tempest_plugin/tests/rbac/__init__.py b/barbican_tempest_plugin/tests/rbac/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/barbican_tempest_plugin/tests/rbac/__init__.py
diff --git a/barbican_tempest_plugin/tests/rbac/v1/__init__.py b/barbican_tempest_plugin/tests/rbac/v1/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/barbican_tempest_plugin/tests/rbac/v1/__init__.py
diff --git a/barbican_tempest_plugin/tests/rbac/v1/base.py b/barbican_tempest_plugin/tests/rbac/v1/base.py
new file mode 100644
index 0000000..94888af
--- /dev/null
+++ b/barbican_tempest_plugin/tests/rbac/v1/base.py
@@ -0,0 +1,238 @@
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import base64
+from datetime import datetime
+from datetime import timedelta
+import os
+
+from cryptography.hazmat.backends import default_backend
+from cryptography.hazmat.primitives import hashes
+from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
+
+from tempest import clients
+from tempest import config
+from tempest.lib import auth
+from tempest.lib.common.utils import data_utils
+from tempest import test
+
+CONF = config.CONF
+
+RESOURCE_TYPES = ['container', 'order', 'quota', 'secret']
+
+
+def create_aes_key():
+ password = b"password"
+ salt = os.urandom(16)
+ kdf = PBKDF2HMAC(
+ algorithm=hashes.SHA256(), length=32, salt=salt,
+ iterations=1000, backend=default_backend()
+ )
+ return base64.b64encode(kdf.derive(password))
+
+
+class BarbicanV1RbacBase(test.BaseTestCase):
+
+ identity_version = 'v3'
+ _created_projects = None
+ _created_users = None
+ created_objects = {}
+
+ credentials = ['system_admin']
+
+ @classmethod
+ def ref_to_uuid(cls, href):
+ return href.split('/')[-1]
+
+ @classmethod
+ def skip_checks(cls):
+ super().skip_checks()
+ if not CONF.barbican_rbac_scope_verification.enforce_scope:
+ raise cls.skipException("enforce_scope is not enabled for "
+ "barbican, skipping RBAC tests")
+
+ @classmethod
+ def setup_credentials(cls):
+ super().setup_credentials()
+ cls._created_projects = list()
+ cls._created_users = list()
+ project_id = cls.os_system_admin.projects_client.create_project(
+ data_utils.rand_name()
+ )['project']['id']
+ cls._created_projects.append(project_id)
+ setattr(cls, 'os_project_admin',
+ cls._setup_new_user_client(project_id, 'admin'))
+ setattr(cls, 'os_project_member',
+ cls._setup_new_user_client(project_id, 'member'))
+ setattr(cls, 'os_project_reader',
+ cls._setup_new_user_client(project_id, 'reader'))
+
+ @classmethod
+ def _setup_new_user_client(cls, project_id, role):
+ """Create a new tempest.clients.Manager
+
+ Creates a new user with the given role on the given project,
+ and returns an instance of tempest.clients.Manager set up
+ for that user.
+
+ Users are cleaned up during class teardown in cls.clear_credentials
+ """
+ user = {
+ 'name': data_utils.rand_name('user'),
+ 'password': data_utils.rand_password()
+ }
+ user_id = cls.os_system_admin.users_v3_client.create_user(
+ **user
+ )['user']['id']
+ cls._created_users.append(user_id)
+ role_id = cls.os_system_admin.roles_v3_client.list_roles(
+ name=role
+ )['roles'][0]['id']
+ cls.os_system_admin.roles_v3_client.create_user_role_on_project(
+ project_id, user_id, role_id
+ )
+ creds = auth.KeystoneV3Credentials(
+ user_id=user_id,
+ password=user['password'],
+ project_id=project_id
+ )
+ auth_provider = clients.get_auth_provider(creds)
+ creds = auth_provider.fill_credentials()
+ return clients.Manager(credentials=creds)
+
+ @classmethod
+ def clear_credentials(cls):
+ for user_id in cls._created_users:
+ cls.os_system_admin.users_v3_client.delete_user(user_id)
+ for project_id in cls._created_projects:
+ cls.os_system_admin.projects_client.delete_project(project_id)
+ super().clear_credentials()
+
+ @classmethod
+ def setup_clients(cls):
+ super().setup_clients()
+
+ # setup clients for primary persona
+ os = cls.os_project_member
+ cls.secret_client = os.secret_v1.SecretClient()
+ cls.secret_metadata_client = os.secret_v1.SecretMetadataClient(
+ service='key-manager'
+ )
+ cls.consumer_client = os.secret_v1.ConsumerClient(
+ service='key-manager'
+ )
+ cls.container_client = os.secret_v1.ContainerClient()
+ cls.order_client = os.secret_v1.OrderClient()
+ cls.quota_client = os.secret_v1.QuotaClient()
+ cls.secret_client = os.secret_v1.SecretClient()
+ cls.secret_metadata_client = os.secret_v1.SecretMetadataClient(
+ service='key-manager'
+ )
+
+ # setup clients for admin persona
+ # this client is used for any cleanup/setup etc. as needed
+ adm = cls.os_project_admin
+ cls.admin_secret_client = adm.secret_v1.SecretClient()
+ cls.admin_secret_metadata_client = adm.secret_v1.SecretMetadataClient(
+ service='key-manager'
+ )
+ cls.admin_consumer_client = adm.secret_v1.ConsumerClient(
+ service='key-manager'
+ )
+ cls.admin_container_client = adm.secret_v1.ContainerClient()
+ cls.admin_order_client = adm.secret_v1.OrderClient()
+ cls.admin_quota_client = adm.secret_v1.QuotaClient()
+ cls.admin_secret_client = adm.secret_v1.SecretClient(
+ service='key-manager'
+ )
+ cls.admin_secret_metadata_client = adm.secret_v1.SecretMetadataClient(
+ service='key-manager'
+ )
+
+ @classmethod
+ def resource_setup(cls):
+ super().resource_setup()
+ for resource in RESOURCE_TYPES:
+ cls.created_objects[resource] = set()
+
+ @classmethod
+ def resource_cleanup(cls):
+ try:
+ for container_uuid in list(cls.created_objects['container']):
+ cls.admin_container_client.delete_container(container_uuid)
+ cls.created_objects['container'].remove(container_uuid)
+ for order_uuid in list(cls.created_objects['order']):
+ cls.admin_order_client.delete_order(order_uuid)
+ cls.created_objects['order'].remove(order_uuid)
+ for quota_uuid in list(cls.created_objects['quota']):
+ cls.admin_quota_client.delete_project_quota(quota_uuid)
+ cls.created_objects['quota'].remove(quota_uuid)
+ for secret_uuid in list(cls.created_objects['secret']):
+ cls.admin_secret_client.delete_secret(secret_uuid)
+ cls.created_objects['secret'].remove(secret_uuid)
+ finally:
+ super(BarbicanV1RbacBase, cls).resource_cleanup()
+
+ @classmethod
+ def add_cleanup(cls, resource, response):
+ if resource == 'container':
+ uuid = cls.ref_to_uuid(response['container_ref'])
+ if resource == 'order':
+ uuid = cls.ref_to_uuid(response.get('order_ref'))
+ order_metadata = cls.admin_order_client.get_order(uuid)
+ secret_ref = order_metadata.get('secret_ref')
+ if secret_ref:
+ cls.created_objects['secret'].add(cls.ref_to_uuid(secret_ref))
+ uuid = cls.ref_to_uuid(response['order_ref'])
+ if resource == 'quota':
+ uuid = cls.ref_to_uuid(response['quota_ref'])
+ if resource == 'secret':
+ uuid = cls.ref_to_uuid(response['secret_ref'])
+ cls.created_objects[resource].add(uuid)
+
+ @classmethod
+ def delete_cleanup(cls, resource, uuid):
+ cls.created_objects[resource].remove(uuid)
+
+ def do_request(self, method, client=None, expected_status=200,
+ cleanup=None, **args):
+ if client is None:
+ client = self.client
+ if isinstance(expected_status, type(Exception)):
+ self.assertRaises(expected_status,
+ getattr(client, method),
+ **args)
+ else:
+ response = getattr(client, method)(**args)
+ # self.assertEqual(response.response.status, expected_status)
+ if cleanup is not None:
+ self.add_cleanup(cleanup, response)
+ return response
+
+ def create_empty_secret_admin(self, secret_name):
+ """Add an empty secret as the admin user."""
+ return self.do_request(
+ 'create_secret', client=self.admin_secret_client,
+ expected_status=201, cleanup='secret', name=secret_name)
+
+ def create_aes_secret_admin(self, secret_name):
+ key = create_aes_key()
+ expire_time = (datetime.utcnow() + timedelta(days=5))
+ return key, self.do_request(
+ 'create_secret', client=self.admin_secret_client,
+ expected_status=201, cleanup="secret",
+ expiration=expire_time.isoformat(), algorithm="aes",
+ bit_length=256, mode="cbc", payload=key,
+ payload_content_type="application/octet-stream",
+ payload_content_encoding="base64",
+ name=secret_name
+ )
diff --git a/barbican_tempest_plugin/tests/rbac/v1/test_containers.py b/barbican_tempest_plugin/tests/rbac/v1/test_containers.py
new file mode 100644
index 0000000..25b5b06
--- /dev/null
+++ b/barbican_tempest_plugin/tests/rbac/v1/test_containers.py
@@ -0,0 +1,275 @@
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+import abc
+
+from tempest import config
+from tempest.lib import exceptions
+
+from barbican_tempest_plugin.tests.rbac.v1 import base
+
+
+CONF = config.CONF
+
+
+class BarbicanV1RbacContainers:
+
+ @abc.abstractmethod
+ def test_list_containers(self):
+ """Test list_containers policy
+
+ Testing: GET /v1/containers
+ This test must check:
+ * whether the persona can list containers
+ """
+ raise NotImplementedError
+
+ @abc.abstractmethod
+ def test_create_container(self):
+ """Test create_container policy
+
+ Testing: POST /v1/containers
+ This test must check:
+ * whether the persona can create a new container
+ """
+ raise NotImplementedError
+
+ @abc.abstractmethod
+ def test_get_container(self):
+ """Test get_container policy
+
+ Testing: GET /v1/containers/{container-id}
+ This test must check:
+ * whether the persona can get a container
+ """
+ raise NotImplementedError
+
+ @abc.abstractmethod
+ def test_delete_container(self):
+ """Test delete_container policy
+
+ Testing: DELETE /v1/containers/{container-id}
+ This test must check:
+ * whether the persona can delete a container
+ """
+ raise NotImplementedError
+
+ @abc.abstractmethod
+ def test_add_secret_to_container(self):
+ """Test add_secret_to_container policy
+
+ Testing: POST /v1/containers/{container-id}/secrets
+ This test must check:
+ * whether the persona can add a secret to a container
+ """
+ raise NotImplementedError
+
+ @abc.abstractmethod
+ def test_delete_secret_from_container(self):
+ """Test delete_secret_from_container policy
+
+ Testing: DELETE /v1/containers/{container-id}/secrets
+ This test must check:
+ * whether the persona can delete a secret from a container
+ """
+ raise NotImplementedError
+
+
+class ProjectMemberTests(base.BarbicanV1RbacBase, BarbicanV1RbacContainers):
+
+ @classmethod
+ def setup_clients(cls):
+ super().setup_clients()
+ cls.client = cls.os_project_member.secret_v1.ContainerClient()
+ cls.secret_client = cls.os_project_member.secret_v1.SecretClient()
+
+ def test_list_containers(self):
+ self.do_request('create_container', cleanup='container',
+ name='list_containers', type='generic')
+
+ resp = self.do_request('list_containers')
+ containers = resp['containers']
+
+ self.assertGreaterEqual(len(containers), 1)
+
+ def test_create_container(self):
+ self.do_request('create_container', cleanup='container',
+ name='create_container', type='generic')
+
+ def test_get_container(self):
+ resp = self.do_request('create_container', cleanup='container',
+ name='get_container', type='generic')
+ container_id = self.ref_to_uuid(resp['container_ref'])
+
+ resp = self.do_request('get_container', container_id=container_id)
+
+ self.assertEqual(container_id, self.ref_to_uuid(resp['container_ref']))
+
+ def test_delete_container(self):
+ resp = self.do_request('create_container', name='delete_container',
+ type='generic')
+ container_id = self.ref_to_uuid(resp['container_ref'])
+
+ resp = self.do_request('delete_container', container_id=container_id)
+
+ def test_add_secret_to_container(self):
+ resp = self.do_request('create_container', cleanup='container',
+ name='add_secret_to_container_c',
+ type='generic')
+ container_id = self.ref_to_uuid(resp['container_ref'])
+
+ resp = self.do_request(
+ 'create_secret',
+ client=self.secret_client,
+ cleanup='secret',
+ name='add_secret_to_container_s',
+ secret_type='passphrase',
+ payload='shhh... secret',
+ payload_content_type='text/plain'
+ )
+ secret_id = self.ref_to_uuid(resp['secret_ref'])
+
+ resp = self.do_request('add_secret_to_container',
+ container_id=container_id,
+ secret_id=secret_id)
+
+ def test_delete_secret_from_container(self):
+ resp = self.do_request('create_container', cleanup='container',
+ name='delete_secret_from_container_c',
+ type='generic')
+ container_id = self.ref_to_uuid(resp['container_ref'])
+
+ resp = self.do_request(
+ 'create_secret',
+ client=self.secret_client,
+ cleanup='secret',
+ name='delete_secret_from_container_s',
+ secret_type='passphrase',
+ payload='shhh... secret',
+ payload_content_type='text/plain'
+ )
+ secret_id = self.ref_to_uuid(resp['secret_ref'])
+
+ self.do_request('add_secret_to_container',
+ container_id=container_id,
+ secret_id=secret_id)
+ resp = self.do_request('delete_secret_from_container',
+ container_id=container_id,
+ secret_id=secret_id)
+
+
+class ProjectAdminTests(ProjectMemberTests):
+
+ @classmethod
+ def setup_clients(cls):
+ super().setup_clients()
+ cls.client = cls.os_project_admin.secret_v1.ContainerClient()
+ cls.secret_client = cls.os_project_admin.secret_v1.SecretClient()
+
+
+class ProjectReaderTests(base.BarbicanV1RbacBase, BarbicanV1RbacContainers):
+
+ @classmethod
+ def setup_clients(cls):
+ super().setup_clients()
+ cls.client = cls.os_project_reader.secret_v1.ContainerClient()
+
+ def test_list_containers(self):
+ self.do_request('list_containers',
+ expected_status=exceptions.Forbidden)
+
+ def test_create_container(self):
+ self.do_request('create_container',
+ expected_status=exceptions.Forbidden,
+ name='create_container',
+ type='generic')
+
+ def test_get_container(self):
+ resp = self.do_request(
+ 'create_container',
+ client=self.os_project_member.secret_v1.ContainerClient(),
+ cleanup='container',
+ name='create_container', type='generic'
+ )
+ container_id = self.ref_to_uuid(resp['container_ref'])
+
+ self.do_request('get_container', expected_status=exceptions.Forbidden,
+ container_id=container_id)
+
+ def test_delete_container(self):
+ resp = self.do_request(
+ 'create_container',
+ client=self.os_project_member.secret_v1.ContainerClient(),
+ cleanup='container',
+ name='delete_container', type='generic'
+ )
+ container_id = self.ref_to_uuid(resp['container_ref'])
+
+ self.do_request('delete_container',
+ expected_status=exceptions.Forbidden,
+ container_id=container_id)
+
+ def test_add_secret_to_container(self):
+ resp = self.do_request(
+ 'create_container',
+ client=self.os_project_member.secret_v1.ContainerClient(),
+ cleanup='container',
+ name='add_secret_to_container_c', type='generic'
+ )
+ container_id = self.ref_to_uuid(resp['container_ref'])
+
+ resp = self.do_request(
+ 'create_secret',
+ client=self.os_project_member.secret_v1.SecretClient(),
+ cleanup='secret',
+ name='add_secret_to_container_s',
+ secret_type='passphrase',
+ payload='shhh... secret',
+ payload_content_type='text/plain'
+ )
+ secret_id = self.ref_to_uuid(resp['secret_ref'])
+
+ self.do_request('add_secret_to_container',
+ expected_status=exceptions.Forbidden,
+ container_id=container_id,
+ secret_id=secret_id)
+
+ def test_delete_secret_from_container(self):
+ resp = self.do_request(
+ 'create_container',
+ client=self.os_project_member.secret_v1.ContainerClient(),
+ cleanup='container',
+ name='delete_secret_from_container_c', type='generic'
+ )
+ container_id = self.ref_to_uuid(resp['container_ref'])
+
+ resp = self.do_request(
+ 'create_secret',
+ client=self.os_project_member.secret_v1.SecretClient(),
+ cleanup='secret',
+ name='delete_secret_from_container_s',
+ secret_type='passphrase',
+ payload='shhh... secret',
+ payload_content_type='text/plain'
+ )
+ secret_id = self.ref_to_uuid(resp['secret_ref'])
+
+ self.do_request(
+ 'add_secret_to_container',
+ client=self.os_project_member.secret_v1.ContainerClient(),
+ container_id=container_id,
+ secret_id=secret_id
+ )
+
+ self.do_request('delete_secret_from_container',
+ expected_status=exceptions.Forbidden,
+ container_id=container_id,
+ secret_id=secret_id)
diff --git a/barbican_tempest_plugin/tests/rbac/v1/test_orders.py b/barbican_tempest_plugin/tests/rbac/v1/test_orders.py
new file mode 100644
index 0000000..36f9a26
--- /dev/null
+++ b/barbican_tempest_plugin/tests/rbac/v1/test_orders.py
@@ -0,0 +1,186 @@
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import abc
+import time
+
+from tempest.lib import exceptions
+
+from barbican_tempest_plugin.tests.rbac.v1 import base
+
+
+class BarbicanV1RbacOrders(metaclass=abc.ABCMeta):
+    """Contract that every orders RBAC persona test class must implement.
+
+    Declared with ``abc.ABCMeta`` so that the ``@abc.abstractmethod``
+    markers below are actually enforced: a subclass that leaves one of
+    these tests undefined cannot be instantiated.
+    """
+
+    @abc.abstractmethod
+    def test_list_orders(self):
+        """Test list_orders policy
+
+        Testing GET /v1/orders
+        This test must check:
+          * whether persona can list orders
+        """
+        raise NotImplementedError
+
+    @abc.abstractmethod
+    def test_create_order(self):
+        """Test create_order policy
+
+        Testing POST /v1/orders
+        This test must check:
+          * whether persona can create orders
+        """
+        raise NotImplementedError
+
+    @abc.abstractmethod
+    def test_get_order(self):
+        """Test get_order policy
+
+        Testing GET /v1/orders/{order-id}
+        This test must check:
+          * whether persona can get order information
+        """
+        raise NotImplementedError
+
+    @abc.abstractmethod
+    def test_delete_order(self):
+        """Test delete_order policy
+
+        Testing DELETE /v1/orders/{order-id}
+        This test must check:
+          * whether persona can delete orders
+        """
+        raise NotImplementedError
+
+
+class ProjectMemberTests(base.BarbicanV1RbacBase, BarbicanV1RbacOrders):
+    """Orders RBAC checks for the project-member persona.
+
+    ``setup_clients`` stores a project-member ``OrderClient`` on
+    ``cls.client``; the tests then call ``do_request`` without passing a
+    client of their own.
+    """
+
+    # Upper bound, in seconds, on how long test_delete_order waits for a
+    # freshly created order to become ACTIVE before failing the test.
+    order_active_timeout = 60
+
+    @classmethod
+    def setup_clients(cls):
+        """Create the project-member OrderClient used by these tests."""
+        super().setup_clients()
+        cls.client = cls.os_project_member.secret_v1.OrderClient()
+
+    @staticmethod
+    def _aes_key_meta(secret_name):
+        """Return order metadata for a 256-bit AES/CBC key secret."""
+        return {
+            'name': secret_name,
+            'algorithm': 'aes',
+            'bit_length': 256,
+            'mode': 'cbc',
+        }
+
+    def test_list_orders(self):
+        """Create one key order, then expect at least one listed order."""
+        self.do_request('create_order', cleanup='order',
+                        name='list_orders', type='key',
+                        meta=self._aes_key_meta('list_orders_s'))
+        resp = self.do_request('list_orders')
+        self.assertGreaterEqual(len(resp['orders']), 1)
+
+    def test_create_order(self):
+        """Create a key order and expect the request to succeed."""
+        self.do_request('create_order', cleanup='order',
+                        name='create_order', type='key',
+                        meta=self._aes_key_meta('create_orders_s'))
+
+    def test_get_order(self):
+        """Create a key order and read it back by its UUID."""
+        resp = self.do_request('create_order', cleanup='order',
+                               name='get_order', type='key',
+                               meta=self._aes_key_meta('get_order_s'))
+        order_id = self.ref_to_uuid(resp['order_ref'])
+        resp = self.do_request('get_order', order_id=order_id)
+        self.assertEqual(order_id, self.ref_to_uuid(resp['order_ref']))
+
+    def test_delete_order(self):
+        """Create a key order, wait until it is ACTIVE, then delete it.
+
+        The order is created without ``cleanup='order'`` because the test
+        deletes it itself; once the order is ACTIVE its response is handed
+        to ``add_cleanup('secret', ...)``.
+        """
+        resp = self.do_request('create_order',
+                               name='delete_order', type='key',
+                               meta=self._aes_key_meta('delete_order_s'))
+        order_id = self.ref_to_uuid(resp['order_ref'])
+
+        # Poll once per second, but never forever: an order that errors
+        # out or stays pending would otherwise hang the whole test run.
+        deadline = time.monotonic() + self.order_active_timeout
+        while True:
+            time.sleep(1)
+            resp = self.do_request('get_order', order_id=order_id)
+            status = resp['status']
+            if status == 'ACTIVE':
+                self.add_cleanup('secret', resp)
+                break
+            if status == 'ERROR':
+                self.fail('order %s went to ERROR instead of ACTIVE'
+                          % order_id)
+            if time.monotonic() >= deadline:
+                self.fail('order %s is still %s after %s seconds'
+                          % (order_id, status, self.order_active_timeout))
+
+        self.do_request('delete_order', order_id=order_id)
+
+
+class ProjectAdminTests(ProjectMemberTests):
+    """Re-run the project-member order tests with a project-admin client."""
+
+    @classmethod
+    def setup_clients(cls):
+        """Replace ``cls.client`` with the project admin's OrderClient."""
+        super().setup_clients()
+        admin_api = cls.os_project_admin.secret_v1
+        cls.client = admin_api.OrderClient()
+
+
+class ProjectReaderTests(base.BarbicanV1RbacBase, BarbicanV1RbacOrders):
+    """Orders RBAC checks for the project-reader persona.
+
+    Every request sent with the reader's client is expected to raise
+    ``exceptions.Forbidden``; fixture orders are created with a
+    project-member client instead.
+    """
+
+    @classmethod
+    def setup_clients(cls):
+        """Create the project-reader OrderClient used by these tests."""
+        super().setup_clients()
+        reader_api = cls.os_project_reader.secret_v1
+        cls.client = reader_api.OrderClient()
+
+    def test_list_orders(self):
+        """Listing orders is expected to be Forbidden."""
+        self.do_request('list_orders',
+                        expected_status=exceptions.Forbidden)
+
+    def test_create_order(self):
+        """Creating a key order is expected to be Forbidden."""
+        key_meta = {
+            'name': 'create_orders_s',
+            'algorithm': 'aes',
+            'bit_length': 256,
+            'mode': 'cbc',
+        }
+        self.do_request(
+            'create_order',
+            expected_status=exceptions.Forbidden,
+            cleanup='order',
+            name='create_order',
+            type='key',
+            meta=key_meta,
+        )
+
+    def test_get_order(self):
+        """Reading a member-created order is expected to be Forbidden."""
+        member_orders = self.os_project_member.secret_v1.OrderClient()
+        key_meta = {
+            'name': 'get_order_s',
+            'algorithm': 'aes',
+            'bit_length': 256,
+            'mode': 'cbc',
+        }
+        created = self.do_request(
+            'create_order',
+            client=member_orders,
+            cleanup='order',
+            name='get_order',
+            type='key',
+            meta=key_meta,
+        )
+        self.do_request(
+            'get_order',
+            expected_status=exceptions.Forbidden,
+            order_id=self.ref_to_uuid(created['order_ref']),
+        )
+
+    def test_delete_order(self):
+        """Deleting a member-created order is expected to be Forbidden."""
+        member_orders = self.os_project_member.secret_v1.OrderClient()
+        key_meta = {
+            'name': 'delete_order_s',
+            'algorithm': 'aes',
+            'bit_length': 256,
+            'mode': 'cbc',
+        }
+        created = self.do_request(
+            'create_order',
+            client=member_orders,
+            cleanup='order',
+            name='delete_order',
+            type='key',
+            meta=key_meta,
+        )
+        self.do_request(
+            'delete_order',
+            expected_status=exceptions.Forbidden,
+            order_id=self.ref_to_uuid(created['order_ref']),
+        )
diff --git a/barbican_tempest_plugin/tests/rbac/v1/test_quotas.py b/barbican_tempest_plugin/tests/rbac/v1/test_quotas.py
new file mode 100644
index 0000000..f5ec7dd
--- /dev/null
+++ b/barbican_tempest_plugin/tests/rbac/v1/test_quotas.py
@@ -0,0 +1,123 @@
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import abc
+
+from tempest.lib import exceptions
+
+from barbican_tempest_plugin.tests.rbac.v1 import base
+
+
+class BarbicanV1RbacQuota(metaclass=abc.ABCMeta):
+    """Contract that every quota RBAC persona test class must implement.
+
+    Declared with ``abc.ABCMeta`` so that the ``@abc.abstractmethod``
+    markers below are actually enforced: a subclass that leaves one of
+    these tests undefined cannot be instantiated.
+    """
+
+    @abc.abstractmethod
+    def test_get_effective_project_quota(self):
+        """Test getting the effective quota information
+
+        Testing: GET /v1/quotas
+        This test must check:
+          * whether the persona can retrieve the effective quota for
+            their project.
+        """
+        raise NotImplementedError
+
+    @abc.abstractmethod
+    def test_list_project_quotas(self):
+        """Test listing all configured project quotas
+
+        Testing: GET /v1/project-quotas
+        This test must check:
+          * whether the persona can retrieve all modified quotas for
+            the entire system.
+        """
+        raise NotImplementedError
+
+    @abc.abstractmethod
+    def test_get_custom_quota_for_project(self):
+        """Test getting a custom quota for a specific project
+
+        Testing: GET /v1/project-quotas/{project-id}
+        This test must check:
+          * whether the persona can retrieve the custom quota for a
+            specific project.
+        """
+        raise NotImplementedError
+
+    @abc.abstractmethod
+    def test_set_new_quota_for_project(self):
+        """Test setting a custom quota for a specific project
+
+        Testing: PUT /v1/project-quotas/{project-id}
+        This test must check:
+          * whether the persona can create custom quotas for a
+            specific project.
+        """
+        raise NotImplementedError
+
+    @abc.abstractmethod
+    def test_remove_custom_quota_for_project(self):
+        """Test removing a custom quota for a specific project
+
+        Testing: DELETE /v1/project-quotas/{project-id}
+        This test must check:
+          * whether the persona can delete custom quotas for a
+            specific project.
+        """
+        raise NotImplementedError
+
+
+class ProjectMemberTests(base.BarbicanV1RbacBase, BarbicanV1RbacQuota):
+    """Quota RBAC checks for the project-member persona.
+
+    Only the effective-quota request is expected to succeed; every
+    project-quota request is expected to raise ``exceptions.Forbidden``.
+    """
+
+    @classmethod
+    def setup_clients(cls):
+        """Create the project-member QuotaClient used by these tests."""
+        super().setup_clients()
+        member_api = cls.os_project_member.secret_v1
+        cls.client = member_api.QuotaClient()
+
+    def test_get_effective_project_quota(self):
+        """The effective-quota response must contain a 'quotas' key."""
+        effective = self.do_request('get_default_project_quota')
+        self.assertIn('quotas', effective)
+
+    def test_list_project_quotas(self):
+        """Listing project quotas is expected to be Forbidden."""
+        self.do_request('list_quotas',
+                        expected_status=exceptions.Forbidden)
+
+    def test_get_custom_quota_for_project(self):
+        """Reading the quota of the client's project must be Forbidden."""
+        self.do_request(
+            'get_project_quota',
+            expected_status=exceptions.Forbidden,
+            project_id=self.client.tenant_id,
+        )
+
+    def test_set_new_quota_for_project(self):
+        """Setting a quota on the client's project must be Forbidden."""
+        self.do_request(
+            'create_project_quota',
+            expected_status=exceptions.Forbidden,
+            project_id=self.client.tenant_id,
+        )
+
+    def test_remove_custom_quota_for_project(self):
+        """Deleting the quota of the client's project must be Forbidden."""
+        self.do_request(
+            'delete_project_quota',
+            expected_status=exceptions.Forbidden,
+            project_id=self.client.tenant_id,
+        )
+
+
+class ProjectAdminTests(ProjectMemberTests):
+    """Re-run the project-member quota tests with a project-admin client."""
+
+    @classmethod
+    def setup_clients(cls):
+        """Replace ``cls.client`` with the project admin's QuotaClient."""
+        super().setup_clients()
+        admin_api = cls.os_project_admin.secret_v1
+        cls.client = admin_api.QuotaClient()
+
+
+class ProjectReaderTests(ProjectMemberTests):
+    """Re-run the project-member quota tests with a project-reader client."""
+
+    @classmethod
+    def setup_clients(cls):
+        """Replace ``cls.client`` with the project reader's QuotaClient."""
+        super().setup_clients()
+        reader_api = cls.os_project_reader.secret_v1
+        cls.client = reader_api.QuotaClient()
diff --git a/barbican_tempest_plugin/tests/rbac/v1/test_secret_stores.py b/barbican_tempest_plugin/tests/rbac/v1/test_secret_stores.py
new file mode 100644
index 0000000..6f0a00d
--- /dev/null
+++ b/barbican_tempest_plugin/tests/rbac/v1/test_secret_stores.py
@@ -0,0 +1,187 @@
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import abc
+
+from tempest import config
+from tempest.lib import exceptions
+
+from barbican_tempest_plugin.tests.rbac.v1 import base
+
+
+CONF = config.CONF
+
+
+class BarbicanV1RbacSecretStores(metaclass=abc.ABCMeta):
+    """Contract every secret-store RBAC persona test class must implement.
+
+    Declared with ``abc.ABCMeta`` so that the ``@abc.abstractmethod``
+    markers below are actually enforced: a subclass that leaves one of
+    these tests undefined cannot be instantiated.
+    """
+
+    @abc.abstractmethod
+    def test_list_secret_stores(self):
+        """Test getting a list of all backends
+
+        Testing: GET /v1/secret-stores
+        This test must check:
+          * whether the persona can list all secret stores
+        """
+        raise NotImplementedError
+
+    @abc.abstractmethod
+    def test_get_secret_store(self):
+        """Test get secret store information
+
+        Testing: GET /v1/secret-stores/{secret-store-id}
+        This test must check:
+          * whether the persona can get information about a specific
+            secret store
+        """
+        raise NotImplementedError
+
+    @abc.abstractmethod
+    def test_get_global_secret_store(self):
+        """Test getting the global secret store
+
+        Testing: GET /v1/secret-stores/global-default
+        This test must check:
+          * whether the persona can get information about the global
+            default secret store
+        """
+        raise NotImplementedError
+
+    @abc.abstractmethod
+    def test_get_preferred_secret_store(self):
+        """Test getting the preferred secret store
+
+        Testing: GET /v1/secret-stores/preferred
+        This test must check:
+          * whether the persona can get information about their project's
+            preferred secret store
+        """
+        raise NotImplementedError
+
+    @abc.abstractmethod
+    def test_set_preferred_secret_store(self):
+        """Test setting the preferred secret store
+
+        Testing: POST /v1/secret-stores/{secret-store-id}/preferred
+        This test must check:
+          * whether the persona can set their project's preferred
+            secret store
+        """
+        raise NotImplementedError
+
+    @abc.abstractmethod
+    def test_unset_preferred_secret_store(self):
+        """Test removing the preferred secret store
+
+        Testing: DELETE /v1/secret-stores/{secret-store-id}/preferred
+        This test must check:
+          * whether the persona can unset their project's preferred
+            secret store
+        """
+        raise NotImplementedError
+
+
+class ProjectMemberTests(base.BarbicanV1RbacBase, BarbicanV1RbacSecretStores):
+    """Secret-store RBAC checks for the project-member persona.
+
+    Read requests are expected to succeed; setting or unsetting the
+    preferred secret store is expected to raise ``exceptions.Forbidden``.
+    """
+
+    @classmethod
+    def skip_checks(cls):
+        """Skip unless multiple secret stores are enabled in the config.
+
+        The parent checks run first so that any skip condition defined by
+        the base classes still applies to these tests.
+
+        TODO(redrobot): Run this with multiple backends. We need to set up
+        the devstack plugin to use multiple backends so we can run these
+        tests.
+        """
+        super().skip_checks()
+        if not CONF.barbican_tempest.enable_multiple_secret_stores:
+            raise cls.skipException("enable_multiple_secret_stores is not "
+                                    "configured. Skipping RBAC tests.")
+
+    @classmethod
+    def setup_clients(cls):
+        """Create the project-member SecretStoresClient for these tests."""
+        super().setup_clients()
+        cls.client = cls.os_project_member.secret_v1.SecretStoresClient()
+
+    def _first_secret_store_id(self):
+        """Return the UUID of the first store from list_secret_stores."""
+        resp = self.do_request('list_secret_stores')
+        return self.ref_to_uuid(
+            resp['secret_stores'][0]['secret_store_ref'])
+
+    def test_list_secret_stores(self):
+        """The listing response must contain a 'secret_stores' key."""
+        resp = self.do_request('list_secret_stores')
+        self.assertIn('secret_stores', resp)
+
+    def test_get_secret_store(self):
+        """Fetch the first listed store by UUID and compare the refs."""
+        secret_store_id = self._first_secret_store_id()
+        resp = self.do_request('get_secret_store',
+                               secret_store_id=secret_store_id)
+        self.assertEqual(secret_store_id,
+                         self.ref_to_uuid(resp['secret_store_ref']))
+
+    def test_get_global_secret_store(self):
+        """The global store response must have a truthy global_default."""
+        resp = self.do_request('get_global_secret_store')
+        self.assertTrue(resp['global_default'])
+
+    def test_get_preferred_secret_store(self):
+        """The preferred store response must report status ACTIVE."""
+        resp = self.do_request('get_preferred_secret_store')
+        self.assertEqual('ACTIVE', resp['status'])
+
+    def test_set_preferred_secret_store(self):
+        """Setting the preferred store is expected to be Forbidden."""
+        self.do_request('set_preferred_secret_store',
+                        expected_status=exceptions.Forbidden,
+                        secret_store_id=self._first_secret_store_id())
+
+    def test_unset_preferred_secret_store(self):
+        """Unsetting the preferred store is expected to be Forbidden."""
+        # 'unset_peferred_secret_store' is the method name as requested
+        # from the client; the spelling is kept on purpose.
+        self.do_request('unset_peferred_secret_store',
+                        expected_status=exceptions.Forbidden,
+                        secret_store_id=self._first_secret_store_id())
+
+
+class ProjectAdminTests(ProjectMemberTests):
+    """Secret-store RBAC checks for the project-admin persona.
+
+    Inherits the project-member tests and overrides the two
+    preferred-store tests, which expect success here instead of Forbidden.
+    """
+
+    @classmethod
+    def setup_clients(cls):
+        """Replace ``cls.client`` with the admin's SecretStoresClient."""
+        super().setup_clients()
+        admin_api = cls.os_project_admin.secret_v1
+        cls.client = admin_api.SecretStoresClient()
+
+    def test_set_preferred_secret_store(self):
+        """Set the first listed store as preferred and read it back."""
+        listing = self.do_request('list_secret_stores')
+        store_id = self.ref_to_uuid(
+            listing['secret_stores'][0]['secret_store_ref'])
+
+        self.do_request('set_preferred_secret_store',
+                        secret_store_id=store_id)
+
+        preferred = self.do_request('get_preferred_secret_store')
+        self.assertEqual(store_id,
+                         self.ref_to_uuid(preferred['secret_store_ref']))
+
+    def test_unset_preferred_secret_store(self):
+        """Set, then unset, the first listed store as preferred.
+
+        NOTE(review): the final assertion expects the store that was just
+        unset to still be reported as preferred - presumably because the
+        first listed store is also the global default. Confirm against a
+        multi-backend deployment.
+        """
+        listing = self.do_request('list_secret_stores')
+        store_id = self.ref_to_uuid(
+            listing['secret_stores'][0]['secret_store_ref'])
+
+        self.do_request('set_preferred_secret_store',
+                        secret_store_id=store_id)
+        self.do_request('unset_peferred_secret_store',
+                        secret_store_id=store_id)
+
+        preferred = self.do_request('get_preferred_secret_store')
+        self.assertEqual(store_id,
+                         self.ref_to_uuid(preferred['secret_store_ref']))
+
+
+class ProjectReaderTests(ProjectMemberTests):
+    """Re-run the member secret-store tests with a project-reader client."""
+
+    @classmethod
+    def setup_clients(cls):
+        """Replace ``cls.client`` with the reader's SecretStoresClient."""
+        super().setup_clients()
+        reader_api = cls.os_project_reader.secret_v1
+        cls.client = reader_api.SecretStoresClient()
diff --git a/barbican_tempest_plugin/tests/rbac/v1/test_secrets.py b/barbican_tempest_plugin/tests/rbac/v1/test_secrets.py
new file mode 100644
index 0000000..6b8f657
--- /dev/null
+++ b/barbican_tempest_plugin/tests/rbac/v1/test_secrets.py
@@ -0,0 +1,344 @@
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import abc
+import base64
+from datetime import datetime
+from datetime import timedelta
+
+from tempest import config
+from tempest.lib import exceptions
+
+from barbican_tempest_plugin.tests.rbac.v1 import base as rbac_base
+
+CONF = config.CONF
+
+
+class BarbicanV1RbacSecrets(metaclass=abc.ABCMeta):
+    """Contract that every secrets RBAC persona test class must implement.
+
+    Each abstract test below names the endpoint it covers and what the
+    concrete persona class has to verify.
+    """
+
+    @abc.abstractmethod
+    def test_create_secret(self):
+        """Cover the add_secret policy: POST /v1/secrets.
+
+        Implementations must check:
+          * whether the persona can create an empty secret
+          * whether the persona can create a secret with a symmetric key
+        """
+        raise NotImplementedError()
+
+    @abc.abstractmethod
+    def test_list_secrets(self):
+        """Cover the get_secrets policy: GET /v1/secrets.
+
+        Implementations must check:
+          * whether the persona can list secrets within their project
+        """
+        raise NotImplementedError()
+
+    @abc.abstractmethod
+    def test_delete_secret(self):
+        """Cover deleting a secret: DELETE /v1/secrets/{secret_id}.
+
+        Implementations must check:
+          * whether the persona can delete a secret in their project
+        """
+        raise NotImplementedError()
+
+    @abc.abstractmethod
+    def test_get_secret(self):
+        """Cover the get_secret policy: GET /v1/secrets/{secret_id}.
+
+        Implementations must check:
+          * whether the persona can get a specific secret within their
+            project
+        """
+        raise NotImplementedError()
+
+    @abc.abstractmethod
+    def test_get_secret_payload(self):
+        """Cover payload reads: GET /v1/secrets/{secret_id}/payload.
+
+        Implementations must check:
+          * whether the persona can get a secret payload
+        """
+        raise NotImplementedError()
+
+    @abc.abstractmethod
+    def test_put_secret_payload(self):
+        """Cover the put_secret policy: PUT /v1/secrets/{secret_id}.
+
+        Implementations must check:
+          * whether the persona can add a payload to an empty secret
+        """
+        raise NotImplementedError()
+
+
+class ProjectMemberTests(rbac_base.BarbicanV1RbacBase, BarbicanV1RbacSecrets):
+    """Secrets RBAC checks for the project-member persona.
+
+    Fixture secrets come from the ``create_*_admin`` helpers; the request
+    under test is sent through ``do_request`` without a client override.
+    """
+
+    @classmethod
+    def setup_clients(cls):
+        """Create the project-member SecretClient used by these tests."""
+        super().setup_clients()
+        member_api = cls.os_project_member.secret_v1
+        cls.client = member_api.SecretClient()
+
+    def test_create_secret(self):
+        """Test add_secret policy."""
+        # First a secret that only has a name ...
+        self.do_request('create_secret', expected_status=201,
+                        cleanup='secret', name='test_create_secret')
+
+        # ... then one carrying a key from create_aes_key, with an
+        # expiration five days from now.
+        aes_key = rbac_base.create_aes_key()
+        expires_at = datetime.utcnow() + timedelta(days=5)
+        self.do_request(
+            'create_secret',
+            expected_status=201,
+            cleanup='secret',
+            name='test_create_secret2',
+            expiration=expires_at.isoformat(),
+            algorithm='aes',
+            bit_length=256,
+            mode='cbc',
+            payload=aes_key,
+            payload_content_type='application/octet-stream',
+            payload_content_encoding='base64',
+        )
+
+    def test_list_secrets(self):
+        """Test get_secrets policy."""
+        names = ('test_list_secrets', 'test_list_secrets_2')
+        for secret_name in names:
+            self.create_empty_secret_admin(secret_name)
+
+        # Filtering by name must return the matching secret first.
+        for secret_name in names:
+            found = self.do_request('list_secrets', name=secret_name)
+            self.assertEqual(secret_name, found['secrets'][0]['name'])
+
+        # The unfiltered listing must contain at least two secrets.
+        everything = self.do_request('list_secrets')
+        self.assertGreaterEqual(len(everything['secrets']), 2)
+
+    def test_delete_secret(self):
+        """Test delete_secrets policy."""
+        created = self.create_empty_secret_admin('test_delete_secret_1')
+        secret_id = self.ref_to_uuid(created['secret_ref'])
+        self.do_request('delete_secret', secret_id=secret_id)
+        # Presumably drops the pending cleanup entry for the secret that
+        # was just deleted - verify against the base class.
+        self.delete_cleanup('secret', secret_id)
+
+    def test_get_secret(self):
+        """Test get_secret policy."""
+        created = self.create_empty_secret_admin('test_get_secret')
+        secret_id = self.ref_to_uuid(created['secret_ref'])
+        metadata = self.do_request('get_secret_metadata',
+                                   secret_id=secret_id)
+        self.assertEqual(secret_id,
+                         self.ref_to_uuid(metadata['secret_ref']))
+
+    def test_get_secret_payload(self):
+        """Test get_secret payload policy."""
+        aes_key, created = self.create_aes_secret_admin(
+            'test_get_secret_payload')
+        secret_id = self.ref_to_uuid(created['secret_ref'])
+
+        # The stored key must equal the base64-encoded payload read back.
+        payload = self.do_request('get_secret_payload', secret_id=secret_id)
+        self.assertEqual(aes_key, base64.b64encode(payload))
+
+    def test_put_secret_payload(self):
+        """Test put_secret policy."""
+        created = self.create_empty_secret_admin('test_put_secret_payload')
+        secret_id = self.ref_to_uuid(created['secret_ref'])
+        aes_key = rbac_base.create_aes_key()
+
+        # Attach the key to the empty secret, then read it back.
+        self.do_request('put_secret_payload',
+                        secret_id=secret_id, payload=aes_key)
+        payload = self.do_request('get_secret_payload', secret_id=secret_id)
+        self.assertEqual(aes_key, base64.b64encode(payload))
+
+
+class ProjectAdminTests(ProjectMemberTests):
+    """Re-run the project-member secret tests with a project-admin client."""
+
+    @classmethod
+    def setup_clients(cls):
+        """Replace ``cls.client`` with the project admin's SecretClient."""
+        super().setup_clients()
+        admin_api = cls.os_project_admin.secret_v1
+        cls.client = admin_api.SecretClient()
+
+
+class ProjectReaderTests(rbac_base.BarbicanV1RbacBase, BarbicanV1RbacSecrets):
+    """Secrets RBAC checks for the project-reader persona.
+
+    Every request sent with the reader's client is expected to raise
+    ``exceptions.Forbidden``; fixture secrets come from the
+    ``create_*_admin`` helpers.
+    """
+
+    @classmethod
+    def setup_clients(cls):
+        """Create the project-reader SecretClient used by these tests."""
+        super().setup_clients()
+        reader_api = cls.os_project_reader.secret_v1
+        cls.client = reader_api.SecretClient()
+
+    def test_create_secret(self):
+        """Test add_secret policy."""
+        # A create request with no secret attributes ...
+        self.do_request('create_secret',
+                        expected_status=exceptions.Forbidden,
+                        cleanup='secret')
+
+        # ... and one carrying a key from create_aes_key, with an
+        # expiration five days from now.
+        aes_key = rbac_base.create_aes_key()
+        expires_at = datetime.utcnow() + timedelta(days=5)
+        self.do_request(
+            'create_secret',
+            expected_status=exceptions.Forbidden,
+            cleanup='secret',
+            expiration=expires_at.isoformat(),
+            algorithm='aes',
+            bit_length=256,
+            mode='cbc',
+            payload=aes_key,
+            payload_content_type='application/octet-stream',
+            payload_content_encoding='base64',
+        )
+
+    def test_list_secrets(self):
+        """Test get_secrets policy."""
+        names = ('secret_1', 'secret_2')
+        for secret_name in names:
+            self.create_empty_secret_admin(secret_name)
+
+        # Listing filtered by either name must be Forbidden ...
+        for secret_name in names:
+            self.do_request('list_secrets',
+                            expected_status=exceptions.Forbidden,
+                            name=secret_name)
+
+        # ... and so must the unfiltered listing.
+        self.do_request('list_secrets',
+                        expected_status=exceptions.Forbidden)
+
+    def test_delete_secret(self):
+        """Test delete_secrets policy."""
+        created = self.create_empty_secret_admin('secret_1')
+        self.do_request('delete_secret',
+                        expected_status=exceptions.Forbidden,
+                        secret_id=self.ref_to_uuid(created['secret_ref']))
+
+    def test_get_secret(self):
+        """Test get_secret policy."""
+        created = self.create_empty_secret_admin('secret_1')
+        self.do_request('get_secret_metadata',
+                        expected_status=exceptions.Forbidden,
+                        secret_id=self.ref_to_uuid(created['secret_ref']))
+
+    def test_get_secret_payload(self):
+        """Test get_secret payload policy."""
+        _key, created = self.create_aes_secret_admin('secret_1')
+        self.do_request('get_secret_payload',
+                        expected_status=exceptions.Forbidden,
+                        secret_id=self.ref_to_uuid(created['secret_ref']))
+
+    def test_put_secret_payload(self):
+        """Test put_secret policy."""
+        created = self.create_empty_secret_admin('secret_1')
+        secret_id = self.ref_to_uuid(created['secret_ref'])
+        aes_key = rbac_base.create_aes_key()
+
+        # Attaching a payload to the empty secret must be Forbidden.
+        self.do_request('put_secret_payload',
+                        expected_status=exceptions.Forbidden,
+                        secret_id=secret_id, payload=aes_key)
+
+
+class SystemAdminTests(rbac_base.BarbicanV1RbacBase, BarbicanV1RbacSecrets):
+    """Placeholder secrets RBAC tests for the system-admin persona.
+
+    Every test body is still empty, so each test passes without sending
+    any request.
+    """
+
+    @classmethod
+    def setup_clients(cls):
+        """Use ``cls.secret_client`` (defined outside this class)."""
+        super().setup_clients()
+        cls.client = cls.secret_client
+
+    def test_create_secret(self):
+        """Not implemented yet; passes without checking the policy."""
+
+    def test_list_secrets(self):
+        """Not implemented yet; passes without checking the policy."""
+
+    def test_delete_secret(self):
+        """Not implemented yet; passes without checking the policy."""
+
+    def test_get_secret(self):
+        """Not implemented yet; passes without checking the policy."""
+
+    def test_get_secret_payload(self):
+        """Not implemented yet; passes without checking the policy."""
+
+    def test_put_secret_payload(self):
+        """Not implemented yet; passes without checking the policy."""
+
+
+class SystemMemberTests(rbac_base.BarbicanV1RbacBase, BarbicanV1RbacSecrets):
+    """Placeholder secrets RBAC tests for the system-member persona.
+
+    Every test body is still empty, so each test passes without sending
+    any request.
+    """
+
+    @classmethod
+    def setup_clients(cls):
+        """Use ``cls.secret_client`` (defined outside this class)."""
+        super().setup_clients()
+        cls.client = cls.secret_client
+
+    def test_create_secret(self):
+        """Not implemented yet; passes without checking the policy."""
+
+    def test_list_secrets(self):
+        """Not implemented yet; passes without checking the policy."""
+
+    def test_delete_secret(self):
+        """Not implemented yet; passes without checking the policy."""
+
+    def test_get_secret(self):
+        """Not implemented yet; passes without checking the policy."""
+
+    def test_get_secret_payload(self):
+        """Not implemented yet; passes without checking the policy."""
+
+    def test_put_secret_payload(self):
+        """Not implemented yet; passes without checking the policy."""
+
+
+class SystemReaderTests(rbac_base.BarbicanV1RbacBase, BarbicanV1RbacSecrets):
+    """Placeholder secrets RBAC tests for the system-reader persona.
+
+    Every test body is still empty, so each test passes without sending
+    any request.
+    """
+
+    @classmethod
+    def setup_clients(cls):
+        """Use ``cls.secret_client`` (defined outside this class)."""
+        super().setup_clients()
+        cls.client = cls.secret_client
+
+    def test_create_secret(self):
+        """Not implemented yet; passes without checking the policy."""
+
+    def test_list_secrets(self):
+        """Not implemented yet; passes without checking the policy."""
+
+    def test_delete_secret(self):
+        """Not implemented yet; passes without checking the policy."""
+
+    def test_get_secret(self):
+        """Not implemented yet; passes without checking the policy."""
+
+    def test_get_secret_payload(self):
+        """Not implemented yet; passes without checking the policy."""
+
+    def test_put_secret_payload(self):
+        """Not implemented yet; passes without checking the policy."""
diff --git a/barbican_tempest_plugin/tests/rbac/v1/test_transport_keys.py b/barbican_tempest_plugin/tests/rbac/v1/test_transport_keys.py
new file mode 100644
index 0000000..1984943
--- /dev/null
+++ b/barbican_tempest_plugin/tests/rbac/v1/test_transport_keys.py
@@ -0,0 +1,121 @@
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import abc
+
+from tempest.lib import exceptions
+
+from barbican_tempest_plugin.tests.rbac.v1 import base
+
+
+class BarbicanV1RbacTransportKeys(metaclass=abc.ABCMeta):
+    """Contract every transport-key RBAC persona test class must implement.
+
+    Declared with ``abc.ABCMeta`` so that the ``@abc.abstractmethod``
+    markers below are actually enforced: a subclass that leaves one of
+    these tests undefined cannot be instantiated.
+    """
+
+    @abc.abstractmethod
+    def test_list_transport_keys(self):
+        """Test listing the transport keys
+
+        Testing: GET /v1/transport_keys
+        This test case must check:
+          * whether the persona can list the available transport keys
+        """
+        raise NotImplementedError
+
+    @abc.abstractmethod
+    def test_create_transport_key(self):
+        """Test creating a transport key
+
+        Testing: POST /v1/transport_keys
+        This test case must check:
+          * whether the persona can create a new transport key entry
+        """
+        raise NotImplementedError
+
+    @abc.abstractmethod
+    def test_get_transport_key(self):
+        """Test getting a specific transport key
+
+        Testing: GET /v1/transport_keys/{transport-key-id}
+        This test case must check:
+          * whether the persona can retrieve a specific transport key
+        """
+        raise NotImplementedError
+
+    @abc.abstractmethod
+    def test_delete_transport_key(self):
+        """Test deleting a specific transport key
+
+        Testing: DELETE /v1/transport_keys/{transport-key-id}
+        This test case must check:
+          * whether the persona can delete a specific transport key
+        """
+        raise NotImplementedError
+
+
+class ProjectMemberTests(base.BarbicanV1RbacBase, BarbicanV1RbacTransportKeys):
+    """Transport-key RBAC checks for the project-member persona.
+
+    Listing is expected to succeed and creation to be Forbidden; the get
+    and delete tests are still placeholders.
+    """
+
+    @classmethod
+    def setup_clients(cls):
+        """Create the project-member TransportKeyClient for these tests."""
+        super().setup_clients()
+        member_api = cls.os_project_member.secret_v1
+        cls.client = member_api.TransportKeyClient()
+
+    def test_list_transport_keys(self):
+        """The listing response must contain a 'transport_keys' key."""
+        listing = self.do_request('list_transport_keys')
+        self.assertIn('transport_keys', listing)
+
+    def test_create_transport_key(self):
+        """Creating a transport key is expected to be Forbidden."""
+        self.do_request(
+            'create_transport_key',
+            expected_status=exceptions.Forbidden,
+            plugin_name='simple-crypto',
+            transport_key='???',
+        )
+
+    def test_get_transport_key(self):
+        """Placeholder: passes without sending any request.
+
+        TODO(redrobot): We need to sort out how system admins create keys
+        before we can test this. Intended shape of the test:
+
+            resp = self.do_request('list_transport_keys')
+            transport_key_id = self.ref_to_uuid(
+                resp['transport_keys'][0]['transport_key_ref']
+            )
+            resp = self.do_request('get_transport_key',
+                                   transport_key_id=transport_key_id)
+            self.assertEqual(transport_key_id, resp['transport_key_id'])
+        """
+
+    def test_delete_transport_key(self):
+        """Placeholder: passes without sending any request.
+
+        TODO(redrobot): We need to sort out how system admins create keys
+        before we can test this. Intended shape of the test:
+
+            resp = self.do_request('list_transport_keys')
+            transport_key_id = self.ref_to_uuid(
+                resp['transport_keys'][0]['transport_key_ref']
+            )
+            resp = self.do_request('delete_transport_key',
+                                   expected_status=exceptions.Forbidden,
+                                   transport_key_id=transport_key_id)
+        """
+
+
+class ProjectAdminTests(ProjectMemberTests):
+    """Re-run the member transport-key tests with a project-admin client."""
+
+    @classmethod
+    def setup_clients(cls):
+        """Replace ``cls.client`` with the admin's TransportKeyClient."""
+        super().setup_clients()
+        admin_api = cls.os_project_admin.secret_v1
+        cls.client = admin_api.TransportKeyClient()
+
+
+class ProjectReaderTests(ProjectMemberTests):
+    """Re-run the member transport-key tests with a project-reader client."""
+
+    @classmethod
+    def setup_clients(cls):
+        """Replace ``cls.client`` with the reader's TransportKeyClient."""
+        super().setup_clients()
+        reader_api = cls.os_project_reader.secret_v1
+        cls.client = reader_api.TransportKeyClient()
diff --git a/barbican_tempest_plugin/tests/scenario/test_certificate_validation.py b/barbican_tempest_plugin/tests/scenario/test_certificate_validation.py
index 5a6b5b7..403d6cc 100644
--- a/barbican_tempest_plugin/tests/scenario/test_certificate_validation.py
+++ b/barbican_tempest_plugin/tests/scenario/test_certificate_validation.py
@@ -46,6 +46,11 @@
cls.max_microversion,
CONF.compute.min_microversion,
CONF.compute.max_microversion)
+ if not CONF.auth.create_isolated_networks:
+ # FIXME(redrobot): remove this skip when system-scope admin
+ # issue is fixed.
+ raise cls.skipException(
+ 'Certificate Validation tests require isolated networks')
@decorators.idempotent_id('b41bc663-5662-4b1e-b8f1-27b2876f16a6')
@utils.services('compute', 'image')
diff --git a/barbican_tempest_plugin/tests/scenario/test_ephemeral_disk_encryption.py b/barbican_tempest_plugin/tests/scenario/test_ephemeral_disk_encryption.py
index ee1bda5..1bc70d6 100644
--- a/barbican_tempest_plugin/tests/scenario/test_ephemeral_disk_encryption.py
+++ b/barbican_tempest_plugin/tests/scenario/test_ephemeral_disk_encryption.py
@@ -42,6 +42,11 @@
if not CONF.ephemeral_storage_encryption.enabled:
raise cls.skipException(
'Ephemeral storage encryption is not supported')
+ if not CONF.auth.create_isolated_networks:
+ # FIXME(redrobot): remove this skip when system-scope admin
+ # issue is fixed.
+ raise cls.skipException(
+ 'Ephemeral storage encryption requires isolated networks')
@classmethod
def resource_setup(cls):
diff --git a/barbican_tempest_plugin/tests/scenario/test_volume_encryption.py b/barbican_tempest_plugin/tests/scenario/test_volume_encryption.py
index f9191bf..26919c6 100644
--- a/barbican_tempest_plugin/tests/scenario/test_volume_encryption.py
+++ b/barbican_tempest_plugin/tests/scenario/test_volume_encryption.py
@@ -48,6 +48,11 @@
super(VolumeEncryptionTest, cls).skip_checks()
if not CONF.compute_feature_enabled.attach_encrypted_volume:
raise cls.skipException('Encrypted volume attach is not supported')
+ if not CONF.auth.create_isolated_networks:
+ # FIXME(redrobot): remove this skip when system-scope admin
+ # issue is fixed.
+ raise cls.skipException(
+ 'Volume encryption requires isolated networks')
@classmethod
def resource_setup(cls):
@@ -128,7 +133,6 @@
self.check_tenant_network_connectivity(
server, CONF.validation.image_ssh_user, keypair['private_key'])
- volume = self.create_encrypted_volume('nova.volume.encryptors.'
- 'cryptsetup.CryptsetupEncryptor',
+ volume = self.create_encrypted_volume('plain',
volume_type='cryptsetup')
self.attach_detach_volume(server, volume, keypair)
diff --git a/tox.ini b/tox.ini
index ee31c64..e786626 100644
--- a/tox.ini
+++ b/tox.ini
@@ -7,7 +7,7 @@
[testenv]
basepython = python3
usedevelop = True
-install_command = pip install -c{env:UPPER_CONSTRAINTS_FILE:https://releases.openstack.org/constraints/upper/master} {opts} {packages}
+install_command = pip install -c{env:TOX_CONSTRAINTS_FILE:https://releases.openstack.org/constraints/upper/master} {opts} {packages}
setenv =
VIRTUAL_ENV={envdir}
PYTHONWARNINGS=default::DeprecationWarning