Add swift scenario tests
This tests swift as exercises do, trying common operations as:
- create/delete a container;
- add/delete object to a container;
- list objects in the container;
Partial-Bug: #1023131
Change-Id: Id4b27ee70f420f7536b4c698eb1f3bba15dbd1e8
diff --git a/tempest/scenario/manager.py b/tempest/scenario/manager.py
index 06841e1..c84c4fe 100644
--- a/tempest/scenario/manager.py
+++ b/tempest/scenario/manager.py
@@ -24,12 +24,14 @@
import cinderclient.client
import glanceclient
import heatclient.client
+import keystoneclient.apiclient.exceptions
import keystoneclient.v2_0.client
import netaddr
from neutronclient.common import exceptions as exc
import neutronclient.v2_0.client
import novaclient.client
from novaclient import exceptions as nova_exceptions
+import swiftclient
from tempest.api.network import common as net_common
from tempest.common import isolated_creds
@@ -75,6 +77,10 @@
self.volume_client = self._get_volume_client(username,
password,
tenant_name)
+ self.object_storage_client = self._get_object_storage_client(
+ username,
+ password,
+ tenant_name)
self.orchestration_client = self._get_orchestration_client(
username,
password,
@@ -123,6 +129,30 @@
region_name=region,
http_log_debug=True)
+ def _get_object_storage_client(self, username, password, tenant_name):
+ auth_url = self.config.identity.uri
+ # add current tenant to Member group.
+ keystone_admin = self._get_identity_client(
+ self.config.identity.admin_username,
+ self.config.identity.admin_password,
+ self.config.identity.admin_tenant_name)
+
+ # enable test user to operate swift by adding Member role to him.
+ roles = keystone_admin.roles.list()
+ member_role = [role for role in roles if role.name == 'Member'][0]
+ # NOTE(maurosr): This is surrounded in the try-except block cause
+ # neutron tests doesn't have tenant isolation.
+ try:
+ keystone_admin.roles.add_user_role(self.identity_client.user_id,
+ member_role.id,
+ self.identity_client.tenant_id)
+ except keystoneclient.apiclient.exceptions.Conflict:
+ pass
+
+ return swiftclient.Connection(auth_url, username, password,
+ tenant_name=tenant_name,
+ auth_version='2')
+
def _get_orchestration_client(self, username=None, password=None,
tenant_name=None):
if not username:
@@ -216,6 +246,7 @@
cls.identity_client = cls.manager.identity_client
cls.network_client = cls.manager.network_client
cls.volume_client = cls.manager.volume_client
+ cls.object_storage_client = cls.manager.object_storage_client
cls.orchestration_client = cls.manager.orchestration_client
cls.resource_keys = {}
cls.os_resources = []
diff --git a/tempest/scenario/test_swift_basic_ops.py b/tempest/scenario/test_swift_basic_ops.py
new file mode 100644
index 0000000..6763503
--- /dev/null
+++ b/tempest/scenario/test_swift_basic_ops.py
@@ -0,0 +1,101 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2013 IBM Corp.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+
+from tempest.common.utils.data_utils import rand_name
+from tempest.openstack.common import log as logging
+from tempest.scenario import manager
+from tempest.test import services
+
+LOG = logging.getLogger(__name__)
+
+
+class TestSwiftBasicOps(manager.OfficialClientTest):
+ """
+ Test swift with the follow operations:
+ * get swift stat.
+ * create container.
+ * upload a file to the created container.
+ * list container's objects and assure that the uploaded file is present.
+ * delete object from container.
+ * list container's objects and assure that the deleted file is gone.
+ * delete a container.
+ * list containers and assure that the deleted container is gone.
+ """
+
+ @classmethod
+ def setUpClass(cls):
+ super(TestSwiftBasicOps, cls).setUpClass()
+ if not cls.config.service_available.swift:
+ skip_msg = ("%s skipped as swift is not available" %
+ cls.__name__)
+ raise cls.skipException(skip_msg)
+
+ def _get_swift_stat(self):
+ """get swift status for our user account."""
+ self.object_storage_client.get_account()
+ LOG.debug('Swift status information obtained successfully')
+
+ def _create_container(self, container_name=None):
+ name = container_name or rand_name('swift-scenario-container')
+ self.object_storage_client.put_container(name)
+ # look for the container to assure it is created
+ self._list_and_check_container_objects(name)
+ LOG.debug('Container %s created' % (name))
+ return name
+
+ def _delete_container(self, container_name):
+ self.object_storage_client.delete_container(container_name)
+ LOG.debug('Container %s deleted' % (container_name))
+
+ def _upload_object_to_container(self, container_name, obj_name=None):
+ obj_name = obj_name or rand_name('swift-scenario-object')
+ self.object_storage_client.put_object(container_name, obj_name,
+ rand_name('obj_data'),
+ content_type='text/plain')
+ return obj_name
+
+ def _delete_object(self, container_name, filename):
+ self.object_storage_client.delete_object(container_name, filename)
+ self._list_and_check_container_objects(container_name,
+ not_present_obj=[filename])
+
+ def _list_and_check_container_objects(self, container_name, present_obj=[],
+ not_present_obj=[]):
+ """
+ List objects for a given container and assert which are present and
+ which are not.
+ """
+ meta, response = self.object_storage_client.get_container(
+ container_name)
+ # create a list with file name only
+ object_list = [obj['name'] for obj in response]
+ if present_obj:
+ for obj in present_obj:
+ self.assertIn(obj, object_list)
+ if not_present_obj:
+ for obj in not_present_obj:
+ self.assertNotIn(obj, object_list)
+
+ @services('object')
+ def test_swift_basic_ops(self):
+ self._get_swift_stat()
+ container_name = self._create_container()
+ obj_name = self._upload_object_to_container(container_name)
+ self._list_and_check_container_objects(container_name, [obj_name])
+ self._delete_object(container_name, obj_name)
+ self._delete_container(container_name)