blob: 48d5b4c0b357875d43ef2e164c6ff045b6c14710 [file] [log] [blame]
DavidPurcellb25f93d2017-01-27 12:46:27 -05001# Copyright 2017 AT&T Corporation.
DavidPurcell029d8c32017-01-06 15:27:41 -05002# All Rights Reserved.
3#
4# Licensed under the Apache License, Version 2.0 (the "License"); you may
5# not use this file except in compliance with the License. You may obtain
6# a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13# License for the specific language governing permissions and limitations
14# under the License.
15
16import json
DavidPurcell029d8c32017-01-06 15:27:41 -050017import six
18import time
19import urllib3
20
Rajiv Kumar645dfc92017-01-19 13:48:27 +053021from oslo_log import log as logging
DavidPurcell029d8c32017-01-06 15:27:41 -050022from tempest import config
23
24from patrole_tempest_plugin import rbac_exceptions as rbac_exc
25
26LOG = logging.getLogger(__name__)
27CONF = config.CONF
28http = urllib3.PoolManager()
29
30
31class Singleton(type):
32 _instances = {}
33
34 def __call__(cls, *args, **kwargs):
35 if cls not in cls._instances:
36 cls._instances[cls] = super(Singleton, cls).__call__(*args,
37 **kwargs)
38 return cls._instances[cls]
39
40
41@six.add_metaclass(Singleton)
42class RbacUtils(object):
43 def __init__(self):
44 RbacUtils.dictionary = {}
45
46 @staticmethod
47 def get_roles(caller):
48 admin_role_id = None
49 rbac_role_id = None
50
51 if bool(RbacUtils.dictionary) is False:
52 admin_token = caller.admin_client.token
53 headers = {'X-Auth-Token': admin_token,
54 "Content-Type": "application/json"}
55 url_to_get_role = CONF.identity.uri_v3 + '/roles/'
56 response = http.request('GET', url_to_get_role, headers=headers)
57 if response.status != 200:
58 raise rbac_exc.RbacResourceSetupFailed('Unable to'
59 ' retrieve roles')
60 data = response.data
61 roles = json.loads(data)
62 for item in roles['roles']:
63 if item['name'] == CONF.rbac.rbac_test_role:
64 rbac_role_id = item['id']
65 if item['name'] == 'admin':
66 admin_role_id = item['id']
67
68 RbacUtils.dictionary.update({'admin_role_id': admin_role_id,
69 'rbac_role_id': rbac_role_id})
70
71 return RbacUtils.dictionary
72
73 @staticmethod
74 def delete_all_roles(self, base_url, headers):
75 # Find the current role
76 response = http.request('GET', base_url, headers=headers)
77 if response.status != 200:
78 raise rbac_exc.RbacResourceSetupFailed('Unable to retrieve'
79 ' user role')
80 data = response.data
81 roles = json.loads(data)
82 for item in roles['roles']:
83 url = base_url + item['id']
84 response = http.request('DELETE', url, headers=headers)
85 self.assertEqual(204, response.status)
86
87 @staticmethod
88 def switch_role(self, switchToRbacRole=None):
89 LOG.debug('Switching role to: %s', switchToRbacRole)
90 if switchToRbacRole is None:
91 return
92
93 roles = rbac_utils.get_roles(self)
94 rbac_role_id = roles.get('rbac_role_id')
95 admin_role_id = roles.get('admin_role_id')
96
97 try:
98 user_id = self.auth_provider.credentials.user_id
99 project_id = self.auth_provider.credentials.tenant_id
100 admin_token = self.admin_client.token
101
102 headers = {'X-Auth-Token': admin_token,
103 "Content-Type": "application/json"}
104 base_url = (CONF.identity.uri_v3 + '/projects/' + project_id +
105 '/users/' + user_id + '/roles/')
106
107 rbac_utils.delete_all_roles(self, base_url, headers)
108
109 if switchToRbacRole:
110 url = base_url + rbac_role_id
111 response = http.request('PUT', url, headers=headers)
112 self.assertEqual(204, response.status)
113 else:
114 url = base_url + admin_role_id
115 response = http.request('PUT', url, headers=headers)
116 self.assertEqual(204, response.status)
117
118 except Exception as exp:
119 LOG.error(exp)
120 raise
121 finally:
122 self.auth_provider.clear_auth()
123 # Sleep to avoid 401 errors caused by rounding
124 # In timing of fernet token creation
125 time.sleep(1)
126 self.auth_provider.set_auth()
127
128rbac_utils = RbacUtils()