blob: e8f3f8b8501328ffc36cc4b2d22a9976ad74942b [file] [log] [blame]
# Copyright 2012 OpenStack Foundation
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import time
from tempest.common import custom_matchers
from tempest import config
from tempest.lib.common.utils import data_utils
from tempest.lib.common.utils import test_utils
from tempest.lib import exceptions as lib_exc
import tempest.test
CONF = config.CONF
def delete_containers(containers, container_client, object_client):
"""Remove containers and all objects in them.
The containers should be visible from the container_client given.
Will not throw any error if the containers don't exist.
Will not check that object and container deletions succeed.
After delete all the objects from a container, it will wait 2
seconds before delete the container itself, in order to deployments
using HA proxy sync the deletion properly, otherwise, the container
might fail to be deleted because it's not empty.
:param containers: List of containers(or string of a container)
to be deleted
:param container_client: Client to be used to delete containers
:param object_client: Client to be used to delete objects
"""
if isinstance(containers, str):
containers = [containers]
for cont in containers:
try:
params = {'limit': 9999, 'format': 'json'}
_, objlist = container_client.list_container_objects(cont, params)
# delete every object in the container
for obj in objlist:
test_utils.call_and_ignore_notfound_exc(
object_client.delete_object, cont, obj['name'])
# sleep 2 seconds to sync the deletion of the objects
# in HA deployment
time.sleep(2)
container_client.delete_container(cont)
except lib_exc.NotFound:
pass
class BaseObjectTest(tempest.test.BaseTestCase):
credentials = [['operator', CONF.object_storage.operator_role]]
@classmethod
def skip_checks(cls):
super(BaseObjectTest, cls).skip_checks()
if not CONF.service_available.swift:
skip_msg = ("%s skipped as swift is not available" % cls.__name__)
raise cls.skipException(skip_msg)
@classmethod
def setup_credentials(cls):
cls.set_network_resources()
super(BaseObjectTest, cls).setup_credentials()
@classmethod
def setup_clients(cls):
super(BaseObjectTest, cls).setup_clients()
cls.object_client = cls.os_roles_operator.object_client
cls.bulk_client = cls.os_roles_operator.bulk_client
cls.container_client = cls.os_roles_operator.container_client
cls.account_client = cls.os_roles_operator.account_client
cls.capabilities_client = cls.os_roles_operator.capabilities_client
@classmethod
def resource_setup(cls):
super(BaseObjectTest, cls).resource_setup()
# Make sure we get fresh auth data after assigning swift role
cls.object_client.auth_provider.clear_auth()
cls.container_client.auth_provider.clear_auth()
cls.account_client.auth_provider.clear_auth()
# make sure that discoverability is enabled and that the sections
# have not been disallowed by Swift
cls.policies = None
if CONF.object_storage_feature_enabled.discoverability:
body = cls.capabilities_client.list_capabilities()
if 'swift' in body and 'policies' in body['swift']:
cls.policies = body['swift']['policies']
cls.containers = []
@classmethod
def create_container(cls):
# wrapper that returns a test container
container_name = data_utils.rand_name(name='TestContainer')
cls.container_client.update_container(container_name)
cls.containers.append(container_name)
return container_name
@classmethod
def create_object(cls, container_name, object_name=None,
data=None, metadata=None):
# wrapper that returns a test object
if object_name is None:
object_name = data_utils.rand_name(name='TestObject')
if data is None:
data = data_utils.random_bytes()
cls.object_client.create_object(container_name,
object_name,
data,
metadata=metadata)
return object_name, data
@classmethod
def delete_containers(cls, container_client=None, object_client=None):
if container_client is None:
container_client = cls.container_client
if object_client is None:
object_client = cls.object_client
delete_containers(cls.containers, container_client, object_client)
def assertHeaders(self, resp, target, method):
"""Check the existence and the format of response headers"""
self.assertThat(resp, custom_matchers.ExistsAllResponseHeaders(
target, method, self.policies))
self.assertThat(resp, custom_matchers.AreAllWellFormatted())