blob: ebcfe828832fec3c3dc30774ebe0fd21a4a388c5 [file] [log] [blame]
# Copyright 2014 IBM Corp.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
15
16import copy
17import datetime
18
19from oslotest import mockpatch
20
21from tempest.lib import auth
22from tempest.lib import exceptions
23from tempest.lib.services.identity.v2 import token_client as v2_client
24from tempest.lib.services.identity.v3 import token_client as v3_client
25from tempest.tests.lib import base
26from tempest.tests.lib import fake_credentials
27from tempest.tests.lib import fake_http
28from tempest.tests.lib import fake_identity
29
30
def fake_get_credentials(fill_in=True, identity_version='v2', **kwargs):
    """Replacement for ``auth.get_credentials`` used by these tests.

    All arguments are accepted and ignored; each call returns a new
    ``FakeCredentials`` instance.
    """
    credentials = fake_credentials.FakeCredentials()
    return credentials
33
34
class BaseAuthTestsSetUp(base.TestCase):
    """Common fixture for the auth provider test cases.

    Subclasses override ``_auth_provider_class`` (and, where needed,
    ``credentials``); ``setUp`` builds ``self.auth_provider`` from them.
    """

    # Provider class under test; None here, overridden by subclasses.
    _auth_provider_class = None
    # Class-level credentials object passed to the provider in setUp.
    credentials = fake_credentials.FakeCredentials()

    def _auth(self, credentials, auth_url, **params):
        """Instantiate the provider class under test."""
        provider_cls = self._auth_provider_class
        return provider_cls(credentials, auth_url, **params)

    def setUp(self):
        """Create the fake HTTP object, stub credentials, build provider."""
        super(BaseAuthTestsSetUp, self).setUp()
        self.fake_http = fake_http.fake_httplib2(return_type=200)
        # Make auth.get_credentials resolve to the fake factory above.
        self.stubs.Set(auth, 'get_credentials', fake_get_credentials)
        auth_url = fake_identity.FAKE_AUTH_URL
        self.auth_provider = self._auth(self.credentials, auth_url)
49
50
class TestBaseAuthProvider(BaseAuthTestsSetUp):
    """Tests for the base ``auth.AuthProvider`` class.

    Only behaviour provided by the base class itself is covered. The
    methods a concrete provider has to supply, and anything that depends
    strongly on them, are stubbed out and deliberately left untested.
    """

    class FakeAuthProviderImpl(auth.AuthProvider):
        """Concrete provider whose overridden methods all do nothing."""

        def _decorate_request(self):
            pass

        def _fill_credentials(self):
            pass

        def _get_auth(self):
            pass

        def base_url(self):
            pass

        def is_expired(self):
            pass

    _auth_provider_class = FakeAuthProviderImpl

    def _auth(self, credentials, auth_url, **params):
        """Build the fake provider; ``auth_url`` is accepted but unused."""
        provider_cls = self._auth_provider_class
        return provider_cls(credentials, **params)

    def test_check_credentials_bad_type(self):
        # A plain list must not be accepted as credentials.
        is_valid = self.auth_provider.check_credentials([])
        self.assertFalse(is_valid)

    def test_auth_data_property_when_cache_exists(self):
        provider = self.auth_provider
        provider.cache = 'foo'
        # Force is_expired() to report False for the duration of the test.
        never_expired = mockpatch.PatchObject(provider, 'is_expired',
                                              return_value=False)
        self.useFixture(never_expired)
        self.assertEqual('foo', provider.auth_data)

    def test_delete_auth_data_property_through_deleter(self):
        provider = self.auth_provider
        provider.cache = 'foo'
        del provider.auth_data
        self.assertIsNone(provider.cache)

    def test_delete_auth_data_property_through_clear_auth(self):
        provider = self.auth_provider
        provider.cache = 'foo'
        provider.clear_auth()
        self.assertIsNone(provider.cache)

    def test_set_and_reset_alt_auth_data(self):
        provider = self.auth_provider

        # Setting stores both the part name and the alternate data.
        provider.set_alt_auth_data('foo', 'bar')
        self.assertEqual(provider.alt_part, 'foo')
        self.assertEqual(provider.alt_auth_data, 'bar')

        # Resetting clears both again.
        provider.reset_alt_auth_data()
        self.assertIsNone(provider.alt_part)
        self.assertIsNone(provider.alt_auth_data)

    def test_auth_class(self):
        # Calling the base class directly is expected to raise TypeError.
        self.assertRaises(TypeError,
                          auth.AuthProvider,
                          fake_credentials.FakeCredentials)
114
115
class TestKeystoneV2AuthProvider(BaseAuthTestsSetUp):
    """Tests for ``auth.KeystoneV2AuthProvider``.

    The ``_get_*`` and ``_auth_data_with_expiry`` helpers hold everything
    that depends on the layout of the v2 identity response, so a subclass
    can reuse the test methods by overriding only those helpers.
    """
    _endpoints = fake_identity.IDENTITY_V2_RESPONSE['access']['serviceCatalog']
    _auth_provider_class = auth.KeystoneV2AuthProvider
    credentials = fake_credentials.FakeKeystoneV2Credentials()

    def setUp(self):
        super(TestKeystoneV2AuthProvider, self).setUp()
        # Replace the v2 token client's raw_request with the fake responder.
        self.stubs.Set(v2_client.TokenClient, 'raw_request',
                       fake_identity._fake_v2_response)
        self.target_url = 'test_api'

    def _get_fake_identity(self):
        """Return the 'access' section of the fake v2 identity response."""
        return fake_identity.IDENTITY_V2_RESPONSE['access']

    def _get_fake_alt_identity(self):
        """Return the 'access' section of the alternate v2 response."""
        return fake_identity.ALT_IDENTITY_V2_RESPONSE['access']

    def _get_result_url_from_endpoint(self, ep, endpoint_type='publicURL',
                                      replacement=None):
        """Return the URL stored in endpoint ``ep`` under ``endpoint_type``.

        When ``replacement`` is truthy, every 'v2' substring of the URL is
        replaced with it.
        """
        if replacement:
            return ep[endpoint_type].replace('v2', replacement)
        return ep[endpoint_type]

    def _get_token_from_fake_identity(self):
        """Return the token constant of the fake identity module."""
        return fake_identity.TOKEN

    def _get_from_fake_identity(self, attr):
        """Look up 'user_id' or 'tenant_id' in the fake v2 response.

        Any other ``attr`` falls through and yields None.
        """
        access = fake_identity.IDENTITY_V2_RESPONSE['access']
        if attr == 'user_id':
            return access['user']['id']
        elif attr == 'tenant_id':
            return access['token']['tenant']['id']

    def _test_request_helper(self, filters, expected):
        """Run an authenticated GET and compare it with ``expected``.

        :param filters: filters dict passed to ``auth_request``.
        :param expected: dict with the 'url', 'token' and 'body' expected
                         from the decorated request.
        """
        url, headers, body = self.auth_provider.auth_request(
            'GET', self.target_url, filters=filters)

        self.assertEqual(expected['url'], url)
        self.assertEqual(expected['token'], headers['X-Auth-Token'])
        self.assertEqual(expected['body'], body)

    def _auth_data_with_expiry(self, date_as_string):
        """Return the provider's auth data with its expiry overwritten.

        The expiry is written in place into the access dict obtained from
        ``auth_provider.auth_data``.
        """
        token, access = self.auth_provider.auth_data
        access['token']['expires'] = date_as_string
        return token, access

    def test_request(self):
        filters = {
            'service': 'compute',
            'endpoint_type': 'publicURL',
            'region': 'FakeRegion'
        }

        url = self._get_result_url_from_endpoint(
            self._endpoints[0]['endpoints'][1]) + '/' + self.target_url

        expected = {
            'body': None,
            'url': url,
            'token': self._get_token_from_fake_identity(),
        }
        self._test_request_helper(filters, expected)

    def test_request_with_alt_auth_cleans_alt(self):
        """Test alternate auth data for headers

        Assert that when the alt data is provided for headers, after an
        auth_request the data alt_data is cleaned-up.
        """
        self.auth_provider.set_alt_auth_data(
            'headers',
            (fake_identity.ALT_TOKEN, self._get_fake_alt_identity()))
        filters = {
            'service': 'compute',
            'endpoint_type': 'publicURL',
            'region': 'fakeRegion'
        }
        self.auth_provider.auth_request('GET', self.target_url,
                                        filters=filters)

        # Assert alt auth data is clear after it
        self.assertIsNone(self.auth_provider.alt_part)
        self.assertIsNone(self.auth_provider.alt_auth_data)

    def _test_request_with_identical_alt_auth(self, part):
        """Test alternate but identical auth data for ``part``

        Assert that when the alt data is provided, but it's actually
        identical, an exception is raised.
        """
        self.auth_provider.set_alt_auth_data(
            part,
            (fake_identity.TOKEN, self._get_fake_identity()))
        filters = {
            'service': 'compute',
            'endpoint_type': 'publicURL',
            'region': 'fakeRegion'
        }

        self.assertRaises(exceptions.BadAltAuth,
                          self.auth_provider.auth_request,
                          'GET', self.target_url, filters=filters)

    def test_request_with_identical_alt_auth_headers(self):
        self._test_request_with_identical_alt_auth('headers')

    def test_request_with_identical_alt_auth_url(self):
        self._test_request_with_identical_alt_auth('url')

    def test_request_with_identical_alt_auth_body(self):
        self._test_request_with_identical_alt_auth('body')

    def test_request_with_alt_part_without_alt_data(self):
        """Test empty alternate auth data

        Assert that when alt_part is defined, the corresponding original
        request element is kept the same.
        """
        filters = {
            'service': 'compute',
            'endpoint_type': 'publicURL',
            'region': 'fakeRegion'
        }
        self.auth_provider.set_alt_auth_data('headers', None)

        url, headers, body = self.auth_provider.auth_request(
            'GET', self.target_url, filters=filters)
        # The original headers were empty
        self.assertNotEqual(url, self.target_url)
        self.assertIsNone(headers)
        self.assertIsNone(body)

    def _test_request_with_alt_part_without_alt_data_no_change(self, body):
        """Test empty alternate auth data with no effect

        Assert that when alt_part is defined, no auth_data is provided,
        and the corresponding original request element was not going to
        be changed anyway, an exception is raised.

        NOTE(review): the ``body`` argument is not used. The alt part is
        always set to 'body', so the three public wrappers of this helper
        all exercise the same scenario. Passing the argument through is
        not a drop-in fix: test_request_with_alt_part_without_alt_data
        expects a request with alt part 'headers' and no alt data to
        succeed rather than raise.
        """
        filters = {
            'service': 'compute',
            'endpoint_type': 'publicURL',
            'region': 'fakeRegion'
        }
        self.auth_provider.set_alt_auth_data('body', None)

        self.assertRaises(exceptions.BadAltAuth,
                          self.auth_provider.auth_request,
                          'GET', self.target_url, filters=filters)

    def test_request_with_alt_part_without_alt_data_no_change_headers(self):
        self._test_request_with_alt_part_without_alt_data_no_change('headers')

    def test_request_with_alt_part_without_alt_data_no_change_url(self):
        self._test_request_with_alt_part_without_alt_data_no_change('url')

    def test_request_with_alt_part_without_alt_data_no_change_body(self):
        self._test_request_with_alt_part_without_alt_data_no_change('body')

    def test_request_with_bad_service(self):
        filters = {
            'service': 'BAD_SERVICE',
            'endpoint_type': 'publicURL',
            'region': 'fakeRegion'
        }
        self.assertRaises(exceptions.EndpointNotFound,
                          self.auth_provider.auth_request, 'GET',
                          self.target_url, filters=filters)

    def test_request_without_service(self):
        filters = {
            'service': None,
            'endpoint_type': 'publicURL',
            'region': 'fakeRegion'
        }
        self.assertRaises(exceptions.EndpointNotFound,
                          self.auth_provider.auth_request, 'GET',
                          self.target_url, filters=filters)

    def test_check_credentials_missing_attribute(self):
        # Remove one required attribute at a time from a copy of the
        # credentials; each incomplete copy must be rejected.
        for attr in ['username', 'password']:
            cred = copy.copy(self.credentials)
            del cred[attr]
            self.assertFalse(self.auth_provider.check_credentials(cred))

    def test_fill_credentials(self):
        self.auth_provider.fill_credentials()
        creds = self.auth_provider.credentials
        for attr in ['user_id', 'tenant_id']:
            self.assertEqual(self._get_from_fake_identity(attr),
                             getattr(creds, attr))

    def _test_base_url_helper(self, expected_url, filters,
                              auth_data=None):
        """Assert that ``base_url(filters, auth_data)`` is ``expected_url``.

        Exceptions raised by ``base_url`` propagate to the caller.
        """
        url = self.auth_provider.base_url(filters, auth_data)
        # Expected value first, as in the other assertions of this class.
        self.assertEqual(expected_url, url)

    def test_base_url(self):
        self.filters = {
            'service': 'compute',
            'endpoint_type': 'publicURL',
            'region': 'FakeRegion'
        }
        expected = self._get_result_url_from_endpoint(
            self._endpoints[0]['endpoints'][1])
        self._test_base_url_helper(expected, self.filters)

    def test_base_url_to_get_admin_endpoint(self):
        self.filters = {
            'service': 'compute',
            'endpoint_type': 'adminURL',
            'region': 'FakeRegion'
        }
        expected = self._get_result_url_from_endpoint(
            self._endpoints[0]['endpoints'][1], endpoint_type='adminURL')
        self._test_base_url_helper(expected, self.filters)

    def test_base_url_unknown_region(self):
        """If the region is unknown, the first endpoint is returned."""
        self.filters = {
            'service': 'compute',
            'endpoint_type': 'publicURL',
            'region': 'AintNoBodyKnowThisRegion'
        }
        expected = self._get_result_url_from_endpoint(
            self._endpoints[0]['endpoints'][0])
        self._test_base_url_helper(expected, self.filters)

    def test_base_url_with_non_existent_service(self):
        self.filters = {
            'service': 'BAD_SERVICE',
            'endpoint_type': 'publicURL',
            'region': 'FakeRegion'
        }
        self.assertRaises(exceptions.EndpointNotFound,
                          self._test_base_url_helper, None, self.filters)

    def test_base_url_without_service(self):
        self.filters = {
            'endpoint_type': 'publicURL',
            'region': 'FakeRegion'
        }
        self.assertRaises(exceptions.EndpointNotFound,
                          self._test_base_url_helper, None, self.filters)

    def test_base_url_with_api_version_filter(self):
        self.filters = {
            'service': 'compute',
            'endpoint_type': 'publicURL',
            'region': 'FakeRegion',
            'api_version': 'v12'
        }
        expected = self._get_result_url_from_endpoint(
            self._endpoints[0]['endpoints'][1], replacement='v12')
        self._test_base_url_helper(expected, self.filters)

    def test_base_url_with_skip_path_filter(self):
        self.filters = {
            'service': 'compute',
            'endpoint_type': 'publicURL',
            'region': 'FakeRegion',
            'skip_path': True
        }
        expected = 'http://fake_url/'
        self._test_base_url_helper(expected, self.filters)

    def test_base_url_with_unversioned_endpoint(self):
        # Catalog whose only endpoint URL carries no version segment.
        auth_data = {
            'serviceCatalog': [
                {
                    'type': 'identity',
                    'endpoints': [
                        {
                            'region': 'FakeRegion',
                            'publicURL': 'http://fake_url'
                        }
                    ]
                }
            ]
        }

        filters = {
            'service': 'identity',
            'endpoint_type': 'publicURL',
            'region': 'FakeRegion',
            'api_version': 'v2.0'
        }

        expected = 'http://fake_url/v2.0'
        self._test_base_url_helper(expected, filters, ('token', auth_data))

    def test_token_not_expired(self):
        expiry_data = datetime.datetime.utcnow() + datetime.timedelta(days=1)
        self._verify_expiry(expiry_data=expiry_data, should_be_expired=False)

    def test_token_expired(self):
        expiry_data = datetime.datetime.utcnow() - datetime.timedelta(hours=1)
        self._verify_expiry(expiry_data=expiry_data, should_be_expired=True)

    def test_token_not_expired_to_be_renewed(self):
        # Expiry lies in the future but inside the provider's threshold.
        expiry_data = (datetime.datetime.utcnow() +
                       self.auth_provider.token_expiry_threshold / 2)
        self._verify_expiry(expiry_data=expiry_data, should_be_expired=True)

    def _verify_expiry(self, expiry_data, should_be_expired):
        """Check ``is_expired`` for each of the provider's date formats.

        :param expiry_data: datetime formatted and stored as token expiry.
        :param should_be_expired: result expected from ``is_expired``.
        """
        for expiry_format in self.auth_provider.EXPIRY_DATE_FORMATS:
            auth_data = self._auth_data_with_expiry(
                expiry_data.strftime(expiry_format))
            self.assertEqual(should_be_expired,
                             self.auth_provider.is_expired(auth_data))
429
430
class TestKeystoneV3AuthProvider(TestKeystoneV2AuthProvider):
    """Re-runs the v2 provider tests against ``KeystoneV3AuthProvider``.

    The helpers that read the identity response are overridden for the v3
    layout, and the tests whose inputs differ in v3 are redefined.
    """
    _endpoints = fake_identity.IDENTITY_V3_RESPONSE['token']['catalog']
    _auth_provider_class = auth.KeystoneV3AuthProvider
    credentials = fake_credentials.FakeKeystoneV3Credentials()

    def setUp(self):
        super(TestKeystoneV3AuthProvider, self).setUp()
        # Replace the v3 token client's raw_request with the fake responder.
        self.stubs.Set(v3_client.V3TokenClient, 'raw_request',
                       fake_identity._fake_v3_response)

    def _get_fake_identity(self):
        """Return the 'token' section of the fake v3 identity response."""
        return fake_identity.IDENTITY_V3_RESPONSE['token']

    def _get_fake_alt_identity(self):
        """Return the 'token' section of the alternate v3 identity."""
        return fake_identity.ALT_IDENTITY_V3['token']

    def _get_result_url_from_endpoint(self, ep, replacement=None):
        """Return ``ep['url']``, with 'v3' swapped for ``replacement``.

        The swap only happens when ``replacement`` is truthy.
        """
        endpoint_url = ep['url']
        if not replacement:
            return endpoint_url
        return endpoint_url.replace('v3', replacement)

    def _auth_data_with_expiry(self, date_as_string):
        """Return the provider's auth data with 'expires_at' overwritten."""
        token, token_data = self.auth_provider.auth_data
        token_data['expires_at'] = date_as_string
        return token, token_data

    def _get_from_fake_identity(self, attr):
        """Look up a user/project (domain) id in the fake v3 response.

        Unrecognised ``attr`` values yield None.
        """
        token = fake_identity.IDENTITY_V3_RESPONSE['token']
        if attr == 'user_id':
            return token['user']['id']
        if attr == 'project_id':
            return token['project']['id']
        if attr == 'user_domain_id':
            return token['user']['domain']['id']
        if attr == 'project_domain_id':
            return token['project']['domain']['id']
        return None

    def test_check_credentials_missing_attribute(self):
        # Start from fresh credentials.
        self.credentials.reset()
        required = ['username', 'password', 'user_domain_name',
                    'project_domain_name']
        for missing in required:
            incomplete = copy.copy(self.credentials)
            del incomplete[missing]
            accepted = self.auth_provider.check_credentials(incomplete)
            self.assertFalse(
                accepted,
                "Credentials should be invalid without %s" % missing)

    def test_check_domain_credentials_missing_attribute(self):
        # Start from fresh credentials.
        self.credentials.reset()
        domain_creds = fake_credentials.FakeKeystoneV3DomainCredentials()
        required = ['username', 'password', 'user_domain_name']
        for missing in required:
            incomplete = copy.copy(domain_creds)
            del incomplete[missing]
            accepted = self.auth_provider.check_credentials(incomplete)
            self.assertFalse(
                accepted,
                "Credentials should be invalid without %s" % missing)

    def test_fill_credentials(self):
        self.auth_provider.fill_credentials()
        filled = self.auth_provider.credentials
        expected_attrs = ['user_id', 'project_id', 'user_domain_id',
                          'project_domain_id']
        for name in expected_attrs:
            self.assertEqual(self._get_from_fake_identity(name),
                             getattr(filled, name))

    # Overrides the v2 test
    def test_base_url_to_get_admin_endpoint(self):
        self.filters = {
            'service': 'compute',
            'endpoint_type': 'admin',
            'region': 'MiddleEarthRegion'
        }
        admin_endpoint = self._endpoints[0]['endpoints'][2]
        expected = self._get_result_url_from_endpoint(admin_endpoint)
        self._test_base_url_helper(expected, self.filters)

    # Overrides the v2 test
    def test_base_url_with_unversioned_endpoint(self):
        # Catalog whose only endpoint URL carries no version segment.
        unversioned_endpoint = {
            'region': 'FakeRegion',
            'url': 'http://fake_url',
            'interface': 'public'
        }
        identity_service = {
            'type': 'identity',
            'endpoints': [unversioned_endpoint]
        }
        auth_data = {'catalog': [identity_service]}

        filters = {
            'service': 'identity',
            'endpoint_type': 'publicURL',
            'region': 'FakeRegion',
            'api_version': 'v3'
        }

        expected = 'http://fake_url/v3'
        self._test_base_url_helper(expected, filters, ('token', auth_data))