blob: b6e15bda3f6fb60691be0ea2f47554ddf29f8061 [file] [log] [blame]
# Copyright 2014 IBM Corp.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
15
16import copy
17
18from tempest import auth
19from tempest.common import http
20from tempest import config
21from tempest import exceptions
22from tempest.openstack.common.fixture import mockpatch
23from tempest.tests import base
24from tempest.tests import fake_config
25from tempest.tests import fake_http
26from tempest.tests import fake_identity
27
28
class BaseAuthTestsSetUp(base.TestCase):
    """
    Common fixture for the auth provider test cases in this module.

    Subclasses set ``_auth_provider_class`` (and may override
    ``credentials``); ``setUp`` then stores an instance of that class,
    built from ``credentials``, in ``self.auth_provider``.
    """

    # Class instantiated by ``_auth``; left as None here, set by subclasses.
    _auth_provider_class = None

    # Credentials passed to the provider constructor in ``setUp``.
    credentials = {
        'username': 'fake_user',
        'password': 'fake_pwd',
        'tenant_name': 'fake_tenant'
    }

    def _auth(self, credentials, **params):
        """
        Instantiate ``_auth_provider_class`` with the given credentials.

        Any extra keyword arguments are forwarded to the constructor
        unchanged.
        """
        provider_class = self._auth_provider_class
        return provider_class(credentials, **params)

    def setUp(self):
        super(BaseAuthTestsSetUp, self).setUp()

        # Install the fake configuration before any provider is created.
        self.useFixture(fake_config.ConfigFixture())
        self.stubs.Set(config, 'TempestConfigPrivate',
                       fake_config.FakePrivate)

        # Replace ClosingHttp.request with the request method of a fake
        # httplib2 object created with return_type=200.
        self.fake_http = fake_http.fake_httplib2(return_type=200)
        self.stubs.Set(http.ClosingHttp, 'request', self.fake_http.request)

        self.auth_provider = self._auth(self.credentials)
50
51
class TestBaseAuthProvider(BaseAuthTestsSetUp):
    """
    Tests for auth.AuthProvider, the base class of the other providers.

    Methods that are not implemented in the base class, and the ones that
    strongly depend on them, are deliberately not exercised here.
    """
    _auth_provider_class = auth.AuthProvider

    def test_check_credentials_is_dict(self):
        """An empty dict is accepted by check_credentials."""
        is_valid = self.auth_provider.check_credentials({})
        self.assertTrue(is_valid)

    def test_check_credentials_bad_type(self):
        """A list is rejected by check_credentials."""
        is_valid = self.auth_provider.check_credentials([])
        self.assertFalse(is_valid)

    def test_instantiate_with_bad_credentials_type(self):
        """
        Assure that credentials with bad type fail with TypeError
        """
        bad_credentials = []
        self.assertRaises(TypeError, self._auth, bad_credentials)

    def test_auth_data_property(self):
        """Reading auth_data on the base provider is not implemented."""
        provider = self.auth_provider
        self.assertRaises(NotImplementedError, getattr, provider,
                          'auth_data')

    def test_auth_data_property_when_cache_exists(self):
        """With is_expired patched to False, auth_data returns the cache."""
        provider = self.auth_provider
        provider.cache = 'foo'
        not_expired = mockpatch.PatchObject(provider, 'is_expired',
                                            return_value=False)
        self.useFixture(not_expired)
        self.assertEqual('foo', provider.auth_data)

    def test_delete_auth_data_property_through_deleter(self):
        """Deleting the auth_data property resets the cache to None."""
        provider = self.auth_provider
        provider.cache = 'foo'
        del provider.auth_data
        self.assertIsNone(provider.cache)

    def test_delete_auth_data_property_through_clear_auth(self):
        """clear_auth() resets the cache to None."""
        provider = self.auth_provider
        provider.cache = 'foo'
        provider.clear_auth()
        self.assertIsNone(provider.cache)

    def test_set_and_reset_alt_auth_data(self):
        """set_alt_auth_data stores both values; reset clears them."""
        provider = self.auth_provider

        provider.set_alt_auth_data('foo', 'bar')
        self.assertEqual(provider.alt_part, 'foo')
        self.assertEqual(provider.alt_auth_data, 'bar')

        provider.reset_alt_auth_data()
        self.assertIsNone(provider.alt_part)
        self.assertIsNone(provider.alt_auth_data)
101
102
class TestKeystoneV2AuthProvider(BaseAuthTestsSetUp):
    """
    Tests for auth.KeystoneV2AuthProvider.

    ClosingHttp.request is stubbed with fake_identity._fake_v2_response and
    expected URLs are derived from the service catalog of
    fake_identity.IDENTITY_V2_RESPONSE.
    """
    # Service catalog that expected endpoint URLs are read from.
    _endpoints = fake_identity.IDENTITY_V2_RESPONSE['access']['serviceCatalog']
    _auth_provider_class = auth.KeystoneV2AuthProvider

    def setUp(self):
        super(TestKeystoneV2AuthProvider, self).setUp()
        # Replace the stub installed by the base fixture with the fake
        # v2 identity response.
        self.stubs.Set(http.ClosingHttp, 'request',
                       fake_identity._fake_v2_response)
        self.target_url = 'test_api'

    def _get_fake_alt_identity(self):
        """Return the 'access' section of the alternate v2 identity."""
        return fake_identity.ALT_IDENTITY_V2_RESPONSE['access']

    def _get_result_url_from_endpoint(self, ep, endpoint_type='publicURL',
                                      replacement=None):
        """
        Return the URL stored in endpoint ``ep`` under ``endpoint_type``.

        When ``replacement`` is truthy, every 'v2' in the URL is replaced
        with it.
        """
        if replacement:
            return ep[endpoint_type].replace('v2', replacement)
        return ep[endpoint_type]

    def _get_token_from_fake_identity(self):
        """Return the token the fake identity module exposes as TOKEN."""
        return fake_identity.TOKEN

    def _test_request_helper(self, filters, expected):
        """
        Run auth_request for a GET on target_url and check the result.

        ``expected`` is a dict with the keys 'url', 'token' and 'body'; the
        token is compared against the 'X-Auth-Token' header.
        """
        url, headers, body = self.auth_provider.auth_request(
            'GET', self.target_url, filters=filters)

        self.assertEqual(expected['url'], url)
        self.assertEqual(expected['token'], headers['X-Auth-Token'])
        self.assertEqual(expected['body'], body)

    def test_request(self):
        """An authenticated request is prefixed with the endpoint URL."""
        filters = {
            'service': 'compute',
            'endpoint_type': 'publicURL',
            'region': 'FakeRegion'
        }

        url = self._get_result_url_from_endpoint(
            self._endpoints[0]['endpoints'][1]) + '/' + self.target_url

        expected = {
            'body': None,
            'url': url,
            'token': self._get_token_from_fake_identity(),
        }
        self._test_request_helper(filters, expected)

    def test_request_with_alt_auth_cleans_alt(self):
        """Alt auth data is cleared once a request has been made."""
        self.auth_provider.set_alt_auth_data(
            'body',
            (fake_identity.ALT_TOKEN, self._get_fake_alt_identity()))
        self.test_request()
        # Assert alt auth data is clear after it
        self.assertIsNone(self.auth_provider.alt_part)
        self.assertIsNone(self.auth_provider.alt_auth_data)

    def test_request_with_alt_part_without_alt_data(self):
        """
        Assert that when alt_part is defined, the corresponding original
        request element is kept the same.
        """
        filters = {
            'service': 'compute',
            'endpoint_type': 'publicURL',
            'region': 'FakeRegion'
        }
        self.auth_provider.set_alt_auth_data('url', None)

        # The URL is the alt part and there is no alt data, so it must come
        # back exactly as it was passed in.
        expected = {
            'body': None,
            'url': self.target_url,
            'token': self._get_token_from_fake_identity(),
        }
        self._test_request_helper(filters, expected)

    def test_request_with_bad_service(self):
        """A service that is not in the catalog raises EndpointNotFound."""
        filters = {
            'service': 'BAD_SERVICE',
            'endpoint_type': 'publicURL',
            'region': 'FakeRegion'
        }
        self.assertRaises(exceptions.EndpointNotFound,
                          self.auth_provider.auth_request, 'GET',
                          self.target_url, filters=filters)

    def test_request_without_service(self):
        """A service filter of None raises EndpointNotFound."""
        filters = {
            'service': None,
            'endpoint_type': 'publicURL',
            'region': 'FakeRegion'
        }
        self.assertRaises(exceptions.EndpointNotFound,
                          self.auth_provider.auth_request, 'GET',
                          self.target_url, filters=filters)

    def test_check_credentials_missing_attribute(self):
        """Credentials lacking username or password are rejected."""
        for attr in ['username', 'password']:
            # Work on a copy so the shared class-level dict stays intact.
            cred = copy.copy(self.credentials)
            del cred[attr]
            self.assertFalse(self.auth_provider.check_credentials(cred))

    def test_check_credentials_not_scoped_missing_tenant_name(self):
        """Without scoping, credentials lacking tenant_name are accepted."""
        cred = copy.copy(self.credentials)
        del cred['tenant_name']
        self.assertTrue(self.auth_provider.check_credentials(cred,
                                                             scoped=False))

    def test_check_credentials_missing_tenant_name(self):
        """By default, credentials lacking tenant_name are rejected."""
        cred = copy.copy(self.credentials)
        del cred['tenant_name']
        self.assertFalse(self.auth_provider.check_credentials(cred))

    def _test_base_url_helper(self, expected_url, filters,
                              auth_data=None):
        """Assert that base_url(filters, auth_data) equals expected_url."""
        url = self.auth_provider.base_url(filters, auth_data)
        # Expected value first, matching _test_request_helper.
        self.assertEqual(expected_url, url)

    def test_base_url(self):
        """base_url returns the public URL of the matching endpoint."""
        self.filters = {
            'service': 'compute',
            'endpoint_type': 'publicURL',
            'region': 'FakeRegion'
        }
        expected = self._get_result_url_from_endpoint(
            self._endpoints[0]['endpoints'][1])
        self._test_base_url_helper(expected, self.filters)

    def test_base_url_to_get_admin_endpoint(self):
        """base_url honours an 'adminURL' endpoint_type filter."""
        self.filters = {
            'service': 'compute',
            'endpoint_type': 'adminURL',
            'region': 'FakeRegion'
        }
        expected = self._get_result_url_from_endpoint(
            self._endpoints[0]['endpoints'][1], endpoint_type='adminURL')
        self._test_base_url_helper(expected, self.filters)

    def test_base_url_unknown_region(self):
        """
        Assure that if the region is unknown the first endpoint is returned.
        """
        self.filters = {
            'service': 'compute',
            'endpoint_type': 'publicURL',
            'region': 'AintNoBodyKnowThisRegion'
        }
        expected = self._get_result_url_from_endpoint(
            self._endpoints[0]['endpoints'][0])
        self._test_base_url_helper(expected, self.filters)

    def test_base_url_with_non_existent_service(self):
        """base_url raises EndpointNotFound for an unknown service."""
        self.filters = {
            'service': 'BAD_SERVICE',
            'endpoint_type': 'publicURL',
            'region': 'FakeRegion'
        }
        self.assertRaises(exceptions.EndpointNotFound,
                          self._test_base_url_helper, None, self.filters)

    def test_base_url_without_service(self):
        """base_url raises EndpointNotFound when no service is given."""
        self.filters = {
            'endpoint_type': 'publicURL',
            'region': 'FakeRegion'
        }
        self.assertRaises(exceptions.EndpointNotFound,
                          self._test_base_url_helper, None, self.filters)

    def test_base_url_with_api_version_filter(self):
        """An api_version filter replaces the version in the URL."""
        self.filters = {
            'service': 'compute',
            'endpoint_type': 'publicURL',
            'region': 'FakeRegion',
            'api_version': 'v12'
        }
        expected = self._get_result_url_from_endpoint(
            self._endpoints[0]['endpoints'][1], replacement='v12')
        self._test_base_url_helper(expected, self.filters)

    def test_base_url_with_skip_path_filter(self):
        """With skip_path set, base_url is expected to be the bare host."""
        self.filters = {
            'service': 'compute',
            'endpoint_type': 'publicURL',
            'region': 'FakeRegion',
            'skip_path': True
        }
        expected = 'http://fake_url/'
        self._test_base_url_helper(expected, self.filters)
294
Mauro S. M. Rodriguesc3e573c2014-02-19 07:59:29 -0500295
class TestKeystoneV3AuthProvider(TestKeystoneV2AuthProvider):
    """
    Tests for auth.KeystoneV3AuthProvider.

    Inherits the v2 test cases and swaps in the v3 provider class, the v3
    fake identity response and credentials that also carry a domain name.
    """
    # Service catalog that expected endpoint URLs are read from.
    _endpoints = fake_identity.IDENTITY_V3_RESPONSE['token']['catalog']
    _auth_provider_class = auth.KeystoneV3AuthProvider
    credentials = {
        'username': 'fake_user',
        'password': 'fake_pwd',
        'tenant_name': 'fake_tenant',
        'domain_name': 'fake_domain_name',
    }

    def setUp(self):
        super(TestKeystoneV3AuthProvider, self).setUp()
        # Replace the v2 stub installed by the parent with the fake v3
        # identity response.
        self.stubs.Set(http.ClosingHttp, 'request',
                       fake_identity._fake_v3_response)

    def _get_fake_alt_identity(self):
        """Return the 'token' section of the alternate v3 identity."""
        return fake_identity.ALT_IDENTITY_V3['token']

    def _get_result_url_from_endpoint(self, ep, replacement=None):
        """
        Return the 'url' of endpoint ``ep``.

        When ``replacement`` is truthy, every 'v3' in the URL is replaced
        with it.
        """
        endpoint_url = ep['url']
        if not replacement:
            return endpoint_url
        return endpoint_url.replace('v3', replacement)

    # NOTE(review): despite its name this test removes 'domain_name', and
    # because it shares its name with the parent-class test it replaces that
    # test here, so the missing 'tenant_name' case is not run for this class.
    # TODO confirm whether a separate *_missing_domain_name test was intended.
    def test_check_credentials_missing_tenant_name(self):
        """Credentials lacking domain_name are rejected."""
        cred = copy.copy(self.credentials)
        cred.pop('domain_name')
        is_valid = self.auth_provider.check_credentials(cred)
        self.assertFalse(is_valid)

    # Overwrites v2 test
    def test_base_url_to_get_admin_endpoint(self):
        """base_url honours an 'admin' endpoint_type filter."""
        self.filters = {
            'service': 'compute',
            'endpoint_type': 'admin',
            'region': 'MiddleEarthRegion'
        }
        admin_endpoint = self._endpoints[0]['endpoints'][2]
        expected = self._get_result_url_from_endpoint(admin_endpoint)
        self._test_base_url_helper(expected, self.filters)