blob: a999cfc1c8d680de134e69917400497ee59b1666 [file] [log] [blame]
Maru Newbyb096d9f2015-03-09 18:54:54 +00001# Copyright (c) 2014 Deutsche Telekom AG
2# Copyright (c) 2014 Hewlett-Packard Development Company, L.P.
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15import abc
16
17import six
18
19from neutron.tests.tempest import auth
20from neutron.tests.tempest import config
21from neutron.tests.tempest import exceptions
22from neutron.openstack.common import log as logging
23
24CONF = config.CONF
25LOG = logging.getLogger(__name__)
26
27# Type of credentials available from configuration
28CREDENTIAL_TYPES = {
29 'identity_admin': ('identity', 'admin'),
30 'user': ('identity', None),
31 'alt_user': ('identity', 'alt')
32}
33
34
35# Read credentials from configuration, builds a Credentials object
36# based on the specified or configured version
37def get_configured_credentials(credential_type, fill_in=True,
38 identity_version=None):
39 identity_version = identity_version or CONF.identity.auth_version
40 if identity_version not in ('v2', 'v3'):
41 raise exceptions.InvalidConfiguration(
42 'Unsupported auth version: %s' % identity_version)
43 if credential_type not in CREDENTIAL_TYPES:
44 raise exceptions.InvalidCredentials()
45 conf_attributes = ['username', 'password', 'tenant_name']
46 if identity_version == 'v3':
47 conf_attributes.append('domain_name')
48 # Read the parts of credentials from config
49 params = {}
50 section, prefix = CREDENTIAL_TYPES[credential_type]
51 for attr in conf_attributes:
52 _section = getattr(CONF, section)
53 if prefix is None:
54 params[attr] = getattr(_section, attr)
55 else:
56 params[attr] = getattr(_section, prefix + "_" + attr)
57 # Build and validate credentials. We are reading configured credentials,
58 # so validate them even if fill_in is False
59 credentials = get_credentials(fill_in=fill_in, **params)
60 if not fill_in:
61 if not credentials.is_valid():
62 msg = ("The %s credentials are incorrectly set in the config file."
63 " Double check that all required values are assigned" %
64 credential_type)
65 raise exceptions.InvalidConfiguration(msg)
66 return credentials
67
68
69# Wrapper around auth.get_credentials to use the configured identity version
70# is none is specified
71def get_credentials(fill_in=True, identity_version=None, **kwargs):
72 identity_version = identity_version or CONF.identity.auth_version
73 # In case of "v3" add the domain from config if not specified
74 if identity_version == 'v3':
75 domain_fields = set(x for x in auth.KeystoneV3Credentials.ATTRIBUTES
76 if 'domain' in x)
77 if not domain_fields.intersection(kwargs.keys()):
78 kwargs['user_domain_name'] = CONF.identity.admin_domain_name
79 auth_url = CONF.identity.uri_v3
80 else:
81 auth_url = CONF.identity.uri
82 return auth.get_credentials(auth_url,
83 fill_in=fill_in,
84 identity_version=identity_version,
85 **kwargs)
86
87
88@six.add_metaclass(abc.ABCMeta)
89class CredentialProvider(object):
90 def __init__(self, name, password='pass', network_resources=None):
91 self.name = name
92
93 @abc.abstractmethod
94 def get_primary_creds(self):
95 return
96
97 @abc.abstractmethod
98 def get_admin_creds(self):
99 return
100
101 @abc.abstractmethod
102 def get_alt_creds(self):
103 return
104
105 @abc.abstractmethod
106 def clear_isolated_creds(self):
107 return
108
109 @abc.abstractmethod
110 def is_multi_user(self):
111 return
112
113 @abc.abstractmethod
114 def is_multi_tenant(self):
115 return
116
117 @abc.abstractmethod
118 def get_creds_by_roles(self, roles, force_new=False):
119 return
120
121 @abc.abstractmethod
122 def is_role_available(self, role):
123 return