# blob: 493aebe65c59d121a67a1b25bc4405e0a83ca019 [file] [log] [blame]
import bcrypt
import logging
import requests
from salt.exceptions import SaltInvocationError
logger = logging.getLogger(__name__)
def call_groovy_script(script, props, username=None, password=None, success_status_codes=(200,)):
    """
    Common method for calling the Jenkins groovy script API

    :param script: groovy script template
    :param props: groovy script properties
    :param username: jenkins username (optional,
            if missing, creds from salt will be used)
    :param password: jenkins password (optional,
            if missing, creds from salt will be used)
    :param success_status_codes: HTTP status codes treated as success
            (optional) in some cases we want to declare error call as success
    :returns: HTTP dict {status,code,msg}; "msg" carries the response text
            only when the call is considered successful
    :raises SaltInvocationError: when no Jenkins URL is configured
    """
    ret = {
        "status": "FAILED",
        "code": 999,
        "msg": ""
    }
    jenkins_url, jenkins_user, jenkins_password = get_jenkins_auth()
    # Explicit credentials take precedence over the ones configured in salt.
    if username:
        jenkins_user = username
    if password:
        jenkins_password = password

    if not jenkins_url:
        raise SaltInvocationError('No Jenkins URL found.')

    token_obj = get_api_crumb(jenkins_url, jenkins_user, jenkins_password)
    req_data = {"script": render_groovy_script(script, props)}
    # get_api_crumb returns None when no crumb is needed/available; only
    # attach the crumb field when one was actually issued.
    if token_obj:
        req_data[token_obj["crumbRequestField"]] = token_obj["crumb"]

    logger.debug("Calling Jenkins script API with URL: %s", jenkins_url)
    req = requests.post('%s/scriptText' % jenkins_url,
                        auth=(jenkins_user, jenkins_password),
                        data=req_data)
    ret["code"] = req.status_code
    if req.status_code in success_status_codes:
        ret["status"] = "SUCCESS"
        ret["msg"] = req.text
        logger.debug("Jenkins script API call success: %s", ret)
    else:
        # Adjacent literals instead of a backslash continuation inside the
        # string, so source indentation never leaks into the log message.
        logger.error("Jenkins script API call failed. "
                     "Return code %s. Text: %s", req.status_code, req.text)
    return ret
def render_groovy_script(script, props):
    """
    Helper method for rendering groovy script with props

    The template is rendered with ``str.format``, so literal braces in the
    groovy code must be doubled (``{{`` / ``}}``) in the template.

    :param script: groovy script template
    :param props: groovy script properties (mapping of placeholder name to
            value); ``None`` is treated as "no properties"
    :returns: generated groovy script
    """
    # ``**None`` would raise TypeError; a template without placeholders
    # should still render when the caller has no properties to pass.
    if props is None:
        props = {}
    return script.format(**props)
def get_api_crumb(jenkins_url=None, jenkins_user=None, jenkins_password=None):
    """
    Fetch the Jenkins API crumb from the crumb issuer endpoint.

    When no URL is passed, URL and credentials are all taken from salt
    (any user/password arguments are replaced in that case).

    :param jenkins_url: Jenkins URL (optional)
    :param jenkins_user: Jenkins admin username (optional)
    :param jenkins_password: Jenkins admin password (optional)
    :returns: decoded JSON body of the crumb issuer response on HTTP 200,
            None on HTTP 404 or 401
    :raises Exception: on any other HTTP status code
    """
    if not jenkins_url:
        jenkins_url, jenkins_user, jenkins_password = get_jenkins_auth()

    logger.debug("Obtaining Jenkins API crumb for URL: %s", jenkins_url)

    # Send basic auth only when a username is known.
    credentials = None
    if jenkins_user:
        credentials = (jenkins_user, jenkins_password)
    crumb_url = "%s/crumbIssuer/api/json" % jenkins_url
    response = requests.get(crumb_url, auth=credentials)

    if response.status_code == 200:
        return response.json()

    # 404 means CSRF security is disabled, so api crumb is not necessary,
    # 401 means unauthorized
    if response.status_code in (404, 401):
        return None

    raise Exception("Cannot obtain Jenkins API crumb. Status code: %s. Text: %s" %
                    (response.status_code, response.text))
def get_jenkins_auth():
    """
    Read the Jenkins connection parameters from salt.

    Each value is looked up as ``jenkins.<name>`` and then ``jenkins:<name>``
    through ``config.get``, and finally as ``jenkins.<name>`` through
    ``pillar.get``; the first truthy result wins.

    :returns: tuple (jenkins_url, jenkins_user, jenkins_password)
    """
    def _lookup(name):
        # Same source order for every setting; later sources are only
        # consulted when the earlier ones yield a falsy value.
        dotted_key = 'jenkins.%s' % name
        colon_key = 'jenkins:%s' % name
        return (__salt__['config.get'](dotted_key)
                or __salt__['config.get'](colon_key)
                or __salt__['pillar.get'](dotted_key))

    url = _lookup('url')
    user = _lookup('user')
    secret = _lookup('password')
    return (url, user, secret)
def encode_password(password):
    """
    Hash plaintext password by jenkins bcrypt algorithm

    :param password: plain-text password (text or bytes)
    :returns: bcrypt hashed password as a native string, or None when
            ``password`` is neither a string nor bytes
    """
    if not isinstance(password, bytes):
        if not isinstance(password, str):
            # Unsupported input keeps the historical "return None" contract.
            return None
        # bcrypt.hashpw only accepts bytes; text must be encoded first.
        password = password.encode("utf-8")

    hashed = bcrypt.hashpw(password, bcrypt.gensalt(prefix=b"2a"))
    # hashpw returns bytes; hand callers a native string so it can be
    # embedded in text (e.g. rendered into a groovy script) directly.
    if not isinstance(hashed, str):
        hashed = hashed.decode("ascii")
    return hashed