blob: 50c3495511c4215548c2144f17c657f9216518dc [file] [log] [blame]
Ade Lee47a5e982022-11-15 16:34:30 +00001# Licensed under the Apache License, Version 2.0 (the "License");
2# you may not use this file except in compliance with the License.
3# You may obtain a copy of the License at
4#
5# http://www.apache.org/licenses/LICENSE-2.0
6#
7# Unless required by applicable law or agreed to in writing, software
8# distributed under the License is distributed on an "AS IS" BASIS,
9# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10# See the License for the specific language governing permissions and
11# limitations under the License.
12
13import requests
14
15
16class KeycloakClient(object):
17 def __init__(self, keycloak_url, keycloak_username, keycloak_password,
18 realm='master', ca_certs_file=False):
19 self.keycloak_url = keycloak_url
20 self.keycloak_username = keycloak_username
21 self.keycloak_password = keycloak_password
22 self.session = requests.session()
23 self.realm = realm
24 self.ca_certs_file = ca_certs_file
25 self._admin_auth()
26
27 @property
28 def url_base(self):
29 return self.keycloak_url + f'/admin/realms'
30
31 @property
32 def token_endpoint(self):
33 return self.keycloak_url + \
34 f'/realms/{self.realm}/protocol/openid-connect/token'
35
36 @property
37 def discovery_endpoint(self):
38 return self.keycloak_url + \
39 f'/realms/{self.realm}/.well-known/openid-configuration'
40
41 def _construct_url(self, path):
42 return self.url_base + f'/{self.realm}/{path}'
43
44 def _admin_auth(self):
45 params = {
46 'grant_type': 'password',
47 'client_id': 'admin-cli',
48 'username': self.keycloak_username,
49 'password': self.keycloak_password,
50 'scope': 'openid',
51 }
52 r = requests.post(
53 self.token_endpoint,
54 data=params,
55 verify=self.ca_certs_file).json()
56
57 headers = {
58 'Authorization': ("Bearer %s" % r['access_token']),
59 'Content-Type': 'application/json'
60 }
61 self.session.headers.update(headers)
62 return r
63
64 def create_user(self, email, first_name, last_name):
65 self._admin_auth()
66 data = {
67 'username': email,
68 'email': email,
69 'firstName': first_name,
70 'lastName': last_name,
71 'enabled': True,
72 'emailVerified': True,
73 'credentials': [{
74 'value': 'secret',
75 'type': 'password',
76 }],
77 'requiredActions': []
78 }
79 return self.session.post(
80 self._construct_url('users'),
81 json=data, verify=self.ca_certs_file)
82
83 def delete_user(self, username):
84 self._admin_auth()
85 data = {
86 'id': username,
87 }
88 return self.session.delete(
89 self._construct_url('users'),
90 json=data, verify=self.ca_certs_file)