[Tempest] Refactor api/tests/admin/test_share_servers module
Doing following things:
- Use ddt module removing code duplication.
- Use v2 APIs instead of deprecated v1.
- Use correct 'share_server_id' for
'test_show_share_server_details' test to avoid concurrency issues.
For more details, see closed b_u_g.
Change-Id: If4d6029f2250e80c1eec0debb1b09a805d997028
Closes-Bug: #1663865
diff --git a/manila_tempest_tests/tests/api/admin/test_share_servers.py b/manila_tempest_tests/tests/api/admin/test_share_servers.py
index 25cc406..ad5ae47 100644
--- a/manila_tempest_tests/tests/api/admin/test_share_servers.py
+++ b/manila_tempest_tests/tests/api/admin/test_share_servers.py
@@ -15,9 +15,11 @@
import re
+import ddt
import six
from tempest import config
from tempest.lib import exceptions as lib_exc
+import testtools
from testtools import testcase as tc
from manila_tempest_tests.tests.api import base
@@ -25,21 +27,21 @@
CONF = config.CONF
+@testtools.skipUnless(
+ CONF.share.multitenancy_enabled,
+ 'Share servers can be tested only with multitenant drivers.')
+@ddt.ddt
class ShareServersAdminTest(base.BaseSharesAdminTest):
@classmethod
def resource_setup(cls):
super(ShareServersAdminTest, cls).resource_setup()
- if not CONF.share.multitenancy_enabled:
- msg = ("Share servers can be tested only with multitenant drivers."
- " Skipping.")
- raise cls.skipException(msg)
cls.share = cls.create_share()
- cls.share_network = cls.shares_client.get_share_network(
- cls.shares_client.share_network_id)
+ cls.share_network = cls.shares_v2_client.get_share_network(
+ cls.shares_v2_client.share_network_id)
if not cls.share_network["name"]:
sn_id = cls.share_network["id"]
- cls.share_network = cls.shares_client.update_share_network(
+ cls.share_network = cls.shares_v2_client.update_share_network(
sn_id, name="sn_%s" % sn_id)
cls.sn_name_and_id = [
cls.share_network["name"],
@@ -52,7 +54,7 @@
@tc.attr(base.TAG_POSITIVE, base.TAG_API_WITH_BACKEND)
def test_list_share_servers_without_filters(self):
- servers = self.shares_client.list_share_servers()
+ servers = self.shares_v2_client.list_share_servers()
self.assertGreater(len(servers), 0)
keys = [
"id",
@@ -82,61 +84,35 @@
any(s["share_network_name"] in self.sn_name_and_id for s in servers)
@tc.attr(base.TAG_POSITIVE, base.TAG_API_WITH_BACKEND)
- def test_list_share_servers_with_host_filter(self):
+ @ddt.data('host', 'status')
+ def test_list_share_servers_with_filters(self, filter_key):
# Get list of share servers and remember 'host' name
- servers = self.shares_client.list_share_servers()
+ servers = self.shares_v2_client.list_share_servers()
# Remember name of server that was used by this test suite
# to be sure it will be still existing.
- host = ""
for server in servers:
if server["share_network_name"] in self.sn_name_and_id:
- if not server["host"]:
+ if not server[filter_key]:
msg = ("Server '%s' has wrong value for host - "
- "'%s'.") % (server["id"], server["host"])
+ "'%s'.") % (server["id"], server[filter_key])
raise lib_exc.InvalidContentType(message=msg)
- host = server["host"]
+ filter_value = server[filter_key]
break
- if not host:
+ else:
msg = ("Appropriate server was not found. Its share_network_data"
": '%s'. List of servers: '%s'.") % (self.sn_name_and_id,
- str(servers))
+ six.text_type(servers))
raise lib_exc.NotFound(message=msg)
- search_opts = {"host": host}
- servers = self.shares_client.list_share_servers(search_opts)
+ search_opts = {filter_key: filter_value}
+ servers = self.shares_v2_client.list_share_servers(search_opts)
self.assertGreater(len(servers), 0)
for server in servers:
- self.assertEqual(server["host"], host)
-
- @tc.attr(base.TAG_POSITIVE, base.TAG_API_WITH_BACKEND)
- def test_list_share_servers_with_status_filter(self):
- # Get list of share servers
- servers = self.shares_client.list_share_servers()
- # Remember status of server that was used by this test suite
- # to be sure it will be still existing.
- status = ""
- for server in servers:
- if server["share_network_name"] in self.sn_name_and_id:
- if not server["status"]:
- msg = ("Server '%s' has wrong value for status - "
- "'%s'.") % (server["id"], server["host"])
- raise lib_exc.InvalidContentType(message=msg)
- status = server["status"]
- break
- if not status:
- msg = ("Appropriate server was not found. Its share_network_data"
- ": '%s'. List of servers: '%s'.") % (self.sn_name_and_id,
- str(servers))
- raise lib_exc.NotFound(message=msg)
- search_opts = {"status": status}
- servers = self.shares_client.list_share_servers(search_opts)
- self.assertGreater(len(servers), 0)
- for server in servers:
- self.assertEqual(server["status"], status)
+ self.assertEqual(server[filter_key], filter_value)
@tc.attr(base.TAG_POSITIVE, base.TAG_API_WITH_BACKEND)
def test_list_share_servers_with_project_id_filter(self):
search_opts = {"project_id": self.share_network["project_id"]}
- servers = self.shares_client.list_share_servers(search_opts)
+ servers = self.shares_v2_client.list_share_servers(search_opts)
# Should exist, at least, one share server, used by this test suite.
self.assertGreater(len(servers), 0)
for server in servers:
@@ -146,7 +122,7 @@
@tc.attr(base.TAG_POSITIVE, base.TAG_API_WITH_BACKEND)
def test_list_share_servers_with_share_network_name_filter(self):
search_opts = {"share_network": self.share_network["name"]}
- servers = self.shares_client.list_share_servers(search_opts)
+ servers = self.shares_v2_client.list_share_servers(search_opts)
# Should exist, at least, one share server, used by this test suite.
self.assertGreater(len(servers), 0)
for server in servers:
@@ -156,7 +132,7 @@
@tc.attr(base.TAG_POSITIVE, base.TAG_API_WITH_BACKEND)
def test_list_share_servers_with_share_network_id_filter(self):
search_opts = {"share_network": self.share_network["id"]}
- servers = self.shares_client.list_share_servers(search_opts)
+ servers = self.shares_v2_client.list_share_servers(search_opts)
# Should exist, at least, one share server, used by this test suite.
self.assertGreater(len(servers), 0)
for server in servers:
@@ -165,8 +141,9 @@
@tc.attr(base.TAG_POSITIVE, base.TAG_API_WITH_BACKEND)
def test_show_share_server(self):
- share = self.shares_client.get_share(self.share["id"])
- server = self.shares_client.show_share_server(share["share_server_id"])
+ share = self.shares_v2_client.get_share(self.share["id"])
+ server = self.shares_v2_client.show_share_server(
+ share["share_server_id"])
keys = [
"id",
"host",
@@ -180,35 +157,35 @@
# all expected keys are present
for key in keys:
self.assertIn(key, server.keys())
+
# 'created_at' is valid date
self.assertTrue(self.date_re.match(server["created_at"]))
+
# 'updated_at' is valid date if set
if server["updated_at"]:
self.assertTrue(self.date_re.match(server["updated_at"]))
- # Host is not empty
- self.assertGreater(len(server["host"]), 0)
- # Id is not empty
- self.assertGreater(len(server["id"]), 0)
- # Project id is not empty
- self.assertGreater(len(server["project_id"]), 0)
- # Status is not empty
- self.assertGreater(len(server["status"]), 0)
- # share_network_name is not empty
- self.assertGreater(len(server["share_network_name"]), 0)
- # backend_details should be a dict
+
+ # veriy that values for following keys are not empty
+ for k in ('host', 'id', 'project_id', 'status', 'share_network_name'):
+ self.assertGreater(len(server[k]), 0)
+
+ # 'backend_details' should be a dict
self.assertIsInstance(server["backend_details"], dict)
@tc.attr(base.TAG_POSITIVE, base.TAG_API_WITH_BACKEND)
def test_show_share_server_details(self):
- servers = self.shares_client.list_share_servers()
- details = self.shares_client.show_share_server_details(
- servers[0]["id"])
+ share = self.shares_v2_client.get_share(self.share['id'])
+ details = self.shares_v2_client.show_share_server_details(
+ share['share_server_id'])
+
# If details are present they and their values should be only strings
- for k, v in details.iteritems():
+ for k, v in details.items():
self.assertIsInstance(k, six.string_types)
self.assertIsInstance(v, six.string_types)
- def _delete_share_server(self, delete_share_network):
+ @tc.attr(base.TAG_POSITIVE, base.TAG_API_WITH_BACKEND)
+ @ddt.data(True, False)
+ def test_delete_share_server(self, delete_share_network):
# Get network and subnet from existing share_network and reuse it
# to be able to delete share_server after test ends.
# TODO(vponomaryov): attach security-services too. If any exist from
@@ -221,8 +198,8 @@
self.create_share(share_network_id=new_sn['id'])
# List share servers, filtered by share_network_id
- search_opts = {"share_network": new_sn["id"]}
- servers = self.shares_client.list_share_servers(search_opts)
+ servers = self.shares_v2_client.list_share_servers(
+ {"share_network": new_sn["id"]})
# There can be more than one share server for share network when retry
# was used and share was created successfully not from first time.
@@ -233,42 +210,36 @@
self.assertEqual(new_sn["id"], serv["share_network_id"])
# List shares by share server id
- params = {"share_server_id": serv["id"]}
- shares = self.shares_client.list_shares_with_detail(params)
+ shares = self.shares_v2_client.list_shares_with_detail(
+ {"share_server_id": serv["id"]})
for s in shares:
self.assertEqual(new_sn["id"], s["share_network_id"])
# Delete shares, so we will have share server without shares
for s in shares:
- self.shares_client.delete_share(s["id"])
+ self.shares_v2_client.delete_share(s["id"])
# Wait for shares deletion
for s in shares:
- self.shares_client.wait_for_resource_deletion(share_id=s["id"])
+ self.shares_v2_client.wait_for_resource_deletion(
+ share_id=s["id"])
# List shares by share server id, we expect empty list
- params = {"share_server_id": serv["id"]}
- empty = self.shares_client.list_shares_with_detail(params)
+ empty = self.shares_v2_client.list_shares_with_detail(
+ {"share_server_id": serv["id"]})
self.assertEqual(0, len(empty))
if delete_share_network:
# Delete share network, it should trigger share server deletion
- self.shares_client.delete_share_network(new_sn["id"])
+ self.shares_v2_client.delete_share_network(new_sn["id"])
else:
# Delete share server
- self.shares_client.delete_share_server(serv["id"])
+ self.shares_v2_client.delete_share_server(serv["id"])
# Wait for share server deletion
- self.shares_client.wait_for_resource_deletion(server_id=serv["id"])
+ self.shares_v2_client.wait_for_resource_deletion(
+ server_id=serv["id"])
if delete_share_network:
- self.shares_client.wait_for_resource_deletion(
+ self.shares_v2_client.wait_for_resource_deletion(
sn_id=new_sn["id"])
-
- @tc.attr(base.TAG_POSITIVE, base.TAG_API_WITH_BACKEND)
- def test_delete_share_server(self):
- self._delete_share_server(False)
-
- @tc.attr(base.TAG_POSITIVE, base.TAG_API_WITH_BACKEND)
- def test_delete_share_server_by_deletion_of_share_network(self):
- self._delete_share_server(True)