blob: 50c3495511c4215548c2144f17c657f9216518dc [file] [log] [blame]
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import requests
class KeycloakClient(object):
def __init__(self, keycloak_url, keycloak_username, keycloak_password,
realm='master', ca_certs_file=False):
self.keycloak_url = keycloak_url
self.keycloak_username = keycloak_username
self.keycloak_password = keycloak_password
self.session = requests.session()
self.realm = realm
self.ca_certs_file = ca_certs_file
self._admin_auth()
@property
def url_base(self):
return self.keycloak_url + f'/admin/realms'
@property
def token_endpoint(self):
return self.keycloak_url + \
f'/realms/{self.realm}/protocol/openid-connect/token'
@property
def discovery_endpoint(self):
return self.keycloak_url + \
f'/realms/{self.realm}/.well-known/openid-configuration'
def _construct_url(self, path):
return self.url_base + f'/{self.realm}/{path}'
def _admin_auth(self):
params = {
'grant_type': 'password',
'client_id': 'admin-cli',
'username': self.keycloak_username,
'password': self.keycloak_password,
'scope': 'openid',
}
r = requests.post(
self.token_endpoint,
data=params,
verify=self.ca_certs_file).json()
headers = {
'Authorization': ("Bearer %s" % r['access_token']),
'Content-Type': 'application/json'
}
self.session.headers.update(headers)
return r
def create_user(self, email, first_name, last_name):
self._admin_auth()
data = {
'username': email,
'email': email,
'firstName': first_name,
'lastName': last_name,
'enabled': True,
'emailVerified': True,
'credentials': [{
'value': 'secret',
'type': 'password',
}],
'requiredActions': []
}
return self.session.post(
self._construct_url('users'),
json=data, verify=self.ca_certs_file)
def delete_user(self, username):
self._admin_auth()
data = {
'id': username,
}
return self.session.delete(
self._construct_url('users'),
json=data, verify=self.ca_certs_file)