Renew token before expiry time
Modify is_expired so that token is renewed before it
actually expires. The threshold is set to 60s, which is plenty
of time for any API call to complete.
Use utcnow() instead of utc(). Includes unit tests for is_expired.
Closes bug 1292145
Change-Id: Ic062b4352612c936d1f83fcb7cb09154eb628d9e
diff --git a/tempest/auth.py b/tempest/auth.py
index 0e45161..5fc923f 100644
--- a/tempest/auth.py
+++ b/tempest/auth.py
@@ -164,6 +164,8 @@
class KeystoneAuthProvider(AuthProvider):
+ token_expiry_threshold = datetime.timedelta(seconds=60)
+
def __init__(self, credentials, client_type='tempest', interface=None):
super(KeystoneAuthProvider, self).__init__(credentials, client_type,
interface)
@@ -293,7 +295,8 @@
_, access = auth_data
expiry = datetime.datetime.strptime(access['token']['expires'],
self.EXPIRY_DATE_FORMAT)
- return expiry <= datetime.datetime.now()
+ return expiry - self.token_expiry_threshold <= \
+ datetime.datetime.utcnow()
class KeystoneV3AuthProvider(KeystoneAuthProvider):
@@ -393,4 +396,5 @@
_, access = auth_data
expiry = datetime.datetime.strptime(access['expires_at'],
self.EXPIRY_DATE_FORMAT)
- return expiry <= datetime.datetime.now()
+ return expiry - self.token_expiry_threshold <= \
+ datetime.datetime.utcnow()
diff --git a/tempest/tests/test_auth.py b/tempest/tests/test_auth.py
index b6e15bd..62c20e3 100644
--- a/tempest/tests/test_auth.py
+++ b/tempest/tests/test_auth.py
@@ -14,6 +14,7 @@
# under the License.
import copy
+import datetime
from tempest import auth
from tempest.common import http
@@ -131,6 +132,11 @@
self.assertEqual(expected['token'], headers['X-Auth-Token'])
self.assertEqual(expected['body'], body)
+ def _auth_data_with_expiry(self, date_as_string):
+ token, access = self.auth_provider.auth_data
+ access['token']['expires'] = date_as_string
+ return token, access
+
def test_request(self):
filters = {
'service': 'compute',
@@ -292,6 +298,25 @@
expected = 'http://fake_url/'
self._test_base_url_helper(expected, self.filters)
+ def test_token_not_expired(self):
+ expiry_data = datetime.datetime.utcnow() + datetime.timedelta(days=1)
+ auth_data = self._auth_data_with_expiry(
+ expiry_data.strftime(self.auth_provider.EXPIRY_DATE_FORMAT))
+ self.assertFalse(self.auth_provider.is_expired(auth_data))
+
+ def test_token_expired(self):
+ expiry_data = datetime.datetime.utcnow() - datetime.timedelta(hours=1)
+ auth_data = self._auth_data_with_expiry(
+ expiry_data.strftime(self.auth_provider.EXPIRY_DATE_FORMAT))
+ self.assertTrue(self.auth_provider.is_expired(auth_data))
+
+ def test_token_not_expired_to_be_renewed(self):
+ expiry_data = datetime.datetime.utcnow() + \
+ self.auth_provider.token_expiry_threshold / 2
+ auth_data = self._auth_data_with_expiry(
+ expiry_data.strftime(self.auth_provider.EXPIRY_DATE_FORMAT))
+ self.assertTrue(self.auth_provider.is_expired(auth_data))
+
class TestKeystoneV3AuthProvider(TestKeystoneV2AuthProvider):
_endpoints = fake_identity.IDENTITY_V3_RESPONSE['token']['catalog']
@@ -316,6 +341,11 @@
return ep['url'].replace('v3', replacement)
return ep['url']
+ def _auth_data_with_expiry(self, date_as_string):
+ token, access = self.auth_provider.auth_data
+ access['expires_at'] = date_as_string
+ return token, access
+
def test_check_credentials_missing_tenant_name(self):
cred = copy.copy(self.credentials)
del cred['domain_name']