Merge "Improve tempest auth tests"
diff --git a/tempest/tests/fake_identity.py b/tempest/tests/fake_identity.py
index ea2bd44..058c9c2 100644
--- a/tempest/tests/fake_identity.py
+++ b/tempest/tests/fake_identity.py
@@ -25,16 +25,16 @@
COMPUTE_ENDPOINTS_V2 = {
"endpoints": [
{
- "adminURL": "http://fake_url/api/admin",
+ "adminURL": "http://fake_url/v2/first_endpoint/admin",
"region": "NoMatchRegion",
- "internalURL": "http://fake_url/api/internal",
- "publicURL": "http://fake_url/api/public"
+ "internalURL": "http://fake_url/v2/first_endpoint/internal",
+ "publicURL": "http://fake_url/v2/first_endpoint/public"
},
{
- "adminURL": "http://fake_url/api/admin",
+ "adminURL": "http://fake_url/v2/second_endpoint/admin",
"region": "FakeRegion",
- "internalURL": "http://fake_url/api/internal",
- "publicURL": "http://fake_url/api/public"
+ "internalURL": "http://fake_url/v2/second_endpoint/internal",
+ "publicURL": "http://fake_url/v2/second_endpoint/public"
},
],
"type": "compute",
@@ -79,17 +79,24 @@
COMPUTE_ENDPOINTS_V3 = {
"endpoints": [
{
- "id": "fake_service",
+ "id": "first_compute_fake_service",
"interface": "public",
"region": "NoMatchRegion",
- "url": "http://fake_url/v3"
+ "url": "http://fake_url/v3/first_endpoint/api"
},
{
- "id": "another_fake_service",
+ "id": "second_fake_service",
"interface": "public",
"region": "FakeRegion",
- "url": "http://fake_url/v3"
+ "url": "http://fake_url/v3/second_endpoint/api"
+ },
+ {
+ "id": "third_fake_service",
+ "interface": "admin",
+ "region": "MiddleEarthRegion",
+ "url": "http://fake_url/v3/third_endpoint/api"
}
+
],
"type": "compute",
"id": "fake_compute_endpoint"
diff --git a/tempest/tests/test_auth.py b/tempest/tests/test_auth.py
index 5346052..df04d65 100644
--- a/tempest/tests/test_auth.py
+++ b/tempest/tests/test_auth.py
@@ -100,6 +100,7 @@
class TestKeystoneV2AuthProvider(BaseAuthTestsSetUp):
+ _endpoints = fake_identity.IDENTITY_V2_RESPONSE['access']['serviceCatalog']
_auth_provider_class = auth.KeystoneV2AuthProvider
def setUp(self):
@@ -111,41 +112,71 @@
def _get_fake_alt_identity(self):
return fake_identity.ALT_IDENTITY_V2_RESPONSE['access']
- def _get_result_url_from_fake_identity(self):
- return fake_identity.COMPUTE_ENDPOINTS_V2['endpoints'][1]['publicURL']
+ def _get_result_url_from_endpoint(self, ep, endpoint_type='publicURL',
+ replacement=None):
+ if replacement:
+ return ep[endpoint_type].replace('v2', replacement)
+ return ep[endpoint_type]
def _get_token_from_fake_identity(self):
return fake_identity.TOKEN
- def _test_request_helper(self):
+ def _test_request_helper(self, filters, expected):
+ url, headers, body = self.auth_provider.auth_request('GET',
+ self.target_url,
+ filters=filters)
+
+ self.assertEqual(expected['url'], url)
+ self.assertEqual(expected['token'], headers['X-Auth-Token'])
+ self.assertEqual(expected['body'], body)
+
+ def test_request(self):
+ filters = {
+ 'service': 'compute',
+ 'endpoint_type': 'publicURL',
+ 'region': 'FakeRegion'
+ }
+
+ url = self._get_result_url_from_endpoint(
+ self._endpoints[0]['endpoints'][1]) + '/' + self.target_url
+
+ expected = {
+ 'body': None,
+ 'url': url,
+ 'token': self._get_token_from_fake_identity(),
+ }
+ self._test_request_helper(filters, expected)
+
+ def test_request_with_alt_auth_cleans_alt(self):
+ self.auth_provider.set_alt_auth_data(
+ 'body',
+ (fake_identity.ALT_TOKEN, self._get_fake_alt_identity()))
+ self.test_request()
+ # Assert alt auth data is clear after it
+ self.assertIsNone(self.auth_provider.alt_part)
+ self.assertIsNone(self.auth_provider.alt_auth_data)
+
+ def test_request_with_alt_part_without_alt_data(self):
+ """
+ Assert that when alt_part is defined, the corresponding original
+ request element is kept the same.
+ """
filters = {
'service': 'compute',
'endpoint_type': 'publicURL',
'region': 'fakeRegion'
}
+ self.auth_provider.set_alt_auth_data('url', None)
url, headers, body = self.auth_provider.auth_request('GET',
self.target_url,
filters=filters)
- result_url = self._get_result_url_from_fake_identity()
- self.assertEqual(url, result_url + '/' + self.target_url)
+ self.assertEqual(url, self.target_url)
self.assertEqual(self._get_token_from_fake_identity(),
headers['X-Auth-Token'])
self.assertEqual(body, None)
- def test_request(self):
- self._test_request_helper()
-
- def test_request_with_alt_auth(self):
- self.auth_provider.set_alt_auth_data(
- 'body',
- (fake_identity.ALT_TOKEN, self._get_fake_alt_identity()))
- self._test_request_helper()
- # Assert alt auth data is clear after it
- self.assertIsNone(self.auth_provider.alt_part)
- self.assertIsNone(self.auth_provider.alt_auth_data)
-
def test_request_with_bad_service(self):
filters = {
'service': 'BAD_SERVICE',
@@ -154,7 +185,7 @@
}
self.assertRaises(exceptions.EndpointNotFound,
self.auth_provider.auth_request, 'GET',
- 'http://fakeurl.com/fake_api', filters=filters)
+ self.target_url, filters=filters)
def test_request_without_service(self):
filters = {
@@ -164,7 +195,7 @@
}
self.assertRaises(exceptions.EndpointNotFound,
self.auth_provider.auth_request, 'GET',
- 'http://fakeurl.com/fake_api', filters=filters)
+ self.target_url, filters=filters)
def test_check_credentials_missing_attribute(self):
for attr in ['username', 'password']:
@@ -183,8 +214,86 @@
del cred['tenant_name']
self.assertFalse(self.auth_provider.check_credentials(cred))
+ def _test_base_url_helper(self, expected_url, filters,
+ auth_data=None):
+
+ url = self.auth_provider.base_url(filters, auth_data)
+ self.assertEqual(url, expected_url)
+
+ def test_base_url(self):
+ self.filters = {
+ 'service': 'compute',
+ 'endpoint_type': 'publicURL',
+ 'region': 'FakeRegion'
+ }
+ expected = self._get_result_url_from_endpoint(
+ self._endpoints[0]['endpoints'][1])
+ self._test_base_url_helper(expected, self.filters)
+
+ def test_base_url_to_get_admin_endpoint(self):
+ self.filters = {
+ 'service': 'compute',
+ 'endpoint_type': 'adminURL',
+ 'region': 'FakeRegion'
+ }
+ expected = self._get_result_url_from_endpoint(
+ self._endpoints[0]['endpoints'][1], endpoint_type='adminURL')
+ self._test_base_url_helper(expected, self.filters)
+
+ def test_base_url_unknown_region(self):
+ """
+ Assure that if the region is unknow the first endpoint is returned.
+ """
+ self.filters = {
+ 'service': 'compute',
+ 'endpoint_type': 'publicURL',
+ 'region': 'AintNoBodyKnowThisRegion'
+ }
+ expected = self._get_result_url_from_endpoint(
+ self._endpoints[0]['endpoints'][0])
+ self._test_base_url_helper(expected, self.filters)
+
+ def test_base_url_with_non_existent_service(self):
+ self.filters = {
+ 'service': 'BAD_SERVICE',
+ 'endpoint_type': 'publicURL',
+ 'region': 'FakeRegion'
+ }
+ self.assertRaises(exceptions.EndpointNotFound,
+ self._test_base_url_helper, None, self.filters)
+
+ def test_base_url_without_service(self):
+ self.filters = {
+ 'endpoint_type': 'publicURL',
+ 'region': 'FakeRegion'
+ }
+ self.assertRaises(exceptions.EndpointNotFound,
+ self._test_base_url_helper, None, self.filters)
+
+ def test_base_url_with_api_version_filter(self):
+ self.filters = {
+ 'service': 'compute',
+ 'endpoint_type': 'publicURL',
+ 'region': 'FakeRegion',
+ 'api_version': 'v12'
+ }
+ expected = self._get_result_url_from_endpoint(
+ self._endpoints[0]['endpoints'][1], replacement='v12')
+ self._test_base_url_helper(expected, self.filters)
+
+ def test_base_url_with_skip_path_filter(self):
+ self.filters = {
+ 'service': 'compute',
+ 'endpoint_type': 'publicURL',
+ 'region': 'FakeRegion',
+ 'skip_path': True
+ }
+ expected = 'http://fake_url/'
+ self._test_base_url_helper(expected, self.filters)
+
class TestKeystoneV3AuthProvider(TestKeystoneV2AuthProvider):
+ _endpoints = fake_identity.IDENTITY_V3_RESPONSE['token']['catalog']
_auth_provider_class = auth.KeystoneV3AuthProvider
credentials = {
'username': 'fake_user',
@@ -201,10 +310,23 @@
def _get_fake_alt_identity(self):
return fake_identity.ALT_IDENTITY_V3['token']
- def _get_result_url_from_fake_identity(self):
- return fake_identity.COMPUTE_ENDPOINTS_V3['endpoints'][1]['url']
+ def _get_result_url_from_endpoint(self, ep, replacement=None):
+ if replacement:
+ return ep['url'].replace('v3', replacement)
+ return ep['url']
def test_check_credentials_missing_tenant_name(self):
cred = copy.copy(self.credentials)
del cred['domain_name']
self.assertFalse(self.auth_provider.check_credentials(cred))
+
+ # Overwrites v2 test
+ def test_base_url_to_get_admin_endpoint(self):
+ self.filters = {
+ 'service': 'compute',
+ 'endpoint_type': 'admin',
+ 'region': 'MiddleEarthRegion'
+ }
+ expected = self._get_result_url_from_endpoint(
+ self._endpoints[0]['endpoints'][2])
+ self._test_base_url_helper(expected, self.filters)