Merge "Add skip for test_encrypted_cinder_volumes_cryptsetup"
diff --git a/.zuul.yaml b/.zuul.yaml
index 1b3cb87..d60cc74 100644
--- a/.zuul.yaml
+++ b/.zuul.yaml
@@ -1,18 +1,20 @@
- project:
+ queue: barbican
templates:
- tempest-plugin-jobs
- check-requirements
check:
jobs:
- barbican-tempest-plugin-simple-crypto
- - barbican-tempest-plugin-simple-crypto-victoria
- - barbican-tempest-plugin-simple-crypto-ussuri
- - barbican-tempest-plugin-simple-crypto-train
+ - barbican-tempest-plugin-simple-crypto-secure-rbac
+ - barbican-tempest-plugin-simple-crypto-zed
+ - barbican-tempest-plugin-simple-crypto-yoga
+ - barbican-tempest-plugin-simple-crypto-xena
+ - barbican-tempest-plugin-simple-crypto-wallaby
- barbican-tempest-plugin-simple-crypto-ipv6-only
- barbican-tempest-plugin-simple-crypto-castellan-src
- barbican-tempest-plugin-simple-crypto-cursive
gate:
- queue: barbican
jobs:
- barbican-tempest-plugin-simple-crypto
@@ -47,28 +49,112 @@
api_v1: False
ephemeral_storage_encryption:
enabled: True
+ key_manager:
+ min_microversion: '1.0'
+ max_microversion: latest
tox_envlist: all
tempest_test_regex: barbican
tempest_plugins:
- barbican-tempest-plugin
- job:
+ name: barbican-tempest-plugin-simple-crypto-zed
+ parent: barbican-tempest-plugin-simple-crypto
+ nodeset: openstack-single-node-focal
+ override-checkout: stable/zed
+ vars:
+ devstack_local_conf:
+ test-config:
+ $TEMPEST_CONFIG:
+ key_manager:
+ min_microversion: '1.0'
+ max_microversion: '1.1'
+
+- job:
+ name: barbican-tempest-plugin-simple-crypto-yoga
+ parent: barbican-tempest-plugin-simple-crypto
+ nodeset: openstack-single-node-focal
+ override-checkout: stable/yoga
+ vars: &microversion_v1_0
+ devstack_local_conf:
+ test-config:
+ $TEMPEST_CONFIG:
+ key_manager:
+ min_microversion: '1.0'
+ max_microversion: '1.0'
+
+- job:
+ name: barbican-tempest-plugin-simple-crypto-xena
+ parent: barbican-tempest-plugin-simple-crypto
+ nodeset: openstack-single-node-focal
+ override-checkout: stable/xena
+ vars: *microversion_v1_0
+
+- job:
+ name: barbican-tempest-plugin-simple-crypto-wallaby
+ parent: barbican-tempest-plugin-simple-crypto
+ nodeset: openstack-single-node-focal
+ override-checkout: stable/wallaby
+ vars: *microversion_v1_0
+
+- job:
name: barbican-tempest-plugin-simple-crypto-victoria
parent: barbican-tempest-plugin-simple-crypto
- nodeset: openstack-single-node-bionic
+ nodeset: openstack-single-node-focal
override-checkout: stable/victoria
+ vars: *microversion_v1_0
- job:
name: barbican-tempest-plugin-simple-crypto-ussuri
parent: barbican-tempest-plugin-simple-crypto
nodeset: openstack-single-node-bionic
override-checkout: stable/ussuri
+ vars: *microversion_v1_0
- job:
name: barbican-tempest-plugin-simple-crypto-train
parent: barbican-tempest-plugin-simple-crypto
nodeset: openstack-single-node-bionic
override-checkout: stable/train
+ vars: *microversion_v1_0
+
+- job:
+ name: barbican-tempest-plugin-simple-crypto-secure-rbac
+ parent: barbican-tempest-plugin-simple-crypto
+ vars:
+ devstack_local_conf:
+ post-config:
+ $BARBICAN_CONF:
+ oslo_policy:
+ enforce_new_defaults: True
+ enforce_scope: True
+ test-config:
+ $TEMPEST_CONFIG:
+ auth:
+ tempest_roles: member
+ barbican_rbac_scope_verification:
+ enforce_scope: True
+
+- job:
+ name: barbican-tempest-plugin-simple-crypto-secure-rbac-yoga
+ parent: barbican-tempest-plugin-simple-crypto-secure-rbac
+ nodeset: openstack-single-node-focal
+ override-checkout: stable/yoga
+ vars: *microversion_v1_0
+
+- job:
+ name: barbican-tempest-plugin-simple-crypto-secure-rbac-xena
+ parent: barbican-tempest-plugin-simple-crypto-secure-rbac
+ nodeset: openstack-single-node-focal
+ override-checkout: stable/xena
+ vars: *microversion_v1_0
+
+- job:
+ name: barbican-tempest-plugin-simple-crypto-secure-rbac-wallaby
+ parent: barbican-tempest-plugin-simple-crypto-secure-rbac
+ nodeset: openstack-single-node-focal
+ override-checkout: stable/wallaby
+ vars: *microversion_v1_0
- job:
name: barbican-tempest-plugin-simple-crypto-ipv6-only
@@ -76,6 +162,49 @@
required-projects: *barbican-tempest-reqs
vars: *barbican-tempest-vars
+
+- job:
+ name: barbican-tempest-plugin-simple-crypto-ipv6-only-yoga
+ parent: barbican-tempest-plugin-simple-crypto-ipv6-only
+ nodeset: openstack-single-node-focal
+ override-checkout: stable/yoga
+ vars: *microversion_v1_0
+
+- job:
+ name: barbican-tempest-plugin-simple-crypto-ipv6-only-xena
+ parent: barbican-tempest-plugin-simple-crypto-ipv6-only
+ nodeset: openstack-single-node-focal
+ override-checkout: stable/xena
+ vars: *microversion_v1_0
+
+- job:
+ name: barbican-tempest-plugin-simple-crypto-ipv6-only-wallaby
+ parent: barbican-tempest-plugin-simple-crypto-ipv6-only
+ nodeset: openstack-single-node-focal
+ override-checkout: stable/wallaby
+ vars: *microversion_v1_0
+
+- job:
+ name: barbican-tempest-plugin-simple-crypto-ipv6-only-victoria
+ parent: barbican-tempest-plugin-simple-crypto-ipv6-only
+ nodeset: openstack-single-node-focal
+ override-checkout: stable/victoria
+ vars: *microversion_v1_0
+
+- job:
+ name: barbican-tempest-plugin-simple-crypto-ipv6-only-ussuri
+ parent: barbican-tempest-plugin-simple-crypto-ipv6-only
+ nodeset: openstack-single-node-bionic
+ override-checkout: stable/ussuri
+ vars: *microversion_v1_0
+
+- job:
+ name: barbican-tempest-plugin-simple-crypto-ipv6-only-train
+ parent: barbican-tempest-plugin-simple-crypto-ipv6-only
+ nodeset: openstack-single-node-bionic
+ override-checkout: stable/train
+ vars: *microversion_v1_0
+
- job:
name: barbican-tempest-plugin-simple-crypto-castellan-src
parent: barbican-tempest-plugin-simple-crypto
diff --git a/barbican_tempest_plugin/config.py b/barbican_tempest_plugin/config.py
index 0c4a2ac..da78d15 100644
--- a/barbican_tempest_plugin/config.py
+++ b/barbican_tempest_plugin/config.py
@@ -20,6 +20,43 @@
help="Whether or not barbican is expected to be "
"available")
+key_manager_group = cfg.OptGroup(
+ name='key_manager',
+ title='Key Manager (Barbican) service options'
+)
+
+KeyManagerOpts = [
+ cfg.StrOpt('min_microversion',
+ default=None,
+ help="Lower version of the test target microversion range. "
+ "The format is 'X.Y', where 'X' and 'Y' are int values. "
+ "Tempest selects tests based on the range between "
+ "min_microversion and max_microversion. "
+ "If both values are not specified, Tempest avoids tests "
+ "which require a microversion. Valid values are string "
+ "with format 'X.Y' or string 'latest'"),
+ cfg.StrOpt('max_microversion',
+ default=None,
+ help="Upper version of the test target microversion range. "
+ "The format is 'X.Y', where 'X' and 'Y' are int values. "
+ "Tempest selects tests based on the range between "
+ "min_microversion and max_microversion. "
+ "If both values are not specified, Tempest avoids tests "
+ "which require a microversion. Valid values are string "
+ "with format 'X.Y' or string 'latest'")
+]
+
+barbican_tempest_group = cfg.OptGroup(
+ name='barbican_tempest',
+ title='Key Manager (Barbican) service options'
+)
+
+BarbicanGroupOpts = [
+ cfg.BoolOpt('enable_multiple_secret_stores',
+ default=False,
+ help="Flag to enable multiple secret store tests")
+]
+
ephemeral_storage_encryption_group = cfg.OptGroup(
name="ephemeral_storage_encryption",
title="Ephemeral storage encryption options")
@@ -54,3 +91,14 @@
help="Does the test environment enforce glance image "
"verification?"),
]
+
+barbican_rbac_scope_verification_group = cfg.OptGroup(
+ name="barbican_rbac_scope_verification",
+ title="Barbican RBAC Verification Options")
+
+BarbicanRBACScopeVerificationGroup = [
+ cfg.BoolOpt('enforce_scope',
+ default=False,
+ help="Does barbican enforce scope and user "
+ "scope-aware policies?"),
+]
diff --git a/barbican_tempest_plugin/plugin.py b/barbican_tempest_plugin/plugin.py
index 1914ecb..4649e85 100644
--- a/barbican_tempest_plugin/plugin.py
+++ b/barbican_tempest_plugin/plugin.py
@@ -33,12 +33,26 @@
conf.register_opt(project_config.service_option,
group='service_available')
+ conf.register_group(project_config.key_manager_group)
+ conf.register_opts(project_config.KeyManagerOpts,
+ project_config.key_manager_group)
+
+ conf.register_group(project_config.barbican_tempest_group)
+ conf.register_opts(project_config.BarbicanGroupOpts,
+ project_config.barbican_tempest_group)
+
# Register ephemeral storage encryption options
conf.register_group(project_config.ephemeral_storage_encryption_group)
conf.register_opts(project_config.EphemeralStorageEncryptionGroup,
project_config.ephemeral_storage_encryption_group)
conf.register_opts(project_config.ImageSignatureVerificationGroup,
project_config.image_signature_verification_group)
+ conf.register_group(
+ project_config.barbican_rbac_scope_verification_group)
+ conf.register_opts(
+ project_config.BarbicanRBACScopeVerificationGroup,
+ project_config.barbican_rbac_scope_verification_group
+ )
def get_opt_lists(self):
return [('service_available', [project_config.service_option])]
@@ -54,7 +68,18 @@
'OrderClient',
'QuotaClient',
'SecretClient',
- 'SecretMetadataClient'
+ 'SecretMetadataClient',
+ 'SecretStoresClient',
+ 'TransportKeyClient'
],
}
- return [v1_params]
+ v1_1_params = {
+ 'name': 'secret_v1_1',
+ 'service_version': 'secret.v1_1',
+ 'module_path': 'barbican_tempest_plugin.services.key_manager.v1_1',
+ 'client_names': [
+ 'SecretConsumerClient',
+ 'VersionClient'
+ ],
+ }
+ return [v1_params, v1_1_params]
diff --git a/barbican_tempest_plugin/services/key_manager/json/__init__.py b/barbican_tempest_plugin/services/key_manager/json/__init__.py
index 7bce46a..ebab977 100644
--- a/barbican_tempest_plugin/services/key_manager/json/__init__.py
+++ b/barbican_tempest_plugin/services/key_manager/json/__init__.py
@@ -24,6 +24,10 @@
import SecretClient
from barbican_tempest_plugin.services.key_manager.json.secret_metadata_client \
import SecretMetadataClient
+from barbican_tempest_plugin.services.key_manager.json.secret_stores_client \
+ import SecretStoresClient
+from barbican_tempest_plugin.services.key_manager.json.transport_key_client \
+ import TransportKeyClient
__all__ = [
'ConsumerClient',
@@ -31,5 +35,7 @@
'OrderClient',
'QuotaClient',
'SecretClient',
- 'SecretMetadataClient'
+ 'SecretMetadataClient',
+ 'SecretStoresClient',
+ 'TransportKeyClient'
]
diff --git a/barbican_tempest_plugin/services/key_manager/json/base.py b/barbican_tempest_plugin/services/key_manager/json/base.py
new file mode 100644
index 0000000..dedc0fd
--- /dev/null
+++ b/barbican_tempest_plugin/services/key_manager/json/base.py
@@ -0,0 +1,36 @@
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+from tempest.lib.common import rest_client
+
+
+_DEFAULT_SERVICE_TYPE = 'key-manager'
+_MICROVERSION_HEADER = 'OpenStack-API-Version'
+
+
+class BarbicanTempestClient(rest_client.RestClient):
+
+ _microversion = None
+
+ def __init__(self, *args, **kwargs):
+ kwargs['service'] = _DEFAULT_SERVICE_TYPE
+ super().__init__(*args, **kwargs)
+
+ def get_headers(self, accept_type=None, send_type=None):
+ headers = super().get_headers(accept_type, send_type)
+ if self._microversion:
+ headers[_MICROVERSION_HEADER] = \
+ f'{_DEFAULT_SERVICE_TYPE} {self._microversion}'
+ return headers
+
+ @classmethod
+ def ref_to_uuid(cls, href):
+ return href.split('/')[-1]
diff --git a/barbican_tempest_plugin/services/key_manager/json/consumer_client.py b/barbican_tempest_plugin/services/key_manager/json/consumer_client.py
index 37fbb86..eb34b94 100644
--- a/barbican_tempest_plugin/services/key_manager/json/consumer_client.py
+++ b/barbican_tempest_plugin/services/key_manager/json/consumer_client.py
@@ -18,12 +18,14 @@
from urllib import parse as urllib
from tempest import config
-from tempest.lib.common import rest_client
+
+from barbican_tempest_plugin.services.key_manager.json import base
+
CONF = config.CONF
-class ConsumerClient(rest_client.RestClient):
+class ConsumerClient(base.BarbicanTempestClient):
def list_consumers_in_container(self, container_id, **kwargs):
uri = "/v1/containers/%s/consumers" % container_id
diff --git a/barbican_tempest_plugin/services/key_manager/json/container_client.py b/barbican_tempest_plugin/services/key_manager/json/container_client.py
index 7e24396..812e722 100644
--- a/barbican_tempest_plugin/services/key_manager/json/container_client.py
+++ b/barbican_tempest_plugin/services/key_manager/json/container_client.py
@@ -11,19 +11,17 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
-
-
import json
-
from urllib import parse as urllib
from tempest import config
-from tempest.lib.common import rest_client
+
+from barbican_tempest_plugin.services.key_manager.json import base
CONF = config.CONF
-class ContainerClient(rest_client.RestClient):
+class ContainerClient(base.BarbicanTempestClient):
def list_containers(self, **kwargs):
uri = "v1/containers"
@@ -82,3 +80,34 @@
)
self.expected_success(204, response.status)
return
+
+ def get_container_acl(self, container_id):
+ headers = {
+ 'Accept': 'application/json'
+ }
+ resp, body = self.get('v1/containers/{}/acl'.format(container_id),
+ headers=headers)
+ self.expected_success(200, resp.status)
+ return json.loads(body)
+
+ def put_container_acl(self, container_id, acl):
+ req_body = json.dumps(acl)
+ resp, body = self.put('v1/containers/{}/acl'.format(container_id),
+ req_body)
+ self.expected_success(200, resp.status)
+ return json.loads(body)
+
+ def patch_container_acl(self, container_id, acl):
+ req_body = json.dumps(acl)
+ resp, body = self.patch('v1/containers/{}/acl'.format(container_id),
+ req_body)
+ self.expected_success(200, resp.status)
+ return json.loads(body)
+
+ def delete_container_acl(self, container_id):
+ resp, body = self.delete('v1/containers/{}/acl'.format(container_id))
+ self.expected_success(200, resp.status)
+ return json.loads(body)
+
+ def queue_for_cleanup(self, container_id):
+ raise NotImplementedError
diff --git a/barbican_tempest_plugin/services/key_manager/json/order_client.py b/barbican_tempest_plugin/services/key_manager/json/order_client.py
index d7a3945..84d64ef 100644
--- a/barbican_tempest_plugin/services/key_manager/json/order_client.py
+++ b/barbican_tempest_plugin/services/key_manager/json/order_client.py
@@ -11,19 +11,31 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
-
-
import json
-
from urllib import parse as urllib
from tempest import config
-from tempest.lib.common import rest_client
+from tempest.lib import exceptions
+
+from barbican_tempest_plugin.services.key_manager.json import base
+
CONF = config.CONF
-class OrderClient(rest_client.RestClient):
+class OrderClient(base.BarbicanTempestClient):
+
+ def __init__(self, *args, secret_client=None, container_client=None,
+ **kwargs):
+ """Create a new order client
+
+ secret_client and container_client are optional and will be used
+ to queue the respective objects for cleanup when given.
+ """
+ super().__init__(*args, **kwargs)
+ self._order_ids = set()
+ self._secret_client = secret_client
+ self._container_client = container_client
def list_orders(self, **kwargs):
uri = "/v1/orders"
@@ -39,7 +51,9 @@
response, body = self.post(uri, json.dumps(kwargs))
self.expected_success(202, response.status)
- return json.loads(body.decode("utf-8"))
+ resp = json.loads(body.decode("utf-8"))
+ self._order_ids.add(self.ref_to_uuid(resp['order_ref']))
+ return resp
def get_order(self, order_id):
uri = "v1/orders/%s" % order_id
@@ -49,8 +63,46 @@
return json.loads(body.decode("utf-8"))
def delete_order(self, order_id):
+ self._order_ids.discard(order_id)
uri = "/v1/orders/%s" % order_id
+ self._queue_cleanup(order_id)
+
response, _ = self.delete(uri)
self.expected_success(204, response.status)
return
+
+ def cleanup(self):
+ """Attempt to delete all orders created by this client
+
+ If this client was instantiated with secret and/or container
+ clients, then we try to queue for cleanup any objects generated
+ by the orders.
+ """
+ cleanup_ids = self._order_ids
+ self._order_ids = set()
+ for order_id in cleanup_ids:
+ self._queue_cleanup(order_id)
+ try:
+ self.delete_order(order_id)
+ except exceptions.NotFound:
+ continue
+
+ def _queue_cleanup(self, order_id):
+ try:
+ order = self.get_order(order_id)
+ except exceptions.NotFound:
+ pass
+ except exceptions.Forbidden:
+ pass
+ else:
+ if (self._secret_client is not None) and \
+ (order.get('secret_ref') is not None):
+ self._secret_client.queue_for_cleanup(
+ self.ref_to_uuid(order['secret_ref'])
+ )
+ if (self._container_client is not None) and \
+ (order.get('container_ref') is not None):
+ self._container_client.queue_for_cleanup(
+ self.ref_to_uuid(order['container_ref'])
+ )
diff --git a/barbican_tempest_plugin/services/key_manager/json/quota_client.py b/barbican_tempest_plugin/services/key_manager/json/quota_client.py
index ba09b56..a238932 100644
--- a/barbican_tempest_plugin/services/key_manager/json/quota_client.py
+++ b/barbican_tempest_plugin/services/key_manager/json/quota_client.py
@@ -14,16 +14,17 @@
import json
-
from urllib import parse as urllib
from tempest import config
-from tempest.lib.common import rest_client
+
+from barbican_tempest_plugin.services.key_manager.json import base
+
CONF = config.CONF
-class QuotaClient(rest_client.RestClient):
+class QuotaClient(base.BarbicanTempestClient):
def list_quotas(self, **kwargs):
uri = "v1/project-quotas"
diff --git a/barbican_tempest_plugin/services/key_manager/json/secret_client.py b/barbican_tempest_plugin/services/key_manager/json/secret_client.py
index 8c1fa16..5eb97b5 100644
--- a/barbican_tempest_plugin/services/key_manager/json/secret_client.py
+++ b/barbican_tempest_plugin/services/key_manager/json/secret_client.py
@@ -17,14 +17,22 @@
import json
from tempest import config
-from tempest.lib.common import rest_client
from tempest.lib.common.utils import data_utils
+from tempest.lib import exceptions
+
+from barbican_tempest_plugin.services.key_manager.json import base
+
CONF = config.CONF
-class SecretClient(rest_client.RestClient):
- def create_secret(self, **kwargs):
+class SecretClient(base.BarbicanTempestClient):
+
+ def __init__(self, *args, **kwargs):
+ super().__init__(*args, **kwargs)
+ self._secret_ids = set()
+
+ def create_secret(self, expected_status=201, **kwargs):
if 'name' not in kwargs:
kwargs['name'] = data_utils.rand_name("tempest-sec")
@@ -34,10 +42,13 @@
post_body = kwargs
body = json.dumps(post_body)
resp, body = self.post("v1/secrets", body)
- self.expected_success(201, resp.status)
- return self._parse_resp(body)
+ self.expected_success(expected_status, resp.status)
+ resp = self._parse_resp(body)
+ self._secret_ids.add(self.ref_to_uuid(resp['secret_ref']))
+ return resp
def delete_secret(self, secret_id):
+ self._secret_ids.discard(secret_id)
resp, body = self.delete("v1/secrets/%s" % secret_id)
self.expected_success(204, resp.status)
return body
@@ -81,3 +92,43 @@
headers=content_headers)
self.expected_success(204, resp.status)
return body
+
+ def get_secret_acl(self, secret_id):
+ headers = {
+ 'Accept': 'application/json'
+ }
+ resp, body = self.get('v1/secrets/{}/acl'.format(secret_id),
+ headers=headers)
+ self.expected_success(200, resp.status)
+ return json.loads(body)
+
+ def put_secret_acl(self, secret_id, acl):
+ req_body = json.dumps(acl)
+ resp, body = self.put('v1/secrets/{}/acl'.format(secret_id),
+ req_body)
+ self.expected_success(200, resp.status)
+ return json.loads(body)
+
+ def patch_secret_acl(self, secret_id, acl):
+ req_body = json.dumps(acl)
+ resp, body = self.patch('v1/secrets/{}/acl'.format(secret_id),
+ req_body)
+ self.expected_success(200, resp.status)
+ return json.loads(body)
+
+ def delete_secret_acl(self, secret_id):
+ resp, body = self.delete('v1/secrets/{}/acl'.format(secret_id))
+ self.expected_success(200, resp.status)
+ return json.loads(body)
+
+ def queue_for_cleanup(self, secret_id):
+ self._secret_ids.add(secret_id)
+
+ def cleanup(self):
+ cleanup_ids = self._secret_ids
+ self._secret_ids = set()
+ for secret_id in cleanup_ids:
+ try:
+ self.delete_secret(secret_id)
+ except exceptions.NotFound:
+ pass
diff --git a/barbican_tempest_plugin/services/key_manager/json/secret_metadata_client.py b/barbican_tempest_plugin/services/key_manager/json/secret_metadata_client.py
index dae8ae3..5376217 100644
--- a/barbican_tempest_plugin/services/key_manager/json/secret_metadata_client.py
+++ b/barbican_tempest_plugin/services/key_manager/json/secret_metadata_client.py
@@ -16,16 +16,18 @@
import json
from tempest import config
-from tempest.lib.common import rest_client
+
+from barbican_tempest_plugin.services.key_manager.json import base
CONF = config.CONF
-class SecretMetadataClient(rest_client.RestClient):
+class SecretMetadataClient(base.BarbicanTempestClient):
def get_secret_metadata(self, secret_id):
resp, body = self.get("v1/secrets/%s/metadata" % secret_id)
self.expected_success(200, resp.status)
+ # Note: "metadata" top level key gets dropped by _parse_resp()
return self._parse_resp(body)
def put_secret_metadata(self, secret_id, **kwargs):
@@ -33,6 +35,7 @@
uri = "v1/secrets/%s/metadata" % secret_id
resp, body = self.put(uri, json.dumps(body_dict))
self.expected_success(201, resp.status)
+ # Note: "metadata" top level key gets dropped by _parse_resp()
return self._parse_resp(body)
def get_secret_metadata_by_key(self, secret_id, key):
diff --git a/barbican_tempest_plugin/services/key_manager/json/secret_stores_client.py b/barbican_tempest_plugin/services/key_manager/json/secret_stores_client.py
new file mode 100644
index 0000000..cb5fd5e
--- /dev/null
+++ b/barbican_tempest_plugin/services/key_manager/json/secret_stores_client.py
@@ -0,0 +1,60 @@
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import json
+from urllib import parse
+
+from barbican_tempest_plugin.services.key_manager.json import base
+
+
+class SecretStoresClient(base.BarbicanTempestClient):
+
+ def list_secret_stores(self, **kwargs):
+ uri = '/v1/secret-stores'
+ if kwargs:
+ uri += '?{}'.format(parse.urlencode(kwargs))
+ resp, body = self.get(uri)
+ self.expected_success(200, resp.status)
+ return json.loads(body.decode('UTF-8'))
+
+ def get_secret_store(self, secret_store_id):
+ uri = '/v1/secret-stores/{}'.format(secret_store_id)
+ resp, body = self.get(uri)
+ self.expected_success(200, resp.status)
+ return json.loads(body.decode('UTF-8'))
+
+ def get_global_secret_store(self, **kwargs):
+ uri = '/v1/secret-stores/global-default'
+ if kwargs:
+ uri += '?{}'.format(parse.urlencode(kwargs))
+ resp, body = self.get(uri)
+ self.expected_success(200, resp.status)
+ return json.loads(body.decode('UTF-8'))
+
+ def get_preferred_secret_store(self, **kwargs):
+ uri = '/v1/secret-stores/preferred'
+ if kwargs:
+ uri += '?{}'.format(parse.urlencode(kwargs))
+ resp, body = self.get(uri)
+ self.expected_success(200, resp.status)
+ return json.loads(body.decode('UTF-8'))
+
+ def set_preferred_secret_store(self, secret_store_id):
+ uri = '/v1/secret-stores/{}/preferred'.format(secret_store_id)
+ resp, body = self.post(uri)
+ self.expected_success(200, resp.status)
+ return json.loads(body.decode('UTF-8'))
+
+ def unset_preferred_secret_store(self, secret_store_id):
+ uri = '/v1/secret-stores/{}/preferred'.format(secret_store_id)
+ resp, body = self.delete(uri)
+ self.expected_success(200, resp.status)
+ return json.loads(body.decode('UTF-8'))
diff --git a/barbican_tempest_plugin/services/key_manager/json/transport_key_client.py b/barbican_tempest_plugin/services/key_manager/json/transport_key_client.py
new file mode 100644
index 0000000..99fa2ea
--- /dev/null
+++ b/barbican_tempest_plugin/services/key_manager/json/transport_key_client.py
@@ -0,0 +1,44 @@
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import json
+from urllib import parse
+
+from barbican_tempest_plugin.services.key_manager.json import base
+
+
+class TransportKeyClient(base.BarbicanTempestClient):
+
+ def list_transport_keys(self, **kwargs):
+ uri = '/v1/transport_keys'
+ if kwargs:
+ uri += '?{}'.format(parse.urlencode(kwargs))
+ resp, body = self.get(uri)
+ self.expected_success(200, resp.status)
+ return json.loads(body.decode('UTF-8'))
+
+ def create_transport_key(self, **kwargs):
+ uri = '/v1/transport_keys'
+ post_body = json.dumps(kwargs)
+ resp, body = self.post(uri, post_body)
+ self.expected_success(201, resp.status)
+ return json.loads(body.decode('UTF-8'))
+
+ def get_transport_key(self, transport_key_id):
+ uri = '/v1/transport_keys/{}'.format(transport_key_id)
+ resp, body = self.get(uri)
+ self.expected_success(200, resp.status)
+ return json.loads(body.decode('UTF-8'))
+
+ def delete_transport_key(self, transport_key_id):
+ uri = '/v1/transport_keys/{}'.format(transport_key_id)
+ resp, body = self.delete(uri)
+ self.expected_success(204, resp.status)
diff --git a/barbican_tempest_plugin/services/key_manager/v1_1/__init__.py b/barbican_tempest_plugin/services/key_manager/v1_1/__init__.py
new file mode 100644
index 0000000..1ee6b52
--- /dev/null
+++ b/barbican_tempest_plugin/services/key_manager/v1_1/__init__.py
@@ -0,0 +1,23 @@
+# Copyright (c) 2022 Red Hat Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may not
+# use this file except in compliance with the License. You may obtain a copy of
+# the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations under
+# the License.
+
+from barbican_tempest_plugin.services.key_manager.v1_1.secret_consumer_client \
+ import SecretConsumerClient # noqa: F401
+from barbican_tempest_plugin.services.key_manager.v1_1.version_client \
+ import VersionClient # noqa: F401
+
+__all__ = [
+ 'SecretConsumerClient',
+ 'VersionClient'
+]
diff --git a/barbican_tempest_plugin/services/key_manager/v1_1/secret_consumer_client.py b/barbican_tempest_plugin/services/key_manager/v1_1/secret_consumer_client.py
new file mode 100644
index 0000000..84d7c25
--- /dev/null
+++ b/barbican_tempest_plugin/services/key_manager/v1_1/secret_consumer_client.py
@@ -0,0 +1,53 @@
+# Copyright (c) 2022 Red Hat Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+
+import json
+
+from urllib import parse as urllib
+
+from tempest import config
+
+from barbican_tempest_plugin.services.key_manager.json import base
+
+
+CONF = config.CONF
+
+
+class SecretConsumerClient(base.BarbicanTempestClient):
+
+ _microversion = '1.1'
+
+ def list_consumers_in_secret(self, secret_id, **kwargs):
+ uri = "/v1/secrets/%s/consumers" % secret_id
+ if kwargs:
+ uri += "?%s" % urllib.urlencode(kwargs)
+
+ response, body = self.get(uri)
+ self.expected_success(200, response.status)
+ return json.loads(body.decode("utf-8"))
+
+ def add_consumer_to_secret(self, secret_id, **kwargs):
+ uri = "/v1/secrets/%s/consumers" % secret_id
+
+ response, body = self.post(uri, json.dumps(kwargs))
+ self.expected_success(200, response.status)
+ return json.loads(body.decode("utf-8"))
+
+ def delete_consumer_from_secret(self, secret_id, **kwargs):
+ uri = "/v1/secrets/%s/consumers" % secret_id
+
+ response, body = self.delete(uri, body=json.dumps(kwargs))
+ self.expected_success(200, response.status)
+ return json.loads(body.decode("utf-8"))
diff --git a/barbican_tempest_plugin/services/key_manager/v1_1/version_client.py b/barbican_tempest_plugin/services/key_manager/v1_1/version_client.py
new file mode 100644
index 0000000..248d4f0
--- /dev/null
+++ b/barbican_tempest_plugin/services/key_manager/v1_1/version_client.py
@@ -0,0 +1,23 @@
+# Copyright (c) 2022 Red Hat Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+from tempest import config
+
+from barbican_tempest_plugin.services.key_manager.json import base
+
+
+CONF = config.CONF
+
+
+class VersionClient(base.BarbicanTempestClient):
+ pass
diff --git a/barbican_tempest_plugin/tests/api/base.py b/barbican_tempest_plugin/tests/api/base.py
index 7256a10..566f363 100644
--- a/barbican_tempest_plugin/tests/api/base.py
+++ b/barbican_tempest_plugin/tests/api/base.py
@@ -16,6 +16,7 @@
import functools
from tempest import config
+from tempest.lib.common import api_version_utils
from tempest import test
from barbican_tempest_plugin import clients
@@ -56,7 +57,8 @@
return decorator
-class BaseKeyManagerTest(test.BaseTestCase):
+class BaseKeyManagerTest(test.BaseTestCase,
+ api_version_utils.BaseMicroversionTest):
"""Base class for all api tests."""
# Why do I have to be an admin to create secrets? No idea...
@@ -65,6 +67,15 @@
created_objects = {}
@classmethod
+ def skip_checks(cls):
+ super().skip_checks()
+ api_version_utils.check_skip_with_microversion(
+ cls.min_microversion,
+ cls.max_microversion,
+ CONF.key_manager.min_microversion,
+ CONF.key_manager.max_microversion)
+
+ @classmethod
def setup_clients(cls):
super(BaseKeyManagerTest, cls).setup_clients()
os = getattr(cls, 'os_%s' % cls.credentials[0])
@@ -76,14 +87,21 @@
)
cls.order_client = os.secret_v1.OrderClient(service='key-manager')
cls.secret_client = os.secret_v1.SecretClient(service='key-manager')
+ cls.secret_consumer_client = os.secret_v1_1.SecretConsumerClient()
cls.secret_metadata_client = os.secret_v1.SecretMetadataClient(
service='key-manager'
)
+ cls.version_client = os.secret_v1_1.VersionClient()
os = getattr(cls, 'os_roles_%s' % cls.credentials[1][0])
cls.quota_client = os.secret_v1.QuotaClient(service='key-manager')
@classmethod
+ def setup_credentials(cls):
+ """Run the parent credential setup and alias ``os_primary``.
+
+ ``os_primary`` is bound to the ``os_<name>`` attribute named after
+ the first entry of ``cls.credentials``.
+ """
+ super().setup_credentials()
+ cls.os_primary = getattr(cls, f'os_{cls.credentials[0]}')
+
+ @classmethod
def resource_setup(cls):
super(BaseKeyManagerTest, cls).resource_setup()
for resource in RESOURCE_TYPES:
diff --git a/barbican_tempest_plugin/tests/api/test_orders.py b/barbican_tempest_plugin/tests/api/test_orders.py
index de8791b..0dff2fa 100644
--- a/barbican_tempest_plugin/tests/api/test_orders.py
+++ b/barbican_tempest_plugin/tests/api/test_orders.py
@@ -12,6 +12,8 @@
# License for the specific language governing permissions and limitations
# under the License.
+import time
+
from tempest.lib import decorators
from barbican_tempest_plugin.tests.api import base
@@ -56,7 +58,14 @@
self.assertEqual(2, body.get('total'), body)
self.assertEqual(2, len(body.get('orders')), body)
- orders = body.get('orders')
+ # Wait max 60 seconds for orders to finish
+ for _ in range(12):
+ orders = self.order_client.list_orders().get('orders')
+ statuses = [o['status'] for o in orders]
+ if len(set(statuses)) == 1 and statuses[0] == 'ACTIVE':
+ break
+ time.sleep(5)
+
for order in orders:
self.assertIn(
base._get_uuid(order.get('order_ref')),
diff --git a/barbican_tempest_plugin/tests/api/test_quotas.py b/barbican_tempest_plugin/tests/api/test_quotas.py
index cde33e3..2546249 100644
--- a/barbican_tempest_plugin/tests/api/test_quotas.py
+++ b/barbican_tempest_plugin/tests/api/test_quotas.py
@@ -24,7 +24,7 @@
"""Quotas API tests."""
@decorators.idempotent_id('47ebc42b-0e53-4060-b1a1-55bee2c7c43f')
- def test_create_get_delete_quota(self):
+ def test_get_effective_quota(self):
# Verify the default quota settings
body = self.quota_client.get_default_project_quota()
quotas = body.get('quotas')
@@ -34,6 +34,20 @@
self.assertEqual(-1, quotas.get('containers'))
self.assertEqual(-1, quotas.get('consumers'))
+
+class ProjectQuotasTest(base.BaseKeyManagerTest):
+
+ @classmethod
+ def skip_checks(cls):
+ """Skip this class when scope enforcement is enabled for barbican."""
+ super().skip_checks()
+ if CONF.barbican_rbac_scope_verification.enforce_scope:
+ # These tests can't be run with the new RBAC rules because
+ # the APIs they're testing require system-scoped credentials
+ # instead of the project-scoped credentials used here.
+ raise cls.skipException("enforce_scope is enabled for barbican, "
+ "skipping project quota tests.")
+
+ def test_manage_project_quotas(self):
# Confirm that there are no quotas
body = self.quota_client.list_quotas()
self.assertEqual(0, body.get('total'), body)
diff --git a/barbican_tempest_plugin/tests/api/test_secret_consumers.py b/barbican_tempest_plugin/tests/api/test_secret_consumers.py
new file mode 100644
index 0000000..8288cfd
--- /dev/null
+++ b/barbican_tempest_plugin/tests/api/test_secret_consumers.py
@@ -0,0 +1,101 @@
+# Copyright (c) 2022 Red Hat Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+from tempest.lib import decorators
+
+from barbican_tempest_plugin.tests.api import base
+
+
+class SecretConsumersTest(base.BaseKeyManagerTest):
+ """Secret Consumers API tests."""
+
+ min_microversion = '1.1'
+
+ @decorators.idempotent_id('07a47f8b-e454-4dd0-afb6-bfa12677cd8e')
+ def test_add_delete_consumers_in_secret(self):
+ # Create a secret to test against
+ sec = self.create_secret(name='secret_1')
+ secret_id = self.secret_consumer_client.ref_to_uuid(sec['secret_ref'])
+
+ # Confirm that the secret has no consumers
+ body = self.secret_consumer_client.list_consumers_in_secret(secret_id)
+ self.assertEqual(0, body.get('total'))
+ self.assertEmpty(body.get('consumers'))
+
+ # Add some consumers to the secret
+ body = self.secret_consumer_client.add_consumer_to_secret(
+ secret_id,
+ service="service1",
+ resource_id="resource_id1",
+ resource_type="resource_type1"
+ )
+ self.assertEqual(
+ secret_id,
+ self.secret_consumer_client.ref_to_uuid(body.get('secret_ref'))
+ )
+ self.assertEqual(1, len(body.get('consumers')))
+ body = self.secret_consumer_client.add_consumer_to_secret(
+ secret_id,
+ service="service2",
+ resource_id="resource_id2",
+ resource_type="resource_type2"
+ )
+ self.assertEqual(
+ secret_id,
+ self.secret_consumer_client.ref_to_uuid(body.get('secret_ref'))
+ )
+ self.assertEqual(2, len(body.get('consumers')))
+
+ # Confirm that the consumers are in the secret
+ body = self.secret_consumer_client.list_consumers_in_secret(secret_id)
+ self.assertEqual(2, body.get('total'))
+ self.assertEqual(2, len(body.get('consumers')))
+ for consumer in body.get('consumers'):
+ self.assertIn(consumer.get('service'), ("service1", "service2"))
+ self.assertIn(consumer.get('resource_id'),
+ ("resource_id1", "resource_id2"))
+ self.assertIn(consumer.get('resource_type'),
+ ("resource_type1", "resource_type2"))
+
+ # Remove the consumers from the secret
+ body = self.secret_consumer_client.delete_consumer_from_secret(
+ secret_id,
+ service="service1",
+ resource_id="resource_id1",
+ resource_type="resource_type1"
+ )
+ self.assertEqual(
+ secret_id,
+ self.secret_consumer_client.ref_to_uuid(body.get('secret_ref'))
+ )
+ self.assertEqual(1, len(body.get('consumers')))
+ body = self.secret_consumer_client.delete_consumer_from_secret(
+ secret_id,
+ service="service2",
+ resource_id="resource_id2",
+ resource_type="resource_type2"
+ )
+ self.assertEqual(
+ secret_id,
+ self.secret_consumer_client.ref_to_uuid(body.get('secret_ref'))
+ )
+ self.assertEqual(0, len(body.get('consumers')))
+
+ # Confirm that the secret has no consumers
+ body = self.secret_consumer_client.list_consumers_in_secret(secret_id)
+ self.assertEqual(0, body.get('total'))
+ self.assertEqual(0, len(body.get('consumers')))
+
+ # Clean up the secret
+ self.delete_secret(secret_id)
diff --git a/barbican_tempest_plugin/tests/rbac/__init__.py b/barbican_tempest_plugin/tests/rbac/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/barbican_tempest_plugin/tests/rbac/__init__.py
diff --git a/barbican_tempest_plugin/tests/rbac/v1/__init__.py b/barbican_tempest_plugin/tests/rbac/v1/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/barbican_tempest_plugin/tests/rbac/v1/__init__.py
diff --git a/barbican_tempest_plugin/tests/rbac/v1/base.py b/barbican_tempest_plugin/tests/rbac/v1/base.py
new file mode 100644
index 0000000..0e7a774
--- /dev/null
+++ b/barbican_tempest_plugin/tests/rbac/v1/base.py
@@ -0,0 +1,324 @@
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import base64
+from datetime import datetime
+from datetime import timedelta
+import os
+
+from cryptography.hazmat.backends import default_backend
+from cryptography.hazmat.primitives import hashes
+from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
+
+from tempest import clients
+from tempest import config
+from tempest.lib import auth
+from tempest.lib.common import api_version_utils
+from tempest.lib.common.utils import data_utils
+from tempest import test
+
+
+CONF = config.CONF
+
+RESOURCE_TYPES = ['container', 'order', 'quota', 'secret']
+
+
+def create_aes_key():
+ password = b"password"
+ salt = os.urandom(16)
+ kdf = PBKDF2HMAC(
+ algorithm=hashes.SHA256(), length=32, salt=salt,
+ iterations=1000, backend=default_backend()
+ )
+ return base64.b64encode(kdf.derive(password))
+
+
+class BarbicanV1RbacBase(test.BaseTestCase,
+ api_version_utils.BaseMicroversionTest):
+
+ identity_version = 'v3'
+ _created_projects = None
+ _created_users = None
+ created_objects = {}
+
+ credentials = [
+ 'system_admin',
+ 'project_alt_member'
+ ]
+
+ # TODO(dmendiza): remove this and use the clients instead
+ @classmethod
+ def ref_to_uuid(cls, href):
+ return href.split('/')[-1]
+
+ @classmethod
+ def skip_checks(cls):
+ super().skip_checks()
+ if not CONF.barbican_rbac_scope_verification.enforce_scope:
+ raise cls.skipException("enforce_scope is not enabled for "
+ "barbican, skipping RBAC tests")
+ api_version_utils.check_skip_with_microversion(
+ cls.min_microversion,
+ cls.max_microversion,
+ CONF.key_manager.min_microversion,
+ CONF.key_manager.max_microversion)
+
+ @classmethod
+ def setup_credentials(cls):
+ super().setup_credentials()
+ cls._created_projects = list()
+ cls._created_users = list()
+ project_id = cls.os_system_admin.projects_client.create_project(
+ data_utils.rand_name()
+ )['project']['id']
+ cls._created_projects.append(project_id)
+ cls.os_project_admin = cls._setup_new_user_client(project_id, 'admin')
+ cls.os_project_member = cls._setup_new_user_client(project_id,
+ 'member')
+ cls.os_project_other_member = cls._setup_new_user_client(project_id,
+ 'member')
+ cls.os_project_reader = cls._setup_new_user_client(project_id,
+ 'reader')
+
+ @classmethod
+ def _setup_new_user_client(cls, project_id, role):
+ """Create a new tempest.clients.Manager
+
+ Creates a new user with the given roles on the given project,
+ and returns an instance of tempest.clients.Manager set up
+ for that user.
+
+ Users are cleaned up during class teardown in cls.clear_credentials
+ """
+ user = {
+ 'name': data_utils.rand_name('user'),
+ 'password': data_utils.rand_password()
+ }
+ user_id = cls.os_system_admin.users_v3_client.create_user(
+ **user
+ )['user']['id']
+ cls._created_users.append(user_id)
+ role_id = cls.os_system_admin.roles_v3_client.list_roles(
+ name=role
+ )['roles'][0]['id']
+ cls.os_system_admin.roles_v3_client.create_user_role_on_project(
+ project_id, user_id, role_id
+ )
+ creds = auth.KeystoneV3Credentials(
+ user_id=user_id,
+ password=user['password'],
+ project_id=project_id
+ )
+ auth_provider = clients.get_auth_provider(creds)
+ creds = auth_provider.fill_credentials()
+ return clients.Manager(credentials=creds)
+
+ @classmethod
+ def clear_credentials(cls):
+ for user_id in cls._created_users:
+ cls.os_system_admin.users_v3_client.delete_user(user_id)
+ for project_id in cls._created_projects:
+ cls.os_system_admin.projects_client.delete_project(project_id)
+ super().clear_credentials()
+
+ @classmethod
+ def setup_clients(cls):
+ super().setup_clients()
+
+ # setup clients for admin persona
+ adm = cls.os_project_admin
+ cls.admin_secret_client = adm.secret_v1.SecretClient()
+ cls.admin_secret_metadata_client = adm.secret_v1.SecretMetadataClient()
+ cls.admin_consumer_client = adm.secret_v1.ConsumerClient()
+ cls.admin_secret_consumer_client = \
+ adm.secret_v1_1.SecretConsumerClient()
+ cls.admin_container_client = adm.secret_v1.ContainerClient()
+ cls.admin_order_client = adm.secret_v1.OrderClient(
+ secret_client=cls.admin_secret_client,
+ container_client=cls.admin_container_client
+ )
+ cls.admin_quota_client = adm.secret_v1.QuotaClient()
+
+ # set clients for member persona
+ member = cls.os_project_member
+ cls.secret_client = member.secret_v1.SecretClient()
+ cls.secret_metadata_client = member.secret_v1.SecretMetadataClient()
+ cls.member_consumer_client = member.secret_v1.ConsumerClient()
+ cls.member_secret_consumer_client = \
+ member.secret_v1_1.SecretConsumerClient()
+ cls.container_client = member.secret_v1.ContainerClient()
+ cls.order_client = member.secret_v1.OrderClient(
+ secret_client=cls.secret_client,
+ container_client=cls.container_client
+ )
+ cls.quota_client = member.secret_v1.QuotaClient()
+
+ # set up clients for member persona associated with a different
+ # project
+ cls.other_secret_client = \
+ cls.os_project_alt_member.secret_v1.SecretClient()
+ cls.other_secret_metadata_client = \
+ cls.os_project_alt_member.secret_v1.SecretMetadataClient()
+ cls.other_container_client = \
+ cls.os_project_alt_member.secret_v1.ContainerClient()
+ cls.other_order_client = \
+ cls.os_project_alt_member.secret_v1.OrderClient(
+ secret_client=cls.other_secret_client,
+ container_client=cls.other_container_client
+ )
+
+ @classmethod
+ def resource_setup(cls):
+ super().resource_setup()
+ for resource in RESOURCE_TYPES:
+ cls.created_objects[resource] = set()
+
+ @classmethod
+ def resource_cleanup(cls):
+ try:
+ for container_uuid in list(cls.created_objects['container']):
+ cls.admin_container_client.delete_container(container_uuid)
+ cls.created_objects['container'].remove(container_uuid)
+ for quota_uuid in list(cls.created_objects['quota']):
+ cls.admin_quota_client.delete_project_quota(quota_uuid)
+ cls.created_objects['quota'].remove(quota_uuid)
+ for secret_uuid in list(cls.created_objects['secret']):
+ cls.admin_secret_client.delete_secret(secret_uuid)
+ cls.created_objects['secret'].remove(secret_uuid)
+
+ for client in [cls.secret_client,
+ cls.order_client,
+ cls.admin_secret_client,
+ cls.admin_order_client,
+ cls.other_secret_client,
+ cls.other_order_client]:
+ client.cleanup()
+ finally:
+ super(BarbicanV1RbacBase, cls).resource_cleanup()
+
+ @classmethod
+ def add_cleanup(cls, resource, response):
+ if resource == 'container':
+ uuid = cls.ref_to_uuid(response['container_ref'])
+ if resource == 'quota':
+ uuid = cls.ref_to_uuid(response['quota_ref'])
+ if resource == 'secret':
+ uuid = cls.ref_to_uuid(response['secret_ref'])
+ cls.created_objects[resource].add(uuid)
+
+ @classmethod
+ def delete_cleanup(cls, resource, uuid):
+ cls.created_objects[resource].remove(uuid)
+
+ # TODO(dmendiza): get rid of this helper method.
+ def do_request(self, method, client=None, expected_status=200,
+ cleanup=None, **args):
+ if client is None:
+ client = self.client
+ if isinstance(expected_status, type(Exception)):
+ self.assertRaises(expected_status,
+ getattr(client, method),
+ **args)
+ else:
+ response = getattr(client, method)(**args)
+ # self.assertEqual(response.response.status, expected_status)
+ if cleanup is not None:
+ self.add_cleanup(cleanup, response)
+ return response
+
+ def create_empty_secret_admin(self, secret_name):
+ """add empty secret as admin user """
+ return self.admin_secret_client.create_secret(name=secret_name)
+
+ def create_empty_container_admin(self,
+ container_name,
+ container_type='generic'):
+ """add empty container as admin user"""
+ return self.admin_container_client.create_container(
+ name=container_name,
+ type=container_type)
+
+ def create_aes_secret_admin(self, secret_name):
+ key = create_aes_key()
+ expire_time = (datetime.utcnow() + timedelta(days=5))
+ return key, self.do_request(
+ 'create_secret', client=self.admin_secret_client,
+ expected_status=201, cleanup="secret",
+ expiration=expire_time.isoformat(), algorithm="aes",
+ bit_length=256, mode="cbc", payload=key,
+ payload_content_type="application/octet-stream",
+ payload_content_encoding="base64",
+ name=secret_name
+ )
+
+ def create_other_project_secret(self, secret_name, payload=None):
+ kwargs = {
+ 'name': secret_name,
+ 'secret_type': 'passphrase',
+ }
+ if payload is not None:
+ kwargs['payload'] = payload
+ kwargs['payload_content_type'] = 'text/plain'
+ resp = self.other_secret_client.create_secret(**kwargs)
+ return self.other_secret_client.ref_to_uuid(resp['secret_ref'])
+
+ def create_test_secret(self, client, name, payload=None):
+ """Create a secret for testing
+
+ The new secret is created using the given client. If no
+ payload is given, the secret is left empty.
+
+ :returns: the uuid for the new secret
+ """
+ kwargs = {
+ 'name': name,
+ 'secret_type': 'passphrase'
+ }
+ if payload is not None:
+ kwargs['payload'] = payload
+ kwargs['payload_content_type'] = 'text/plain'
+ resp = client.create_secret(**kwargs)
+ return client.ref_to_uuid(resp['secret_ref'])
+
+ def create_test_order(self, client, order_name):
+ """Create a symmetric key order for testing
+
+ The new order is created using the given
+ client.
+
+ :returns: the uuid for the new order
+ """
+ kwargs = {
+ 'type': 'key',
+ 'meta': {
+ 'name': order_name,
+ 'algorithm': 'AES',
+ 'bit_length': 256,
+ 'mode': 'CBC',
+ }
+ }
+ resp = client.create_order(**kwargs)
+ return client.ref_to_uuid(resp['order_ref'])
+
+ def create_test_container(self, client, name):
+ """Create a generic container for testing
+
+ The new container is created using the given client.
+
+ :returns: the uuid for the new container
+ """
+ container = {
+ "type": "generic",
+ "name": name,
+ }
+ resp = client.create_container(**container)
+ return client.ref_to_uuid(resp['container_ref'])
diff --git a/barbican_tempest_plugin/tests/rbac/v1/test_containers.py b/barbican_tempest_plugin/tests/rbac/v1/test_containers.py
new file mode 100644
index 0000000..16d743f
--- /dev/null
+++ b/barbican_tempest_plugin/tests/rbac/v1/test_containers.py
@@ -0,0 +1,467 @@
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+import abc
+
+from tempest import config
+from tempest.lib.common.utils import data_utils
+from tempest.lib import exceptions
+
+from barbican_tempest_plugin.tests.rbac.v1 import base
+
+
+CONF = config.CONF
+
+
+class BarbicanV1RbacContainers:
+ """Test methods each container RBAC persona class is expected to define.
+
+ Every method names the API route it covers. All of them except
+ test_get_container_consumer raise NotImplementedError until overridden.
+
+ NOTE(review): this class declares no ABC metaclass of its own, so
+ @abc.abstractmethod is presumably advisory only here -- confirm.
+ """
+
+ @abc.abstractmethod
+ def test_list_containers(self):
+ """Test list_containers policy
+
+ Testing: GET /v1/containers
+ This test must check:
+ * whether the persona can list containers
+ """
+ raise NotImplementedError
+
+ @abc.abstractmethod
+ def test_create_container(self):
+ """Test create_container policy
+
+ Testing: POST /v1/containers
+ This test must check:
+ * whether the persona can create a new container
+ """
+ raise NotImplementedError
+
+ @abc.abstractmethod
+ def test_get_container(self):
+ """Test get_container policy
+
+ Testing: GET /v1/containers/{container-id}
+ This test must check:
+ * whether the persona can get a container
+ """
+ raise NotImplementedError
+
+ @abc.abstractmethod
+ def test_delete_container(self):
+ """Test delete_container policy
+
+ Testing: DELETE /v1/containers/{container-id}
+ This test must check:
+ * whether the persona can delete a container
+ """
+ raise NotImplementedError
+
+ @abc.abstractmethod
+ def test_get_container_acl(self):
+ """Test GET /v1/containers/{container-id}/acl
+
+ This test must check:
+ * whether the persona can get a containers acl
+ """
+ raise NotImplementedError
+
+ @abc.abstractmethod
+ def test_update_container_acl(self):
+ """Test PATCH /v1/containers/{container-id}/acl
+
+ This test must check:
+ * whether the persona can update an existing containers acl
+ """
+ raise NotImplementedError
+
+ @abc.abstractmethod
+ def test_create_container_acl(self):
+ """Test PUT /v1/containers/{container-id}/acl
+
+ This test must check:
+ * whether the persona can create a containers acl
+ """
+ raise NotImplementedError
+
+ @abc.abstractmethod
+ def test_delete_container_acl(self):
+ """Test DELETE /v1/containers/{container-id}/acl
+
+ This test must check:
+ * whether the persona can delete a containers acl
+ """
+ raise NotImplementedError
+
+ @abc.abstractmethod
+ def test_list_container_consumers(self):
+ """Test list_container_consumers policy
+
+ Testing: GET /v1/containers/{container-id}/consumers
+ This test must check:
+ * whether the persona can list a containers consumers
+ """
+ raise NotImplementedError
+
+ @abc.abstractmethod
+ def test_create_container_consumer(self):
+ """Test create_container_consumer policy
+
+ Testing: POST /v1/containers/{container-id}/consumers
+ This test must check:
+ * whether the persona can create a consumer of the container
+ """
+ raise NotImplementedError
+
+ @abc.abstractmethod
+ def test_delete_container_consumer(self):
+ """Test delete_container_consumer policy
+
+ Testing: DELETE /v1/containers/{container-id}/consumers
+ This test must check:
+ * whether the persona can delete a consumer of the container
+ """
+ raise NotImplementedError
+
+ @abc.abstractmethod
+ def test_get_container_consumer(self):
+ """Test GET /v1/containers/{container-id}/consumers/{consumer-id}
+
+ This test must check:
+ * whether the persona can get a containers consumer by id
+
+ NOTE: This route is undocumented, also there's no way to get a
+ consumer-id back from the API.
+ """
+ # Unlike its siblings this is a no-op instead of raising
+ # NotImplementedError; the persona classes in this file do not
+ # override it, presumably so the inherited method still passes.
+ pass
+
+ @abc.abstractmethod
+ def test_add_secret_to_container(self):
+ """Test add_secret_to_container policy
+
+ Testing: POST /v1/containers/{container-id}/secrets
+ This test must check:
+ * whether the persona can add a secret to a container
+ """
+ raise NotImplementedError
+
+ @abc.abstractmethod
+ def test_delete_secret_from_container(self):
+ """Test delete_secret_from_container policy
+
+ Testing: DELETE /v1/containers/{container-id}/secrets
+ This test must check:
+ * whether the persona can delete a secret from a container
+ """
+ raise NotImplementedError
+
+
+class ProjectReaderTests(base.BarbicanV1RbacBase, BarbicanV1RbacContainers):
+
+ @classmethod
+ def setup_clients(cls):
+ super().setup_clients()
+ cls.client = cls.os_project_reader.secret_v1.ContainerClient()
+ cls.consumer_client = cls.os_project_reader.secret_v1.ConsumerClient()
+
+ def setUp(self):
+ super().setUp()
+ self.secret_id = self.create_test_secret(
+ self.secret_client,
+ data_utils.rand_name('test-containers'),
+ 'SECRET_PASSPHRASE'
+ )
+ self.container_id = self.create_test_container(
+ self.container_client,
+ data_utils.rand_name('test-containers'))
+ self.valid_acl = {
+ 'read': {
+ 'users': [self.other_secret_client.user_id],
+ 'project-access': True
+ }
+ }
+ self.test_consumer = {
+ "name": "test-consumer",
+ "URL": "https://example.test/consumer"
+ }
+ self.member_consumer_client.add_consumer_to_container(
+ self.container_id,
+ **self.test_consumer
+ )
+
+ def test_list_containers(self):
+ self.assertRaises(
+ exceptions.Forbidden,
+ self.client.list_containers)
+
+ def test_create_container(self):
+ self.assertRaises(
+ exceptions.Forbidden,
+ self.client.create_container)
+
+ def test_get_container(self):
+ self.assertRaises(
+ exceptions.Forbidden,
+ self.client.get_container,
+ container_id=self.container_id)
+
+ def test_delete_container(self):
+ self.assertRaises(
+ exceptions.Forbidden,
+ self.client.delete_container,
+ container_id=self.container_id)
+
+ def test_get_container_acl(self):
+ self.assertRaises(
+ exceptions.Forbidden,
+ self.client.get_container_acl,
+ self.container_id)
+
+ def test_update_container_acl(self):
+ self.assertRaises(
+ exceptions.Forbidden,
+ self.client.patch_container_acl,
+ self.container_id,
+ self.valid_acl)
+
+ def test_create_container_acl(self):
+ self.assertRaises(
+ exceptions.Forbidden,
+ self.client.put_container_acl,
+ self.container_id,
+ self.valid_acl)
+
+ def test_delete_container_acl(self):
+ self.assertRaises(
+ exceptions.Forbidden,
+ self.client.delete_container,
+ self.container_id)
+
+ def test_list_container_consumers(self):
+ self.assertRaises(
+ exceptions.Forbidden,
+ self.consumer_client.list_consumers_in_container,
+ self.container_id)
+
+ def test_create_container_consumer(self):
+ self.assertRaises(
+ exceptions.Forbidden,
+ self.consumer_client.add_consumer_to_container,
+ self.container_id,
+ **self.test_consumer)
+
+ def test_delete_container_consumer(self):
+ self.assertRaises(
+ exceptions.Forbidden,
+ self.consumer_client.delete_consumer_from_container,
+ self.container_id,
+ **self.test_consumer)
+
+ def test_add_secret_to_container(self):
+ self.assertRaises(
+ exceptions.Forbidden,
+ self.client.add_secret_to_container,
+ container_id=self.container_id,
+ secret_id=self.secret_id)
+
+ def test_delete_secret_from_container(self):
+ self.assertRaises(
+ exceptions.Forbidden,
+ self.client.delete_secret_from_container,
+ container_id=self.container_id,
+ secret_id=self.secret_id)
+
+
+class ProjectMemberTests(ProjectReaderTests):
+
+ @classmethod
+ def setup_clients(cls):
+ super().setup_clients()
+ cls.client = cls.container_client
+ cls.consumer_client = cls.member_consumer_client
+
+ def test_list_containers(self):
+ resp = self.client.list_containers()
+ containers = resp['containers']
+
+ self.assertGreaterEqual(len(containers), 1)
+
+ def test_create_container(self):
+ container_id = self.create_test_container(
+ self.client,
+ 'test-create-container')
+
+ _ = self.container_client.get_container(container_id)
+
+ def test_get_container(self):
+ resp = self.client.get_container(self.container_id)
+
+ self.assertEqual(
+ self.container_id,
+ self.client.ref_to_uuid(resp['container_ref']))
+
+ def test_delete_container(self):
+ self.client.delete_container(self.container_id)
+
+ resp = self.container_client.list_containers()
+ container_ids = [self.client.ref_to_uuid(c['container_ref'])
+ for c in resp['containers']]
+ self.assertNotIn(self.container_id, container_ids)
+
+ def test_add_secret_to_container(self):
+ self.client.add_secret_to_container(
+ container_id=self.container_id,
+ secret_id=self.secret_id)
+
+ resp = self.client.get_container(self.container_id)
+ secret_ids = [self.client.ref_to_uuid(sr['secret_ref'])
+ for sr in resp['secret_refs']]
+ self.assertIn(self.secret_id, secret_ids)
+
+ def test_delete_secret_from_container(self):
+ self.client.add_secret_to_container(
+ self.container_id,
+ self.secret_id)
+ resp = self.client.get_container(self.container_id)
+ secret_ids = [self.client.ref_to_uuid(sr['secret_ref'])
+ for sr in resp['secret_refs']]
+ self.assertIn(self.secret_id, secret_ids)
+
+ self.client.delete_secret_from_container(
+ self.container_id,
+ self.secret_id)
+
+ resp = self.client.get_container(self.container_id)
+ secret_ids = [self.client.ref_to_uuid(sr['secret_ref'])
+ for sr in resp['secret_refs']]
+ self.assertNotIn(self.secret_id, secret_ids)
+
+ def test_get_container_acl(self):
+ resp = self.client.get_container_acl(self.container_id)
+ self.assertIn('read', resp.keys())
+
+ def test_create_container_acl(self):
+ _ = self.client.put_container_acl(self.container_id, self.valid_acl)
+
+ acl = self.client.get_container_acl(self.container_id)
+ self.assertIn(self.other_secret_client.user_id, acl['read']['users'])
+
+ def test_update_container_acl(self):
+ _ = self.client.put_container_acl(self.container_id, self.valid_acl)
+ acl = self.client.get_container_acl(self.container_id)
+ self.assertIn(self.other_secret_client.user_id, acl['read']['users'])
+ clear_users_acl = {
+ 'read': {
+ 'users': []
+ }
+ }
+
+ _ = self.client.patch_container_acl(self.container_id, clear_users_acl)
+
+ acl = self.client.get_container_acl(self.container_id)
+ self.assertNotIn(self.other_secret_client.user_id,
+ acl['read']['users'])
+
+ def test_delete_container_acl(self):
+ _ = self.client.put_container_acl(self.container_id, self.valid_acl)
+ acl = self.client.get_container_acl(self.container_id)
+ self.assertIn(self.other_secret_client.user_id, acl['read']['users'])
+
+ _ = self.client.delete_container_acl(self.container_id)
+
+ acl = self.client.get_container_acl(self.container_id)
+ self.assertNotIn('users', acl['read'].keys())
+
+ def test_list_container_consumers(self):
+ resp = self.consumer_client.list_consumers_in_container(
+ self.container_id
+ )
+ self.assertEqual(1, resp['total'])
+
+ def test_create_container_consumer(self):
+ second_consumer = {
+ 'name': 'another-test-consumer',
+ 'URL': 'https://exlample.test/consumer/two'
+ }
+
+ resp = self.consumer_client.add_consumer_to_container(
+ self.container_id,
+ **second_consumer)
+
+ self.assertEqual(2, len(resp['consumers']))
+
+ def test_delete_container_consumer(self):
+ resp = self.consumer_client.delete_consumer_from_container(
+ self.container_id,
+ **self.test_consumer)
+
+ self.assertEqual(0, len(resp['consumers']))
+
+
+class ProjectAdminTests(ProjectMemberTests):
+
+ @classmethod
+ def setup_clients(cls):
+ super().setup_clients()
+ cls.client = cls.admin_container_client
+ cls.consumer_client = cls.admin_consumer_client
+
+
+class ProjectReaderTestsAcrossProjects(ProjectReaderTests):
+ """Tests for Project Reader across Projects
+
+ Tests for Project Reader Persona using containers/secrets
+ that belong to a different project.
+
+ This class overrides setUp to create self.secret_id and
+ self.container_id to use objects that belong to a different
+ project.
+
+ We re-use most of the tests in ProjectReaderTests because
+ we also expect these to be Forbidden.
+
+ The only exception is the two tests we've overridden to
+ pass because it is not possible to list or create containers
+ on a different project.
+ """
+
+ def setUp(self):
+ super().setUp()
+ self.secret_id = self.create_test_secret(
+ self.other_secret_client,
+ data_utils.rand_name('test-containers'),
+ 'SECRET_PASSPHRASE'
+ )
+ self.container_id = self.create_test_container(
+ self.other_container_client,
+ data_utils.rand_name('test-containers'))
+
+ def test_list_containers(self):
+ """This is not possible across projects"""
+ pass
+
+ def test_create_container(self):
+ """This is not possible across projects"""
+ pass
+
+
+class ProjectMemberTestsAcrossProjects(ProjectReaderTestsAcrossProjects):
+
+ @classmethod
+ def setup_clients(cls):
+ super().setup_clients()
+ cls.client = cls.container_client
+
+
+class ProjectAdminTestsAcrossProjects(ProjectMemberTestsAcrossProjects):
+
+ @classmethod
+ def setup_clients(cls):
+ super().setup_clients()
+ cls.client = cls.admin_container_client
diff --git a/barbican_tempest_plugin/tests/rbac/v1/test_orders.py b/barbican_tempest_plugin/tests/rbac/v1/test_orders.py
new file mode 100644
index 0000000..964d95d
--- /dev/null
+++ b/barbican_tempest_plugin/tests/rbac/v1/test_orders.py
@@ -0,0 +1,168 @@
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import abc
+
+from tempest.lib import exceptions
+
+from barbican_tempest_plugin.tests.rbac.v1 import base
+
+
+ class BarbicanV1RbacOrders:
+ """Checklist of order-policy tests that each persona class provides.
+
+ Every method here raises NotImplementedError, so a persona class that
+ mixes this in is expected to override all of them.
+ NOTE(review): this class does not visibly use abc.ABCMeta, so
+ @abc.abstractmethod may not be enforced at instantiation -- confirm.
+ """
+
+ @abc.abstractmethod
+ def test_list_orders(self):
+ """Test list_orders policy
+
+ Testing GET /v1/orders
+ This test must check:
+ * whether persona can list orders
+ """
+ raise NotImplementedError
+
+ @abc.abstractmethod
+ def test_create_order(self):
+ """Test create_order policy
+
+ Testing POST /v1/orders
+ This test must check:
+ * whether persona can create orders
+ """
+ raise NotImplementedError
+
+ @abc.abstractmethod
+ def test_get_order(self):
+ """Test get_order policy
+
+ Testing GET /v1/orders/{order-id}
+ This test must check:
+ * whether persona can get order information
+ """
+ raise NotImplementedError
+
+ @abc.abstractmethod
+ def test_delete_order(self):
+ """Test delete_order policy
+
+ Testing DELETE /v1/orders/{order-id}
+ This test must check:
+ * whether persona can delete orders
+ """
+ raise NotImplementedError
+
+ @abc.abstractmethod
+ def test_get_other_project_order(self):
+ """Test get_order policy
+
+ Testing GET /v1/orders/{order-id}
+ This test must check:
+ * whether persona can get order information
+ for an order that belongs to a different
+ project
+ """
+ raise NotImplementedError
+
+ @abc.abstractmethod
+ def test_delete_other_project_order(self):
+ """Test delete_order policy
+
+ Testing DELETE /v1/orders/{order-id}
+ This test must check:
+ * whether persona can delete orders
+ that belong to a different project
+ """
+ raise NotImplementedError
+
+
+class ProjectReaderTests(base.BarbicanV1RbacBase, BarbicanV1RbacOrders):
+
+ @classmethod
+ def setup_clients(cls):
+ super().setup_clients()
+ cls.client = cls.os_project_reader.secret_v1.OrderClient()
+
+ def test_list_orders(self):
+ self.assertRaises(exceptions.Forbidden, self.client.list_orders)
+
+ def test_create_order(self):
+ self.assertRaises(
+ exceptions.Forbidden,
+ self.create_test_order,
+ self.client,
+ 'create_orders_s'
+ )
+
+ def test_get_order(self):
+ order_id = self.create_test_order(self.order_client, 'test_get_order')
+ self.assertRaises(
+ exceptions.Forbidden,
+ self.client.get_order,
+ order_id=order_id)
+
+ def test_delete_order(self):
+ order_id = self.create_test_order(self.order_client,
+ 'test_delete_order')
+ self.assertRaises(
+ exceptions.Forbidden,
+ self.client.delete_order,
+ order_id=order_id)
+
+ def test_get_other_project_order(self):
+ order_id = self.create_test_order(
+ self.other_order_client,
+ 'test_get_other_project_order')
+ self.assertRaises(
+ exceptions.NotFound,
+ self.client.get_order,
+ order_id)
+
+ def test_delete_other_project_order(self):
+ order_id = self.create_test_order(
+ self.other_order_client,
+ 'test_delete_other_project_order')
+ self.assertRaises(
+ exceptions.NotFound,
+ self.client.delete_order,
+ order_id)
+
+
+class ProjectMemberTests(ProjectReaderTests):
+
+ @classmethod
+ def setup_clients(cls):
+ super().setup_clients()
+ cls.client = cls.os_project_member.secret_v1.OrderClient()
+
+ def test_list_orders(self):
+ _ = self.create_test_order(self.order_client, 'test_list_orders')
+ resp = self.client.list_orders()
+ self.assertGreaterEqual(len(resp['orders']), 1)
+
+ def test_create_order(self):
+ self.create_test_order(self.client, 'create_orders_s')
+
+ def test_get_order(self):
+ order_id = self.create_test_order(self.order_client, 'test_get_order')
+ resp = self.client.get_order(order_id)
+ self.assertEqual(order_id, self.client.ref_to_uuid(resp['order_ref']))
+
+ def test_delete_order(self):
+ order_id = self.create_test_order(self.order_client,
+ 'test_delete_order')
+ self.client.delete_order(order_id)
+
+
+class ProjectAdminTests(ProjectMemberTests):
+
+ @classmethod
+ def setup_clients(cls):
+ super().setup_clients()
+ cls.client = cls.os_project_admin.secret_v1.OrderClient()
diff --git a/barbican_tempest_plugin/tests/rbac/v1/test_quotas.py b/barbican_tempest_plugin/tests/rbac/v1/test_quotas.py
new file mode 100644
index 0000000..16edc18
--- /dev/null
+++ b/barbican_tempest_plugin/tests/rbac/v1/test_quotas.py
@@ -0,0 +1,195 @@
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import abc
+
+from tempest.lib import exceptions
+
+from barbican_tempest_plugin.tests.rbac.v1 import base
+
+
+ class BarbicanV1RbacQuota:
+ """Checklist of quota-policy tests that each persona class provides.
+
+ Every method here raises NotImplementedError, so a persona class that
+ mixes this in is expected to override all of them.
+ NOTE(review): this class does not visibly use abc.ABCMeta, so
+ @abc.abstractmethod may not be enforced at instantiation -- confirm.
+ """
+
+ @abc.abstractmethod
+ def test_get_effective_project_quota(self):
+ """Test getting the effective quota information
+
+ Testing: GET /v1/quotas
+ This test must check:
+ * whether the persona can retrieve the effective quota for
+ their project.
+ """
+ raise NotImplementedError
+
+ @abc.abstractmethod
+ def test_list_project_quotas(self):
+ """Test listing all configured project quotas
+
+ Testing: GET /v1/project-quotas
+ This test must check:
+ * whether the persona can retrieve all modified quotas for
+ the entire system.
+ """
+ raise NotImplementedError
+
+ @abc.abstractmethod
+ def test_get_custom_quota_for_project(self):
+ """Test getting a custom quota for the persona's project
+
+ Testing: GET /v1/project-quotas/{project-id}
+ This test must check:
+ * whether the persona can retrieve the custom quota for
+ the project in the persona's credentials.
+ """
+ raise NotImplementedError
+
+ @abc.abstractmethod
+ def test_set_new_quota_for_project(self):
+ """Test setting a custom quota for the persona's project
+
+ Testing: PUT /v1/project-quotas/{project-id}
+ This test must check:
+ * whether the persona can create custom quotas for
+ the project in the persona's credentials.
+ """
+ raise NotImplementedError
+
+ @abc.abstractmethod
+ def test_remove_custom_quota_for_project(self):
+ """Test removing a custom quota for the persona's project
+
+ Testing: DELETE /v1/project-quotas/{project-id}
+ This test must check:
+ * whether the persona can delete custom quotas for
+ the project in the persona's credentials.
+ """
+ raise NotImplementedError
+
+ @abc.abstractmethod
+ def test_get_custom_quota_for_other_project(self):
+ """Test getting a custom quota for a different project
+
+ Testing: GET /v1/project-quotas/{project-id}
+ This test must check:
+ * whether the persona can retrieve the custom quota for
+ a project that is different than the project in the
+ persona's credentials.
+ """
+ raise NotImplementedError
+
+ @abc.abstractmethod
+ def test_set_new_quota_for_other_project(self):
+ """Test setting a custom quota for a different project
+
+ Testing: PUT /v1/project-quotas/{project-id}
+ This test must check:
+ * whether the persona can create custom quotas for a
+ project that is different than the project in the
+ persona's credentials.
+ """
+ raise NotImplementedError
+
+ @abc.abstractmethod
+ def test_remove_custom_quota_for_other_project(self):
+ """Test removing a custom quota for a different project
+
+ Testing: DELETE /v1/project-quotas/{project-id}
+ This test must check:
+ * whether the persona can delete custom quotas for a
+ project that is different than the project in the
+ persona's credentials.
+ """
+ raise NotImplementedError
+
+
+class ProjectReaderTests(base.BarbicanV1RbacBase, BarbicanV1RbacQuota):
+
+ @classmethod
+ def setup_clients(cls):
+ super().setup_clients()
+ cls.client = cls.os_project_reader.secret_v1.QuotaClient()
+
+ def test_get_effective_project_quota(self):
+ resp = self.client.get_default_project_quota()
+ self.assertIn('quotas', resp)
+
+ def test_list_project_quotas(self):
+ self.assertRaises(exceptions.Forbidden, self.client.list_quotas)
+
+ def test_get_custom_quota_for_project(self):
+ project_id = self.client.tenant_id
+ self.assertRaises(
+ exceptions.Forbidden,
+ self.client.get_project_quota,
+ project_id)
+
+ def test_set_new_quota_for_project(self):
+ project_id = self.client.tenant_id
+ self.assertRaises(
+ exceptions.Forbidden,
+ self.client.create_project_quota,
+ project_id,
+ project_quotas={
+ "secrets": 1000,
+ "orders": 1000,
+ "containers": 1000
+ }
+ )
+
+ def test_remove_custom_quota_for_project(self):
+ project_id = self.client.tenant_id
+ self.assertRaises(
+ exceptions.Forbidden,
+ self.client.delete_project_quota,
+ project_id)
+
+ def test_get_custom_quota_for_other_project(self):
+ project_id = self.other_secret_client.tenant_id
+ self.assertRaises(
+ exceptions.Forbidden,
+ self.client.get_project_quota,
+ project_id)
+
+ def test_set_new_quota_for_other_project(self):
+ project_id = self.other_secret_client.tenant_id
+ self.assertRaises(
+ exceptions.Forbidden,
+ self.client.create_project_quota,
+ project_id,
+ project_quotas={
+ "secrets": 1000,
+ "orders": 1000,
+ "containers": 1000
+ }
+ )
+
+ def test_remove_custom_quota_for_other_project(self):
+ project_id = self.other_secret_client.tenant_id
+ self.assertRaises(
+ exceptions.Forbidden,
+ self.client.delete_project_quota,
+ project_id)
+
+
+class ProjectMemberTests(ProjectReaderTests):
+
+ @classmethod
+ def setup_clients(cls):
+ super().setup_clients()
+ cls.client = cls.os_project_member.secret_v1.QuotaClient()
+
+
+class ProjectAdminTests(ProjectMemberTests):
+
+ @classmethod
+ def setup_clients(cls):
+ super().setup_clients()
+ cls.client = cls.os_project_admin.secret_v1.QuotaClient()
diff --git a/barbican_tempest_plugin/tests/rbac/v1/test_secret_metadata.py b/barbican_tempest_plugin/tests/rbac/v1/test_secret_metadata.py
new file mode 100644
index 0000000..a8e61a7
--- /dev/null
+++ b/barbican_tempest_plugin/tests/rbac/v1/test_secret_metadata.py
@@ -0,0 +1,336 @@
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import abc
+
+from tempest.lib.common.utils import data_utils
+from tempest.lib import exceptions
+
+from barbican_tempest_plugin.tests.rbac.v1 import base
+
+
+ class BarbicanV1RbacSecretMetadata:
+ """Checklist of secret-metadata policy tests for each persona class.
+
+ Every method here raises NotImplementedError, so a persona class that
+ mixes this in is expected to override all of them.
+ NOTE(review): this class does not visibly use abc.ABCMeta, so
+ @abc.abstractmethod may not be enforced at instantiation -- confirm.
+ """
+
+ @abc.abstractmethod
+ def test_create_key_value_pair(self):
+ """Test create_key_value_pair policy
+
+ Testing: POST /v1/secrets/{secret-id}/metadata
+ This test must check:
+ * whether the persona can add metadata to a secret
+ """
+ raise NotImplementedError
+
+ @abc.abstractmethod
+ def test_put_secret_metadata(self):
+ """Test put_secret_metadata policy
+
+ Testing: PUT /v1/secrets/{secret-id}/metadata
+ This test must check:
+ * whether the persona can update metadata on a secret
+ """
+ raise NotImplementedError
+
+ @abc.abstractmethod
+ def test_get_secret_metadata(self):
+ """Test get_secret_metadata policy
+
+ Testing: GET /v1/secrets/{secret-id}/metadata
+ This test must check:
+ * whether the persona can retrieve secret metadata
+ """
+ raise NotImplementedError
+
+ @abc.abstractmethod
+ def test_update_secret_metadata_by_key(self):
+ """Test update_secret_metadata policy
+
+ Testing: PUT /v1/secrets/{secret-id}/metadata/{meta-key}
+ This test must check:
+ * whether the persona can update individual secret metadata
+ values
+ """
+ raise NotImplementedError
+
+ @abc.abstractmethod
+ def test_get_secret_metadata_by_key(self):
+ """Test get_secret_metadata_by_key policy
+
+ Testing: GET /v1/secrets/{secret-id}/metadata/{meta-key}
+ This test must check:
+ * whether the persona can retrieve individual secret metadata
+ values
+ """
+ raise NotImplementedError
+
+ @abc.abstractmethod
+ def test_delete_secret_metadata_by_key(self):
+ """Test delete_secret_metadata_by_key policy
+
+ Testing: DELETE /v1/secrets/{secret-id}/metadata/{meta-key}
+ This test must check:
+ * whether the persona can delete individual secret metadata
+ values
+ """
+ raise NotImplementedError
+
+ @abc.abstractmethod
+ def test_create_key_value_pair_on_other_secret(self):
+ """Test create_key_value_pair policy
+
+ Testing: POST /v1/secrets/{secret-id}/metadata
+ This test must check:
+ * whether the persona can add metadata to a secret
+ that belongs to a different project
+ """
+ raise NotImplementedError
+
+ @abc.abstractmethod
+ def test_put_secret_metadata_on_other_secret(self):
+ """Test put_secret_metadata policy
+
+ Testing: PUT /v1/secrets/{secret-id}/metadata
+ This test must check:
+ * whether the persona can update metadata on a secret
+ that belongs to a different project
+ """
+ raise NotImplementedError
+
+ @abc.abstractmethod
+ def test_get_secret_metadata_from_other_secret(self):
+ """Test get_secret_metadata policy
+
+ Testing: GET /v1/secrets/{secret-id}/metadata
+ This test must check:
+ * whether the persona can retrieve secret metadata
+ from a secret that belongs to a different project
+ """
+ raise NotImplementedError
+
+ @abc.abstractmethod
+ def test_update_other_secret_metadata_by_key(self):
+ """Test update_secret_metadata policy
+
+ Testing: PUT /v1/secrets/{secret-id}/metadata/{meta-key}
+ This test must check:
+ * whether the persona can update individual secret metadata
+ values on a secret that belongs to a different project
+ """
+ raise NotImplementedError
+
+ @abc.abstractmethod
+ def test_get_other_secret_metadata_by_key(self):
+ """Test get_secret_metadata_by_key policy
+
+ Testing: GET /v1/secrets/{secret-id}/metadata/{meta-key}
+ This test must check:
+ * whether the persona can retrieve individual secret metadata
+ values for a secret that belongs to a different project
+ """
+ raise NotImplementedError
+
+ @abc.abstractmethod
+ def test_delete_other_secret_metadata_by_key(self):
+ """Test delete_secret_metadata_by_key policy
+
+ Testing: DELETE /v1/secrets/{secret-id}/metadata/{meta-key}
+ This test must check:
+ * whether the persona can delete individual secret metadata
+ values on a secret that belongs to a different project
+ """
+ raise NotImplementedError
+
+
+class ProjectReaderTests(base.BarbicanV1RbacBase,
+ BarbicanV1RbacSecretMetadata):
+
+ @classmethod
+ def setup_clients(cls):
+ super().setup_clients()
+ cls.client = cls.os_project_reader.secret_v1.SecretMetadataClient()
+
+ def setUp(self):
+ super().setUp()
+ self.secret_id = self.create_test_secret(
+ self.secret_client,
+ data_utils.rand_name('test-secret-metadata'),
+ 'SECRET_PASSPHRASE')
+ self.secret_metadata_client.create_key_value_pair(
+ self.secret_id,
+ 'foo',
+ 'bar')
+
+ self.other_secret_id = self.create_test_secret(
+ self.other_secret_client,
+ data_utils.rand_name('test-secret-metadata'),
+ 'SECRET_PASSPHRASE')
+ self.other_secret_metadata_client.create_key_value_pair(
+ self.other_secret_id,
+ 'foo',
+ 'bar')
+
+ def test_create_key_value_pair(self):
+ self.assertRaises(
+ exceptions.Forbidden,
+ self.client.create_key_value_pair,
+ self.secret_id,
+ 'mykey',
+ 'foo'
+ )
+
+ def test_put_secret_metadata(self):
+ meta = {
+ 'foo': 'bar',
+ 'baz': 'bork'
+ }
+ self.assertRaises(
+ exceptions.Forbidden,
+ self.client.put_secret_metadata,
+ self.secret_id,
+ **meta)
+
+ def test_get_secret_metadata(self):
+ self.assertRaises(
+ exceptions.Forbidden,
+ self.client.get_secret_metadata,
+ self.secret_id)
+
+ def test_update_secret_metadata_by_key(self):
+ self.assertRaises(
+ exceptions.Forbidden,
+ self.client.update_secret_metadata,
+ self.secret_id,
+ 'foo',
+ 'baz')
+
+ def test_get_secret_metadata_by_key(self):
+ self.assertRaises(
+ exceptions.Forbidden,
+ self.client.get_secret_metadata_by_key,
+ self.secret_id,
+ 'foo')
+
+ def test_delete_secret_metadata_by_key(self):
+ self.assertRaises(
+ exceptions.Forbidden,
+ self.client.delete_secret_metadata_by_key,
+ self.secret_id,
+ 'foo')
+
+ def test_create_key_value_pair_on_other_secret(self):
+ self.assertRaises(
+ exceptions.Forbidden,
+ self.client.create_key_value_pair,
+ self.other_secret_id,
+ 'mykey',
+ 'foo'
+ )
+
+ def test_put_secret_metadata_on_other_secret(self):
+ meta = {
+ 'foo': 'bar',
+ 'baz': 'bork'
+ }
+ self.assertRaises(
+ exceptions.Forbidden,
+ self.client.put_secret_metadata,
+ self.other_secret_id,
+ **meta)
+
+ def test_get_secret_metadata_from_other_secret(self):
+ self.assertRaises(
+ exceptions.Forbidden,
+ self.client.get_secret_metadata,
+ self.other_secret_id)
+
+ def test_update_other_secret_metadata_by_key(self):
+ self.assertRaises(
+ exceptions.Forbidden,
+ self.client.update_secret_metadata,
+ self.other_secret_id,
+ 'foo',
+ 'baz')
+
+ def test_get_other_secret_metadata_by_key(self):
+ self.assertRaises(
+ exceptions.Forbidden,
+ self.client.get_secret_metadata_by_key,
+ self.other_secret_id,
+ 'foo')
+
+ def test_delete_other_secret_metadata_by_key(self):
+ self.assertRaises(
+ exceptions.Forbidden,
+ self.client.delete_secret_metadata_by_key,
+ self.other_secret_id,
+ 'foo')
+
+
+class ProjectMemberTests(ProjectReaderTests):
+
+ @classmethod
+ def setup_clients(cls):
+ super().setup_clients()
+ cls.client = cls.secret_metadata_client
+
+ def test_create_key_value_pair(self):
+ resp = self.client.create_key_value_pair(
+ self.secret_id,
+ 'mykey',
+ 'foo'
+ )
+ self.assertEqual('mykey', resp['key'])
+ self.assertEqual('foo', resp['value'])
+
+ def test_put_secret_metadata(self):
+ test_meta = {
+ 'foo': 'baz',
+ 'bar': 'bork'
+ }
+ self.client.put_secret_metadata(self.secret_id, **test_meta)
+ resp = self.client.get_secret_metadata(self.secret_id)
+
+ self.assertIn('bar', resp.keys())
+ self.assertEqual('baz', resp['foo'])
+
+ def test_get_secret_metadata(self):
+ resp = self.client.get_secret_metadata(self.secret_id)
+
+ self.assertIn('foo', resp.keys())
+ self.assertEqual('bar', resp['foo'])
+
+ def test_update_secret_metadata_by_key(self):
+ self.client.update_secret_metadata(self.secret_id, 'foo', 'baz')
+
+ resp = self.secret_metadata_client.get_secret_metadata(self.secret_id)
+ self.assertEqual('baz', resp['foo'])
+
+ def test_get_secret_metadata_by_key(self):
+ resp = self.client.get_secret_metadata_by_key(self.secret_id, 'foo')
+ self.assertEqual('foo', resp['key'])
+ self.assertEqual('bar', resp['value'])
+
+ def test_delete_secret_metadata_by_key(self):
+ self.client.delete_secret_metadata_by_key(self.secret_id, 'foo')
+ self.assertRaises(
+ exceptions.NotFound,
+ self.client.get_secret_metadata_by_key,
+ self.secret_id,
+ 'foo')
+
+
+class ProjectAdminTests(ProjectMemberTests):
+
+ @classmethod
+ def setup_clients(cls):
+ super().setup_clients()
+ cls.client = cls.admin_secret_metadata_client
diff --git a/barbican_tempest_plugin/tests/rbac/v1/test_secret_stores.py b/barbican_tempest_plugin/tests/rbac/v1/test_secret_stores.py
new file mode 100644
index 0000000..6f0a00d
--- /dev/null
+++ b/barbican_tempest_plugin/tests/rbac/v1/test_secret_stores.py
@@ -0,0 +1,187 @@
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import abc
+
+from tempest import config
+from tempest.lib import exceptions
+
+from barbican_tempest_plugin.tests.rbac.v1 import base
+
+
+CONF = config.CONF
+
+
+ class BarbicanV1RbacSecretStores:
+ """Checklist of secret-store policy tests for each persona class.
+
+ Every method here raises NotImplementedError, so a persona class that
+ mixes this in is expected to override all of them.
+ NOTE(review): this class does not visibly use abc.ABCMeta, so
+ @abc.abstractmethod may not be enforced at instantiation -- confirm.
+ """
+
+ @abc.abstractmethod
+ def test_list_secret_stores(self):
+ """Test getting a list of all backends
+
+ Testing: GET /v1/secret-stores
+ This test must check:
+ * whether the persona can list all secret stores
+ """
+ raise NotImplementedError
+
+ @abc.abstractmethod
+ def test_get_secret_store(self):
+ """Test get secret store information
+
+ Testing: GET /v1/secret-stores/{secret-store-id}
+ This test must check:
+ * whether the persona can get information about a specific
+ secret store
+ """
+ raise NotImplementedError
+
+ @abc.abstractmethod
+ def test_get_global_secret_store(self):
+ """Test getting the global secret store
+
+ Testing: GET /v1/secret-stores/global-default
+ This test must check:
+ * whether the persona can get information about the global
+ default secret store
+ """
+ raise NotImplementedError
+
+ @abc.abstractmethod
+ def test_get_preferred_secret_store(self):
+ """Test getting the preferred secret store
+
+ Testing: GET /v1/secret-stores/preferred
+ This test must check:
+ * whether the persona can get information about their project's
+ preferred secret store
+ """
+ raise NotImplementedError
+
+ @abc.abstractmethod
+ def test_set_preferred_secret_store(self):
+ """Test setting the preferred secret store
+
+ Testing: POST /v1/secret-stores/{secret-store-id}/preferred
+ This test must check:
+ * whether the persona can set their project's preferred
+ secret store
+ """
+ raise NotImplementedError
+
+ @abc.abstractmethod
+ def test_unset_preferred_secret_store(self):
+ """Test removing the preferred secret store
+
+ Testing: DELETE /v1/secret-stores/{secret-store-id}/preferred
+ This test must check:
+ * whether the persona can unset their project's preferred
+ secret store
+ """
+ raise NotImplementedError
+
+
+class ProjectMemberTests(base.BarbicanV1RbacBase, BarbicanV1RbacSecretStores):
+
+ @classmethod
+ def skip_checks(cls):
+ """TODO(redrobot): Run this with multiple backends
+
+ We need to set up the devstack plugin to use multiple backends
+ so we can run these tests.
+ """
+ if not CONF.barbican_tempest.enable_multiple_secret_stores:
+ raise cls.skipException("enable_multiple_secret_stores is not "
+ "configured. Skipping RBAC tests.")
+
+ @classmethod
+ def setup_clients(cls):
+ super().setup_clients()
+ cls.client = cls.os_project_member.secret_v1.SecretStoresClient()
+
+ def test_list_secret_stores(self):
+ resp = self.do_request('list_secret_stores')
+ self.assertIn('secret_stores', resp)
+
+ def test_get_secret_store(self):
+ resp = self.do_request('list_secret_stores')
+ secret_store_id = self.ref_to_uuid(
+ resp['secret_stores'][0]['secret_store_ref']
+ )
+ resp = self.do_request('get_secret_store',
+ secret_store_id=secret_store_id)
+ self.assertEqual(secret_store_id,
+ self.ref_to_uuid(resp['secret_store_ref']))
+
+ def test_get_global_secret_store(self):
+ resp = self.do_request('get_global_secret_store')
+ self.assertTrue(resp['global_default'])
+
+ def test_get_preferred_secret_store(self):
+ resp = self.do_request('get_preferred_secret_store')
+ self.assertEqual('ACTIVE', resp['status'])
+
+ def test_set_preferred_secret_store(self):
+ resp = self.do_request('list_secret_stores')
+ secret_store_id = self.ref_to_uuid(
+ resp['secret_stores'][0]['secret_store_ref']
+ )
+ self.do_request('set_preferred_secret_store',
+ expected_status=exceptions.Forbidden,
+ secret_store_id=secret_store_id)
+
+ def test_unset_preferred_secret_store(self):
+ resp = self.do_request('list_secret_stores')
+ secret_store_id = self.ref_to_uuid(
+ resp['secret_stores'][0]['secret_store_ref']
+ )
+ self.do_request('unset_peferred_secret_store',
+ expected_status=exceptions.Forbidden,
+ secret_store_id=secret_store_id)
+
+
+class ProjectAdminTests(ProjectMemberTests):
+
+ @classmethod
+ def setup_clients(cls):
+ super().setup_clients()
+ cls.client = cls.os_project_admin.secret_v1.SecretStoresClient()
+
+ def test_set_preferred_secret_store(self):
+ resp = self.do_request('list_secret_stores')
+ secret_store_id = self.ref_to_uuid(
+ resp['secret_stores'][0]['secret_store_ref']
+ )
+ self.do_request('set_preferred_secret_store',
+ secret_store_id=secret_store_id)
+ resp = self.do_request('get_preferred_secret_store')
+ self.assertEqual(secret_store_id,
+ self.ref_to_uuid(resp['secret_store_ref']))
+
+ def test_unset_preferred_secret_store(self):
+ resp = self.do_request('list_secret_stores')
+ secret_store_id = self.ref_to_uuid(
+ resp['secret_stores'][0]['secret_store_ref']
+ )
+ self.do_request('set_preferred_secret_store',
+ secret_store_id=secret_store_id)
+ self.do_request('unset_peferred_secret_store',
+ secret_store_id=secret_store_id)
+ resp = self.do_request('get_preferred_secret_store')
+ self.assertEqual(secret_store_id,
+ self.ref_to_uuid(resp['secret_store_ref']))
+
+
+class ProjectReaderTests(ProjectMemberTests):
+
+ @classmethod
+ def setup_clients(cls):
+ super().setup_clients()
+ cls.client = cls.os_project_reader.secret_v1.SecretStoresClient()
diff --git a/barbican_tempest_plugin/tests/rbac/v1/test_secrets.py b/barbican_tempest_plugin/tests/rbac/v1/test_secrets.py
new file mode 100644
index 0000000..bdd56b2
--- /dev/null
+++ b/barbican_tempest_plugin/tests/rbac/v1/test_secrets.py
@@ -0,0 +1,741 @@
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import abc
+import base64
+from datetime import datetime
+from datetime import timedelta
+
+from tempest import config
+from tempest.lib.common.utils import data_utils
+from tempest.lib import exceptions
+
+from barbican_tempest_plugin.tests.rbac.v1 import base as rbac_base
+
+
+CONF = config.CONF
+
+
+ class BarbicanV1RbacSecrets:
+ """Checklist of secret-policy tests that each persona class provides.
+
+ Every method here raises NotImplementedError, so a persona class that
+ mixes this in is expected to override all of them.
+ NOTE(review): this class does not visibly use abc.ABCMeta, so
+ @abc.abstractmethod may not be enforced at instantiation -- confirm.
+ """
+
+ @abc.abstractmethod
+ def test_create_secret(self):
+ """Test add_secret policy.
+
+ Testing: POST /v1/secrets
+ This test must check:
+ * whether the persona can create an empty secret
+ * whether the persona can create a secret with a symmetric key
+ """
+ raise NotImplementedError
+
+ @abc.abstractmethod
+ def test_list_secrets(self):
+ """Test get_secrets policy.
+
+ Testing: GET /v1/secrets
+ This test must check:
+ * whether the persona can list secrets within their project
+ """
+ raise NotImplementedError
+
+ @abc.abstractmethod
+ def test_delete_secret(self):
+ """Test deleting a secret.
+
+ Testing: DELETE /v1/secrets/{secret_id}
+ This test must check:
+ * whether the persona can delete a secret in their project
+ """
+ raise NotImplementedError
+
+ @abc.abstractmethod
+ def test_get_secret(self):
+ """Test get_secret policy.
+
+ Testing: GET /v1/secrets/{secret_id}
+ This test must check:
+ * whether the persona can get a specific secret within their project
+ """
+ raise NotImplementedError
+
+ @abc.abstractmethod
+ def test_get_secret_payload(self):
+ """Test get_secret payload policy.
+
+ Testing: GET /v1/secrets/{secret_id}/payload
+ This test must check:
+ * whether the persona can get a secret payload
+ """
+ raise NotImplementedError
+
+ @abc.abstractmethod
+ def test_put_secret_payload(self):
+ """Test put_secret policy.
+
+ Testing: PUT /v1/secrets/{secret_id}
+ This test must check:
+ * whether the persona can add a payload to an empty secret
+ """
+ raise NotImplementedError
+
+ @abc.abstractmethod
+ def test_get_other_project_secret(self):
+ """Test get_secrets policy
+
+ Testing: GET /v1/secrets/{secret_id}
+ This test must check:
+ * whether the persona can get secret metadata for a secret that
+ belongs to a different project
+ """
+ raise NotImplementedError
+
+ @abc.abstractmethod
+ def test_get_other_project_secret_payload(self):
+ """Test get_secrets policy
+
+ Testing: GET /v1/secrets/{secret_id}/payload
+ This test must check:
+ * whether the persona can get secret payload for a secret that
+ belongs to a different project
+ """
+ raise NotImplementedError
+
+ @abc.abstractmethod
+ def test_put_other_project_secret_payload(self):
+ """Test put_secret policy.
+
+ Testing: PUT /v1/secrets/{secret_id}
+ This test must check:
+ * whether the persona can PUT the secret payload in a 2-step
+ create when the first step is done by a member of a different
+ project.
+ """
+ raise NotImplementedError
+
+ @abc.abstractmethod
+ def test_delete_other_project_secret(self):
+ """Test delete_secret policy.
+
+ Testing: DELETE /v1/secrets/{secret_id}
+ This test must check:
+ * whether the persona can delete a secret that belongs to a
+ different project
+ """
+ raise NotImplementedError
+
+ @abc.abstractmethod
+ def test_get_secret_acl(self):
+ """Test GET /v1/secrets/{secret_id}/acl policy
+
+ This test must check:
+ * whether the persona can get the ACL for a secret
+ """
+ raise NotImplementedError
+
+ @abc.abstractmethod
+ def test_put_secret_acl(self):
+ """Test PUT /v1/secrets/{secret_id}/acl policy
+
+ This test must check:
+ * whether the persona can overwrite the ACL for a secret
+ """
+ raise NotImplementedError
+
+ @abc.abstractmethod
+ def test_patch_secret_acl(self):
+ """Test PATCH /v1/secrets/{secret_id}/acl policy
+
+ This test must check:
+ * whether the persona can modify the ACL for a secret
+ """
+ raise NotImplementedError
+
+ @abc.abstractmethod
+ def test_delete_secret_acl(self):
+ """Test DELETE /v1/secrets/{secret_id}/acl policy
+
+ This test must check:
+ * whether the persona can delete the ACL for a secret
+ """
+ raise NotImplementedError
+
+ @abc.abstractmethod
+ def test_get_other_secret_acl(self):
+ """Test GET /v1/secrets/{secret_id}/acl policy
+
+ This test must check:
+ * whether the persona can get the ACL for a secret
+ that belongs to a different project
+ """
+ raise NotImplementedError
+
+ @abc.abstractmethod
+ def test_put_other_secret_acl(self):
+ """Test PUT /v1/secrets/{secret_id}/acl policy
+
+ This test must check:
+ * whether the persona can overwrite the ACL for a secret
+ that belongs to a different project
+ """
+ raise NotImplementedError
+
+ @abc.abstractmethod
+ def test_patch_other_secret_acl(self):
+ """Test PATCH /v1/secrets/{secret_id}/acl policy
+
+ This test must check:
+ * whether the persona can modify the ACL for a secret
+ that belongs to a different project
+ """
+ raise NotImplementedError
+
+ @abc.abstractmethod
+ def test_delete_other_secret_acl(self):
+ """Test DELETE /v1/secrets/{secret_id}/acl policy
+
+ This test must check:
+ * whether the persona can delete the ACL for a secret
+ that belongs to a different project
+ """
+ raise NotImplementedError
+
+
+ class BarbicanV1_1SecretConsumers:
+ """Checklist of secret-consumer policy tests for each persona class.
+
+ Every method here raises NotImplementedError, so a persona class that
+ mixes this in is expected to override all of them.
+ NOTE(review): this class does not visibly use abc.ABCMeta, so
+ @abc.abstractmethod may not be enforced at instantiation -- confirm.
+ """
+
+ @abc.abstractmethod
+ def test_list_secret_consumers(self):
+ """Test list_secret_consumers policy
+
+ Testing: GET /v1/secrets/{secret-id}/consumers
+ This test must check:
+ * whether the persona can list a secret's consumers
+ """
+ raise NotImplementedError
+
+ @abc.abstractmethod
+ def test_create_secret_consumer(self):
+ """Test create_secret_consumer policy
+
+ Testing: POST /v1/secrets/{secret-id}/consumers
+ This test must check:
+ * whether the persona can create a consumer of the secret
+ """
+ raise NotImplementedError
+
+ @abc.abstractmethod
+ def test_delete_secret_consumer(self):
+ """Test delete_secret_consumer policy
+
+ Testing: DELETE /v1/secrets/{secret-id}/consumers
+ This test must check:
+ * whether the persona can delete a consumer of the secret
+ """
+ raise NotImplementedError
+
+
+class ProjectReaderBase(rbac_base.BarbicanV1RbacBase):
+
+ @classmethod
+ def setup_clients(cls):
+ super().setup_clients()
+ cls.client = cls.os_project_reader.secret_v1.SecretClient()
+
+ def setUp(self):
+ super().setUp()
+ self.secret_id = self.create_test_secret(
+ self.secret_client,
+ data_utils.rand_name('test-secrets'),
+ 'THIS_IS_A_SECRET_PASSPHRASE')
+ self.other_secret_id = self.create_test_secret(
+ self.other_secret_client,
+ data_utils.rand_name('test-secrets'),
+ 'THIS_IS_SOMEONE_ELSES_SECRET_PASSPHRASE')
+ self.valid_acl = {
+ "read": {
+ "users": [self.other_secret_client.user_id],
+ "project-access": True
+ }
+ }
+
+
+class ProjectReaderTests(ProjectReaderBase, BarbicanV1RbacSecrets):
+
+ def test_create_secret(self):
+ """Test add_secret policy."""
+ self.assertRaises(exceptions.Forbidden, self.client.create_secret)
+
+ key = rbac_base.create_aes_key()
+ expire_time = (datetime.utcnow() + timedelta(days=5))
+
+ self.assertRaises(
+ exceptions.Forbidden, self.client.create_secret,
+ expiration=expire_time.isoformat(), algorithm="aes",
+ bit_length=256, mode="cbc", payload=key,
+ payload_content_type="application/octet-stream",
+ payload_content_encoding="base64"
+ )
+
+ def test_list_secrets(self):
+ """Test get_secrets policy."""
+ # create two secrets
+ self.create_empty_secret_admin('secret_1')
+ self.create_empty_secret_admin('secret_2')
+
+ # list secrets with name secret_1
+ self.assertRaises(
+ exceptions.Forbidden,
+ self.client.list_secrets,
+ name='secret_1'
+ )
+
+ # list secrets with name secret_2
+ self.assertRaises(
+ exceptions.Forbidden,
+ self.client.list_secrets,
+ name='secret_2'
+ )
+
+ # list all secrets
+ self.assertRaises(exceptions.Forbidden, self.client.list_secrets)
+
+ def test_delete_secret(self):
+ """Test delete_secrets policy."""
+ sec = self.create_empty_secret_admin('secret_1')
+ uuid = self.ref_to_uuid(sec['secret_ref'])
+
+ self.assertRaises(
+ exceptions.Forbidden,
+ self.client.delete_secret,
+ secret_id=uuid
+ )
+
+ def test_get_secret(self):
+ """Test get_secret policy."""
+ sec = self.create_empty_secret_admin('secret_1')
+ uuid = self.ref_to_uuid(sec['secret_ref'])
+ self.assertRaises(
+ exceptions.Forbidden,
+ self.client.get_secret_metadata,
+ secret_id=uuid
+ )
+
+ def test_get_secret_payload(self):
+ """Test get_secret payload policy."""
+ key, sec = self.create_aes_secret_admin('secret_1')
+ uuid = self.ref_to_uuid(sec['secret_ref'])
+
+ # Retrieve the payload
+ self.assertRaises(
+ exceptions.Forbidden,
+ self.client.get_secret_payload,
+ secret_id=uuid
+ )
+
+ def test_put_secret_payload(self):
+ """Test put_secret policy."""
+ sec = self.create_empty_secret_admin('secret_1')
+ uuid = self.ref_to_uuid(sec['secret_ref'])
+
+ key = rbac_base.create_aes_key()
+
+ # Associate the payload with the created secret
+ self.assertRaises(
+ exceptions.Forbidden,
+ self.client.put_secret_payload,
+ secret_id=uuid, payload=key
+ )
+
+ def test_get_other_project_secret(self):
+ other_secret_id = self.create_other_project_secret(
+ 'get_other_secret',
+ payload='¡Muy secreto!')
+ self.assertRaises(
+ exceptions.Forbidden,
+ self.client.get_secret_metadata,
+ other_secret_id)
+
+ def test_get_other_project_secret_payload(self):
+ other_secret_id = self.create_other_project_secret(
+ 'get_other_payload',
+ payload='¡Más secreto!')
+ self.assertRaises(
+ exceptions.Forbidden,
+ self.client.get_secret_payload,
+ other_secret_id)
+
+ def test_put_other_project_secret_payload(self):
+ other_secret_id = self.create_other_project_secret('put_other_payload')
+ self.assertRaises(
+ exceptions.Forbidden,
+ self.client.put_secret_payload,
+ other_secret_id,
+ 'Shhhh... secret!')
+
+ def test_delete_other_project_secret(self):
+ other_secret_id = self.create_other_project_secret(
+ 'get_other_payload',
+ payload='loremipsumloremipsum')
+ self.assertRaises(
+ exceptions.Forbidden,
+ self.client.delete_secret,
+ other_secret_id)
+
+ def test_get_secret_acl(self):
+ self.assertRaises(
+ exceptions.Forbidden,
+ self.client.get_secret_acl,
+ self.secret_id)
+
+ def test_put_secret_acl(self):
+ self.assertRaises(
+ exceptions.Forbidden,
+ self.client.put_secret_acl,
+ self.secret_id,
+ self.valid_acl)
+
+ def test_patch_secret_acl(self):
+ self.assertRaises(
+ exceptions.Forbidden,
+ self.client.patch_secret_acl,
+ self.secret_id,
+ self.valid_acl)
+
+ def test_delete_secret_acl(self):
+ self.assertRaises(
+ exceptions.Forbidden,
+ self.client.delete_secret_acl,
+ self.secret_id)
+
+ def test_get_other_secret_acl(self):
+ self.assertRaises(
+ exceptions.Forbidden,
+ self.client.get_secret_acl,
+ self.other_secret_id)
+
+ def test_put_other_secret_acl(self):
+ self.assertRaises(
+ exceptions.Forbidden,
+ self.client.put_secret_acl,
+ self.other_secret_id,
+ self.valid_acl)
+
+ def test_patch_other_secret_acl(self):
+ self.assertRaises(
+ exceptions.Forbidden,
+ self.client.patch_secret_acl,
+ self.other_secret_id,
+ self.valid_acl)
+
+ def test_delete_other_secret_acl(self):
+ self.assertRaises(
+ exceptions.Forbidden,
+ self.client.delete_secret_acl,
+ self.other_secret_id)
+
+
+class ProjectReaderV1_1Tests(ProjectReaderBase, BarbicanV1_1SecretConsumers):
+
+ min_microversion = '1.1'
+
+ @classmethod
+ def setup_clients(cls):
+ super().setup_clients()
+ cls.secret_consumer_client = \
+ cls.os_project_reader.secret_v1_1.SecretConsumerClient()
+
+ def setUp(self):
+ super().setUp()
+ self.test_consumer = {
+ "service": "service1",
+ "resource_id": "resource_id1",
+ "resource_type": "resource_type1"
+ }
+ self.member_secret_consumer_client.add_consumer_to_secret(
+ self.secret_id,
+ **self.test_consumer
+ )
+
+ def test_list_secret_consumers(self):
+ self.assertRaises(
+ exceptions.Forbidden,
+ self.secret_consumer_client.list_consumers_in_secret,
+ self.secret_id)
+
+ def test_create_secret_consumer(self):
+ self.assertRaises(
+ exceptions.Forbidden,
+ self.secret_consumer_client.add_consumer_to_secret,
+ self.secret_id,
+ **self.test_consumer)
+
+ def test_delete_secret_consumer(self):
+ self.assertRaises(
+ exceptions.Forbidden,
+ self.secret_consumer_client.delete_consumer_from_secret,
+ self.secret_id,
+ **self.test_consumer)
+
+
+class ProjectMemberTests(ProjectReaderTests):
+
+ @classmethod
+ def setup_clients(cls):
+ super().setup_clients()
+ cls.client = cls.secret_client
+
+ def test_create_secret(self):
+ """Test add_secret policy."""
+ self.client.create_secret(name='test_create_secret')
+
+ key = rbac_base.create_aes_key()
+ expire_time = (datetime.utcnow() + timedelta(days=5))
+ self.client.create_secret(
+ name='test_create_secret2',
+ expiration=expire_time.isoformat(), algorithm="aes",
+ bit_length=256, mode="cbc", payload=key,
+ payload_content_type="application/octet-stream",
+ payload_content_encoding="base64"
+ )
+
+ def test_list_secrets(self):
+ """Test get_secrets policy."""
+ # create two secrets
+ self.create_empty_secret_admin('test_list_secrets')
+ self.create_empty_secret_admin('test_list_secrets_2')
+
+ # list secrets filtered by the name test_list_secrets
+ resp = self.client.list_secrets(name='test_list_secrets')
+ secrets = resp['secrets']
+ self.assertEqual('test_list_secrets', secrets[0]['name'])
+
+ # list secrets filtered by the name test_list_secrets_2
+ resp = self.client.list_secrets(name='test_list_secrets_2')
+ secrets = resp['secrets']
+ self.assertEqual('test_list_secrets_2', secrets[0]['name'])
+
+ # list all secrets
+ resp = self.client.list_secrets()
+ secrets = resp['secrets']
+ self.assertGreaterEqual(len(secrets), 2)
+
+ def test_delete_secret(self):
+ """Test delete_secrets policy."""
+ sec = self.create_empty_secret_admin('test_delete_secret_1')
+ uuid = self.client.ref_to_uuid(sec['secret_ref'])
+ self.client.delete_secret(uuid)
+
+ def test_get_secret(self):
+ """Test get_secret policy."""
+ sec = self.create_empty_secret_admin('test_get_secret')
+ uuid = self.client.ref_to_uuid(sec['secret_ref'])
+ resp = self.client.get_secret_metadata(uuid)
+ self.assertEqual(uuid, self.client.ref_to_uuid(resp['secret_ref']))
+
+ def test_get_secret_payload(self):
+ """Test get_secret payload policy."""
+ key, sec = self.create_aes_secret_admin('test_get_secret_payload')
+ uuid = self.client.ref_to_uuid(sec['secret_ref'])
+
+ # Retrieve the payload
+ payload = self.client.get_secret_payload(uuid)
+ self.assertEqual(key, base64.b64encode(payload))
+
+ def test_put_secret_payload(self):
+ """Test put_secret policy."""
+ sec = self.create_empty_secret_admin('test_put_secret_payload')
+ uuid = self.client.ref_to_uuid(sec['secret_ref'])
+
+ key = rbac_base.create_aes_key()
+
+ # Associate the payload with the created secret
+ self.client.put_secret_payload(uuid, key)
+
+ # Retrieve the payload
+ payload = self.client.get_secret_payload(uuid)
+ self.assertEqual(key, base64.b64encode(payload))
+
+ def test_get_secret_acl(self):
+ acl = self.client.get_secret_acl(self.secret_id)
+ self.assertIn("read", acl.keys())
+
+ def test_put_secret_acl(self):
+ self.assertRaises(
+ exceptions.Forbidden,
+ self.other_secret_client.get_secret_metadata,
+ self.secret_id
+ )
+ _ = self.client.put_secret_acl(self.secret_id, self.valid_acl)
+ acl = self.client.get_secret_acl(self.secret_id)
+ self.assertIn(self.other_secret_client.user_id, acl['read']['users'])
+ resp = self.other_secret_client.get_secret_metadata(self.secret_id)
+ self.assertIn(self.secret_id, resp['secret_ref'])
+
+ def test_patch_secret_acl(self):
+ _ = self.client.put_secret_acl(self.secret_id, self.valid_acl)
+ acl = self.client.get_secret_acl(self.secret_id)
+ self.assertIn(self.other_secret_client.user_id, acl['read']['users'])
+ clear_users_acl = {
+ 'read': {
+ 'users': []
+ }
+ }
+ _ = self.client.patch_secret_acl(self.secret_id, clear_users_acl)
+ acl = self.client.get_secret_acl(self.secret_id)
+ self.assertNotIn(self.other_secret_client.user_id,
+ acl['read']['users'])
+
+ def test_delete_secret_acl(self):
+ _ = self.client.put_secret_acl(self.secret_id, self.valid_acl)
+ acl = self.client.get_secret_acl(self.secret_id)
+ self.assertIn(self.other_secret_client.user_id, acl['read']['users'])
+
+ _ = self.client.delete_secret_acl(self.secret_id)
+
+ acl = self.client.get_secret_acl(self.secret_id)
+ self.assertNotIn('users', acl['read'].keys())
+
+
+class ProjectMemberV1_1Tests(ProjectReaderV1_1Tests):
+
+ @classmethod
+ def setup_clients(cls):
+ super().setup_clients()
+ cls.secret_consumer_client = cls.member_secret_consumer_client
+
+ def test_list_secret_consumers(self):
+ resp = self.secret_consumer_client.list_consumers_in_secret(
+ self.secret_id
+ )
+ self.assertEqual(1, resp['total'])
+
+ def test_create_secret_consumer(self):
+ second_consumer = {
+ 'service': 'service2',
+ 'resource_id': 'resource_id2',
+ 'resource_type': 'resource_type2'
+ }
+
+ resp = self.secret_consumer_client.add_consumer_to_secret(
+ self.secret_id,
+ **second_consumer)
+
+ self.assertEqual(2, len(resp['consumers']))
+
+ def test_delete_secret_consumer(self):
+ resp = self.secret_consumer_client.delete_consumer_from_secret(
+ self.secret_id,
+ **self.test_consumer)
+
+ self.assertEqual(0, len(resp['consumers']))
+
+
+class ProjectAdminTests(ProjectMemberTests):
+ @classmethod
+ def setup_clients(cls):
+ super().setup_clients()
+ cls.client = cls.admin_secret_client
+
+
+class ProjectAdminV1_1Tests(ProjectMemberV1_1Tests):
+
+ @classmethod
+ def setup_clients(cls):
+ super().setup_clients()
+ cls.secret_consumer_client = cls.admin_secret_consumer_client
+
+ def test_create_secret_consumer(self):
+ pass
+
+ def test_delete_secret_consumer(self):
+ pass
+
+ def test_list_secret_consumers(self):
+ pass
+
+
+class SystemReaderTests(rbac_base.BarbicanV1RbacBase, BarbicanV1RbacSecrets):
+
+ @classmethod
+ def setup_clients(cls):
+ super().setup_clients()
+ cls.client = cls.secret_client
+
+ def test_create_secret(self):
+ pass
+
+ def test_list_secrets(self):
+ pass
+
+ def test_delete_secret(self):
+ pass
+
+ def test_get_secret(self):
+ pass
+
+ def test_get_secret_payload(self):
+ pass
+
+ def test_put_secret_payload(self):
+ pass
+
+ def test_get_other_project_secret(self):
+ pass
+
+ def test_get_other_project_secret_payload(self):
+ pass
+
+ def test_put_other_project_secret_payload(self):
+ pass
+
+ def test_delete_other_project_secret(self):
+ pass
+
+ def test_get_secret_acl(self):
+ pass
+
+ def test_put_secret_acl(self):
+ pass
+
+ def test_patch_secret_acl(self):
+ pass
+
+ def test_delete_secret_acl(self):
+ pass
+
+ def test_get_other_secret_acl(self):
+ pass
+
+ def test_put_other_secret_acl(self):
+ pass
+
+ def test_patch_other_secret_acl(self):
+ pass
+
+ def test_delete_other_secret_acl(self):
+ pass
+
+
+class SystemMemberTests(SystemReaderTests):
+
+ @classmethod
+ def setup_clients(cls):
+ super().setup_clients()
+ cls.client = cls.secret_client
+
+
+class SystemAdminTests(SystemMemberTests):
+
+ @classmethod
+ def setup_clients(cls):
+ super().setup_clients()
+ cls.client = cls.secret_client
diff --git a/barbican_tempest_plugin/tests/rbac/v1/test_transport_keys.py b/barbican_tempest_plugin/tests/rbac/v1/test_transport_keys.py
new file mode 100644
index 0000000..1984943
--- /dev/null
+++ b/barbican_tempest_plugin/tests/rbac/v1/test_transport_keys.py
@@ -0,0 +1,121 @@
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import abc
+
+from tempest.lib import exceptions
+
+from barbican_tempest_plugin.tests.rbac.v1 import base
+
+
+class BarbicanV1RbacTransportKeys:
+
+ @abc.abstractmethod
+ def test_list_transport_keys(self):
+ """Test listing the transport keys
+
+ Testing: GET /v1/transport_keys
+ This test case must check:
+ * whether the persona can list the available transport keys
+ """
+ raise NotImplementedError
+
+ @abc.abstractmethod
+ def test_create_transport_key(self):
+ """Test creating a transport key
+
+ Testing: POST /v1/transport_keys
+ This test case must check:
+ * whether the persona can create a new transport key entry
+ """
+ raise NotImplementedError
+
+ @abc.abstractmethod
+ def test_get_transport_key(self):
+ """Test getting a specific transport key
+
+ Testing: GET /v1/transport_keys/{transport-key-id}
+ This test case must check:
+ * whether the persona can retrieve a specific transport key
+ """
+ raise NotImplementedError
+
+ @abc.abstractmethod
+ def test_delete_transport_key(self):
+ """Test deleting a specific transport key
+
+ Testing: DELETE /v1/transport_keys/{transport-key-id}
+ This test case must check:
+ * whether the persona can delete a specific transport key
+ """
+ raise NotImplementedError
+
+
+class ProjectMemberTests(base.BarbicanV1RbacBase, BarbicanV1RbacTransportKeys):
+
+ @classmethod
+ def setup_clients(cls):
+ super().setup_clients()
+ cls.client = cls.os_project_member.secret_v1.TransportKeyClient()
+
+ def test_list_transport_keys(self):
+ resp = self.do_request('list_transport_keys')
+ self.assertIn('transport_keys', resp)
+
+ def test_create_transport_key(self):
+ self.do_request('create_transport_key',
+ expected_status=exceptions.Forbidden,
+ plugin_name='simple-crypto',
+ transport_key='???')
+
+ def test_get_transport_key(self):
+ # TODO(redorobot):
+ # We need to sort out how system admins create keys before we
+ # can test this.
+ #
+ # resp = self.do_request('list_transport_keys')
+ # transport_key_id = self.ref_to_uuid(
+ # resp['transport_keys'][0]['transport_key_ref']
+ # )
+ # resp = self.do_request('get_transport_key',
+ # transport_key_id=transport_key_id)
+ # self.assertEqual(transport_key_id, resp['transport_key_id'])
+ pass
+
+ def test_delete_transport_key(self):
+ # TODO(redorobot):
+ # We need to sort out how system admins create keys before we
+ # can test this.
+ #
+ # resp = self.do_request('list_transport_keys')
+ # transport_key_id = self.ref_to_uuid(
+ # resp['transport_keys'][0]['transport_key_ref']
+ # )
+ # resp = self.do_request('delete_transport_key',
+ # expected_status=exceptions.Forbidden,
+ # transport_key_id=transport_key_id)
+ pass
+
+
+class ProjectAdminTests(ProjectMemberTests):
+
+ @classmethod
+ def setup_clients(cls):
+ super().setup_clients()
+ cls.client = cls.os_project_admin.secret_v1.TransportKeyClient()
+
+
+class ProjectReaderTests(ProjectMemberTests):
+
+ @classmethod
+ def setup_clients(cls):
+ super().setup_clients()
+ cls.client = cls.os_project_reader.secret_v1.TransportKeyClient()
diff --git a/barbican_tempest_plugin/tests/scenario/barbican_manager.py b/barbican_tempest_plugin/tests/scenario/barbican_manager.py
index a39bb0f..b6f24a9 100644
--- a/barbican_tempest_plugin/tests/scenario/barbican_manager.py
+++ b/barbican_tempest_plugin/tests/scenario/barbican_manager.py
@@ -68,8 +68,8 @@
'path in CONF.scenario.img_file config option.')
self.img_file = os.path.join(CONF.scenario.img_dir, self.img_file)
- self.private_key = rsa.generate_private_key(public_exponent=3,
- key_size=1024,
+ self.private_key = rsa.generate_private_key(public_exponent=65537,
+ key_size=3072,
backend=default_backend())
self.signing_certificate = self._create_self_signed_certificate(
self.private_key,
@@ -106,13 +106,13 @@
)
cls.order_client = os.secret_v1.OrderClient(service='key-manager')
cls.secret_client = os.secret_v1.SecretClient(service='key-manager')
+ cls.secret_consumer_client = os.secret_v1_1.SecretConsumerClient()
 cls.secret_metadata_client = os.secret_v1.SecretMetadataClient(
 service='key-manager'
 )
+ cls.version_client = os.secret_v1_1.VersionClient()
if CONF.compute_feature_enabled.attach_encrypted_volume:
- cls.admin_volume_types_client =\
- os_adm.volume_types_v2_client
cls.admin_encryption_types_client =\
os_adm.encryption_types_v2_client
diff --git a/barbican_tempest_plugin/tests/scenario/manager.py b/barbican_tempest_plugin/tests/scenario/manager.py
index 8daf090..a4254c6 100644
--- a/barbican_tempest_plugin/tests/scenario/manager.py
+++ b/barbican_tempest_plugin/tests/scenario/manager.py
@@ -16,12 +16,9 @@
from oslo_log import log
-from tempest.common import compute
from tempest.common import image as common_image
-from tempest.common.utils.linux import remote_client
from tempest.common import waiters
from tempest import config
-from tempest import exceptions
from tempest.lib.common.utils import data_utils
from tempest.lib.common.utils import test_utils
from tempest.lib import exceptions as lib_exc
@@ -37,15 +34,11 @@
class ScenarioTest(manager.NetworkScenarioTest):
"""Base class for scenario tests. Uses tempest own clients. """
- credentials = ['primary']
-
@classmethod
def setup_clients(cls):
super(ScenarioTest, cls).setup_clients()
# Clients (in alphabetical order)
cls.flavors_client = cls.os_primary.flavors_client
- cls.compute_floating_ips_client = (
- cls.os_primary.compute_floating_ips_client)
if CONF.service_available.glance:
# Check if glance v1 is available to determine which client to use.
if CONF.image_feature_enabled.api_v1:
@@ -83,186 +76,6 @@
# The create_[resource] functions only return body and discard the
# resp part which is not used in scenario tests
- def _create_port(self, network_id, client=None, namestart='port-quotatest',
- **kwargs):
- if not client:
- client = self.ports_client
- name = data_utils.rand_name(namestart)
- result = client.create_port(
- name=name,
- network_id=network_id,
- **kwargs)
- self.assertIsNotNone(result, 'Unable to allocate port')
- port = result['port']
- self.addCleanup(test_utils.call_and_ignore_notfound_exc,
- client.delete_port, port['id'])
- return port
-
- def create_keypair(self, client=None):
- if not client:
- client = self.keypairs_client
- name = data_utils.rand_name(self.__class__.__name__)
- # We don't need to create a keypair by pubkey in scenario
- body = client.create_keypair(name=name)
- self.addCleanup(client.delete_keypair, name)
- return body['keypair']
-
- def create_server(self, name=None, image_id=None, flavor=None,
- validatable=False, wait_until='ACTIVE',
- clients=None, **kwargs):
- """Wrapper utility that returns a test server.
-
- This wrapper utility calls the common create test server and
- returns a test server. The purpose of this wrapper is to minimize
- the impact on the code of the tests already using this
- function.
- """
-
- # NOTE(jlanoux): As a first step, ssh checks in the scenario
- # tests need to be run regardless of the run_validation and
- # validatable parameters and thus until the ssh validation job
- # becomes voting in CI. The test resources management and IP
- # association are taken care of in the scenario tests.
- # Therefore, the validatable parameter is set to false in all
- # those tests. In this way create_server just return a standard
- # server and the scenario tests always perform ssh checks.
-
- # Needed for the cross_tenant_traffic test:
- if clients is None:
- clients = self.os_primary
-
- if name is None:
- name = data_utils.rand_name(self.__class__.__name__ + "-server")
-
- vnic_type = CONF.network.port_vnic_type
-
- # If vnic_type is configured create port for
- # every network
- if vnic_type:
- ports = []
-
- create_port_body = {'binding:vnic_type': vnic_type,
- 'namestart': 'port-smoke'}
- if kwargs:
- # Convert security group names to security group ids
- # to pass to create_port
- if 'security_groups' in kwargs:
- security_groups = \
- clients.security_groups_client.list_security_groups(
- ).get('security_groups')
- sec_dict = dict([(s['name'], s['id'])
- for s in security_groups])
-
- sec_groups_names = [s['name'] for s in kwargs.pop(
- 'security_groups')]
- security_groups_ids = [sec_dict[s]
- for s in sec_groups_names]
-
- if security_groups_ids:
- create_port_body[
- 'security_groups'] = security_groups_ids
- networks = kwargs.pop('networks', [])
- else:
- networks = []
-
- # If there are no networks passed to us we look up
- # for the project's private networks and create a port.
- # The same behaviour as we would expect when passing
- # the call to the clients with no networks
- if not networks:
- networks = clients.networks_client.list_networks(
- **{'router:external': False, 'fields': 'id'})['networks']
-
- # It's net['uuid'] if networks come from kwargs
- # and net['id'] if they come from
- # clients.networks_client.list_networks
- for net in networks:
- net_id = net.get('uuid', net.get('id'))
- if 'port' not in net:
- port = self._create_port(network_id=net_id,
- client=clients.ports_client,
- **create_port_body)
- ports.append({'port': port['id']})
- else:
- ports.append({'port': net['port']})
- if ports:
- kwargs['networks'] = ports
- self.ports = ports
-
- tenant_network = self.get_tenant_network()
-
- body, servers = compute.create_test_server(
- clients,
- tenant_network=tenant_network,
- wait_until=wait_until,
- name=name, flavor=flavor,
- image_id=image_id, **kwargs)
-
- self.addCleanup(waiters.wait_for_server_termination,
- clients.servers_client, body['id'])
- self.addCleanup(test_utils.call_and_ignore_notfound_exc,
- clients.servers_client.delete_server, body['id'])
- server = clients.servers_client.show_server(body['id'])['server']
- return server
-
- def create_volume(self, size=None, name=None, snapshot_id=None,
- imageRef=None, volume_type=None):
- if size is None:
- size = CONF.volume.volume_size
- if imageRef:
- image = self.compute_images_client.show_image(imageRef)['image']
- min_disk = image.get('minDisk')
- size = max(size, min_disk)
- if name is None:
- name = data_utils.rand_name(self.__class__.__name__ + "-volume")
- kwargs = {'display_name': name,
- 'snapshot_id': snapshot_id,
- 'imageRef': imageRef,
- 'volume_type': volume_type,
- 'size': size}
- if CONF.compute.compute_volume_common_az:
- kwargs.setdefault('availability_zone',
- CONF.compute.compute_volume_common_az)
- volume = self.volumes_client.create_volume(**kwargs)['volume']
-
- self.addCleanup(self.volumes_client.wait_for_resource_deletion,
- volume['id'])
- self.addCleanup(test_utils.call_and_ignore_notfound_exc,
- self.volumes_client.delete_volume, volume['id'])
-
- # NOTE(e0ne): Cinder API v2 uses name instead of display_name
- if 'display_name' in volume:
- self.assertEqual(name, volume['display_name'])
- else:
- self.assertEqual(name, volume['name'])
- waiters.wait_for_volume_resource_status(self.volumes_client,
- volume['id'], 'available')
- # The volume retrieved on creation has a non-up-to-date status.
- # Retrieval after it becomes active ensures correct details.
- volume = self.volumes_client.show_volume(volume['id'])['volume']
- return volume
-
- def create_volume_type(self, client=None, name=None, backend_name=None):
- if not client:
- client = self.admin_volume_types_client
- if not name:
- class_name = self.__class__.__name__
- name = data_utils.rand_name(class_name + '-volume-type')
- randomized_name = data_utils.rand_name('scenario-type-' + name)
-
- LOG.debug("Creating a volume type: %s on backend %s",
- randomized_name, backend_name)
- extra_specs = {}
- if backend_name:
- extra_specs = {"volume_backend_name": backend_name}
-
- body = client.create_volume_type(name=randomized_name,
- extra_specs=extra_specs)
- volume_type = body['volume_type']
- self.assertIn('id', volume_type)
- self.addCleanup(client.delete_volume_type, volume_type['id'])
- return volume_type
-
def _image_create(self, name, fmt, path,
disk_format=None, properties=None):
if properties is None:
@@ -295,7 +108,7 @@
available_stores = []
try:
available_stores = self.image_client.info_stores()['stores']
- except exceptions.NotFound:
+ except lib_exc.NotFound:
pass
available_import_methods = self.image_client.info_import()[
'import-methods']['value']
@@ -313,38 +126,6 @@
return image['id']
- def rebuild_server(self, server_id, image=None,
- preserve_ephemeral=False, wait=True,
- rebuild_kwargs=None):
- if image is None:
- image = CONF.compute.image_ref
-
- rebuild_kwargs = rebuild_kwargs or {}
-
- LOG.debug("Rebuilding server (id: %s, image: %s, preserve eph: %s)",
- server_id, image, preserve_ephemeral)
- self.servers_client.rebuild_server(
- server_id=server_id, image_ref=image,
- preserve_ephemeral=preserve_ephemeral,
- **rebuild_kwargs)
- if wait:
- waiters.wait_for_server_status(self.servers_client,
- server_id, 'ACTIVE')
-
- def create_floating_ip(self, thing, pool_name=None):
- """Create a floating IP and associates to a server on Nova"""
-
- if not pool_name:
- pool_name = CONF.network.floating_network_name
- floating_ip = (self.compute_floating_ips_client.
- create_floating_ip(pool=pool_name)['floating_ip'])
- self.addCleanup(test_utils.call_and_ignore_notfound_exc,
- self.compute_floating_ips_client.delete_floating_ip,
- floating_ip['id'])
- self.compute_floating_ips_client.associate_floating_ip_to_server(
- floating_ip['ip'], thing['id'])
- return floating_ip
-
def nova_volume_attach(self, server, volume_to_attach):
volume = self.servers_client.attach_volume(
server['id'], volumeId=volume_to_attach['id'], device='/dev/%s'
@@ -365,105 +146,6 @@
volume = self.volumes_client.show_volume(volume['id'])['volume']
self.assertEqual('available', volume['status'])
- def create_timestamp(self, ip_address, dev_name=None, mount_path='/mnt',
- private_key=None):
- ssh_client = self.get_remote_client(ip_address,
- private_key=private_key)
- if dev_name is not None:
- ssh_client.make_fs(dev_name)
- ssh_client.exec_command('sudo mount /dev/%s %s' % (dev_name,
- mount_path))
- cmd_timestamp = 'sudo sh -c "date > %s/timestamp; sync"' % mount_path
- ssh_client.exec_command(cmd_timestamp)
- timestamp = ssh_client.exec_command('sudo cat %s/timestamp'
- % mount_path)
- if dev_name is not None:
- ssh_client.exec_command('sudo umount %s' % mount_path)
- return timestamp
-
- def get_timestamp(self, ip_address, dev_name=None, mount_path='/mnt',
- private_key=None):
- ssh_client = self.get_remote_client(ip_address,
- private_key=private_key)
- if dev_name is not None:
- ssh_client.mount(dev_name, mount_path)
- timestamp = ssh_client.exec_command('sudo cat %s/timestamp'
- % mount_path)
- if dev_name is not None:
- ssh_client.exec_command('sudo umount %s' % mount_path)
- return timestamp
-
- def get_server_ip(self, server):
- """Get the server fixed or floating IP.
-
- Based on the configuration we're in, return a correct ip
- address for validating that a guest is up.
- """
- if CONF.validation.connect_method == 'floating':
- # The tests calling this method don't have a floating IP
- # and can't make use of the validation resources. So the
- # method is creating the floating IP there.
- return self.create_floating_ip(server)['ip']
- elif CONF.validation.connect_method == 'fixed':
- # Determine the network name to look for based on config or creds
- # provider network resources.
- if CONF.validation.network_for_ssh:
- addresses = server['addresses'][
- CONF.validation.network_for_ssh]
- else:
- creds_provider = self._get_credentials_provider()
- net_creds = creds_provider.get_primary_creds()
- network = getattr(net_creds, 'network', None)
- addresses = (server['addresses'][network['name']]
- if network else [])
- for address in addresses:
- ip_version_for_ssh = CONF.validation.ip_version_for_ssh
- if (address['version'] == ip_version_for_ssh and
- address['OS-EXT-IPS:type'] == 'fixed'):
- return address['addr']
- raise exceptions.ServerUnreachable(server_id=server['id'])
- else:
- raise lib_exc.InvalidConfiguration()
-
- def get_remote_client(self, ip_address, username=None, private_key=None):
- """Get a SSH client to a remote server
-
- @param ip_address the server floating or fixed IP address to use
- for ssh validation
- @param username name of the Linux account on the remote server
- @param private_key the SSH private key to use
- @return a RemoteClient object
- """
-
- if username is None:
- username = CONF.validation.image_ssh_user
- # Set this with 'keypair' or others to log in with keypair or
- # username/password.
- if CONF.validation.auth_method == 'keypair':
- password = None
- if private_key is None:
- private_key = self.keypair['private_key']
- else:
- password = CONF.validation.image_ssh_password
- private_key = None
- linux_client = remote_client.RemoteClient(ip_address, username,
- pkey=private_key,
- password=password)
- try:
- linux_client.validate_authentication()
- except Exception as e:
- message = ('Initializing SSH connection to %(ip)s failed. '
- 'Error: %(error)s' % {'ip': ip_address,
- 'error': e})
- caller = test_utils.find_test_caller()
- if caller:
- message = '(%s) %s' % (caller, message)
- LOG.exception(message)
- self._log_console_output()
- raise
-
- return linux_client
-
def _default_security_group(self, client=None, tenant_id=None):
"""Get default secgroup for given tenant_id.
diff --git a/barbican_tempest_plugin/tests/scenario/test_certificate_validation.py b/barbican_tempest_plugin/tests/scenario/test_certificate_validation.py
index 5a6b5b7..e64952c 100644
--- a/barbican_tempest_plugin/tests/scenario/test_certificate_validation.py
+++ b/barbican_tempest_plugin/tests/scenario/test_certificate_validation.py
@@ -46,6 +46,11 @@
cls.max_microversion,
CONF.compute.min_microversion,
CONF.compute.max_microversion)
+ if not CONF.auth.create_isolated_networks:
+ # FIXME(redorobt): remove this skip when system-scope admin
+ # issue is fixed.
+ raise cls.skipException(
+ 'Certificate Validation tests require isolated networks')
@decorators.idempotent_id('b41bc663-5662-4b1e-b8f1-27b2876f16a6')
@utils.services('compute', 'image')
@@ -168,5 +173,5 @@
}
self.rebuild_server(instance['id'],
img_uuid_rebuild,
- rebuild_kwargs=rebuild_kwargs)
+ **rebuild_kwargs)
self.servers_client.delete_server(instance['id'])
diff --git a/barbican_tempest_plugin/tests/scenario/test_ephemeral_disk_encryption.py b/barbican_tempest_plugin/tests/scenario/test_ephemeral_disk_encryption.py
index ee1bda5..0b81649 100644
--- a/barbican_tempest_plugin/tests/scenario/test_ephemeral_disk_encryption.py
+++ b/barbican_tempest_plugin/tests/scenario/test_ephemeral_disk_encryption.py
@@ -42,6 +42,11 @@
if not CONF.ephemeral_storage_encryption.enabled:
raise cls.skipException(
'Ephemeral storage encryption is not supported')
+ if not CONF.auth.create_isolated_networks:
+ # FIXME(redrobot): remove this skip when system-scope admin
+ # issue is fixed.
+ raise cls.skipException(
+ 'Ephemeral storage encryption requires isolated networks')
@classmethod
def resource_setup(cls):
@@ -68,7 +73,8 @@
instance_ip = self.get_server_ip(instance)
ssh_client = self.get_remote_client(
instance_ip,
- private_key=keypair['private_key'])
+ private_key=keypair['private_key'],
+ server=instance)
ssh_client.exec_command('echo "%s" > %s' % (test_string,
client_test_path))
test_output = ssh_client.exec_command('cat %s' % client_test_path)
diff --git a/barbican_tempest_plugin/tests/scenario/test_volume_encryption.py b/barbican_tempest_plugin/tests/scenario/test_volume_encryption.py
index 445ca14..56ed64b 100644
--- a/barbican_tempest_plugin/tests/scenario/test_volume_encryption.py
+++ b/barbican_tempest_plugin/tests/scenario/test_volume_encryption.py
@@ -49,6 +49,11 @@
super(VolumeEncryptionTest, cls).skip_checks()
if not CONF.compute_feature_enabled.attach_encrypted_volume:
raise cls.skipException('Encrypted volume attach is not supported')
+ if not CONF.auth.create_isolated_networks:
+ # FIXME(redrobot): remove this skip when system-scope admin
+ # issue is fixed.
+ raise cls.skipException(
+ 'Volume encryption requires isolated networks')
@classmethod
def resource_setup(cls):
@@ -76,12 +81,14 @@
timestamp = self.create_timestamp(
server_ip,
dev_name=CONF.compute.volume_device_name,
- private_key=keypair['private_key']
+ private_key=keypair['private_key'],
+ server=server
)
timestamp2 = self.get_timestamp(
server_ip,
dev_name=CONF.compute.volume_device_name,
- private_key=keypair['private_key']
+ private_key=keypair['private_key'],
+ server=server
)
self.assertEqual(timestamp, timestamp2)
@@ -131,7 +138,6 @@
self.check_tenant_network_connectivity(
server, CONF.validation.image_ssh_user, keypair['private_key'])
- volume = self.create_encrypted_volume('nova.volume.encryptors.'
- 'cryptsetup.CryptsetupEncryptor',
+ volume = self.create_encrypted_volume('plain',
volume_type='cryptsetup')
self.attach_detach_volume(server, volume, keypair)
diff --git a/requirements.txt b/requirements.txt
index 67ae150..f2e2e43 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -4,7 +4,7 @@
pbr!=2.1.0,>=2.0.0 # Apache-2.0
-tempest>=17.1.0 # Apache-2.0
+tempest>=27.0.0 # Apache-2.0
cryptography>=2.1 # BSD/Apache-2.0
oslo.config>=5.2.0 # Apache-2.0
diff --git a/setup.cfg b/setup.cfg
index 97da488..a8d2496 100644
--- a/setup.cfg
+++ b/setup.cfg
@@ -17,6 +17,7 @@
Programming Language :: Python :: 3.6
Programming Language :: Python :: 3.7
Programming Language :: Python :: 3.8
+ Programming Language :: Python :: 3.9
[files]
packages =
diff --git a/tox.ini b/tox.ini
index 921fee1..c1baf1e 100644
--- a/tox.ini
+++ b/tox.ini
@@ -1,13 +1,13 @@
[tox]
minversion = 3.1.1
-envlist = py38,pypy,pep8
+envlist = py39,pypy,pep8
skipsdist = True
ignore_basepython_conflict = True
[testenv]
basepython = python3
usedevelop = True
-install_command = pip install -c{env:UPPER_CONSTRAINTS_FILE:https://releases.openstack.org/constraints/upper/master} {opts} {packages}
+install_command = pip install -c{env:TOX_CONSTRAINTS_FILE:https://releases.openstack.org/constraints/upper/master} {opts} {packages}
setenv =
VIRTUAL_ENV={envdir}
PYTHONWARNINGS=default::DeprecationWarning
@@ -25,10 +25,8 @@
commands = oslo_debug_helper {posargs}
[flake8]
-# E123, E125 skipped as they are invalid PEP-8.
# W504 line break after binary operator
-
show-source = True
-ignore = E123,E125,W504
+ignore = W504
builtins = _
exclude=.venv,.git,.tox,dist,doc,*lib/python*,*egg,build