blob: 6987ada98f6a85a13a3f4ebfdbc4cec030725e50 [file] [log] [blame]
Alexander Evseev81494492017-09-14 20:42:08 +03001# -*- coding: utf-8 -*-
2'''
3Module for configuring Artifactory.
4===================================
5'''
6
7import json
8import logging
9import requests
10
11from collections import OrderedDict
12
13from lxml import etree
14from lxml import objectify
15
16
17log = logging.getLogger(__name__)
18
19
20def _api_call(endpoint, data=None, headers=None, method='GET',
21 **connection_args):
22
23 log.debug('Got connection args: {}'.format(connection_args))
24
25 # Set default values if empty
26 if 'proto' not in connection_args:
27 connection_args['proto'] = 'http'
28 if 'host' not in connection_args:
29 connection_args['host'] = 'localhost'
30 if 'port' not in connection_args:
31 connection_args['port'] = 80
32
33 base_url = connection_args.get(
34 'url',
35 '{proto}://{host}:{port}/artifactory'.format(**connection_args)
36 )
37 api_url = base_url + '/api'
38
39 username = connection_args.get('user', 'admin')
40 password = connection_args.get('password', 'password')
41 ssl_verify = connection_args.get('ssl_verify', True)
42
43 # Prepare session object
44 api_connection = requests.Session()
45 api_connection.auth = (username, password)
46 api_connection.verify = ssl_verify
47
48 # Override default method if data given
49 if(data and method == 'GET'):
50 method = 'POST'
51
52 endpoint_url = api_url + endpoint
53 log.debug('Doing {0} request to {1}'.format(method, endpoint_url))
54
55 # API call request
56 resp = api_connection.request(
57 method=method,
58 url=endpoint_url,
59 data=data,
60 headers=headers
61 )
62
63 if resp.status_code == requests.codes.ok:
64 return True, resp.text
65 else:
66 errors = json.loads(resp.text).get('errors')
67 if errors:
68 for error in errors:
69 log.error('%(status)s:%(message)s' % error)
70 else:
71 log.error('%(status)s:%(message)s' % json.loads(resp.text))
72 return False, json.loads(resp.text)
73
74
75def get_license(**kwargs):
76 endpoint = '/system/license'
77
78 return _api_call(endpoint, **kwargs)
79
80
81def add_license(license_key, **kwargs):
82 endpoint = '/system/license'
83
84 change_data = {
85 'licenseKey': license_key,
86 }
87
88 return _api_call(
89 endpoint=endpoint,
90 data=json.dumps(change_data),
91 headers={'Content-Type': 'application/json'},
92 **kwargs
93 )
94
95def get_config(**kwargs):
96 endpoint = '/system/configuration'
97
98 return _api_call(endpoint, **kwargs)
99
100
101def set_config(config_data, **kwargs):
102 endpoint = '/system/configuration'
103
104 return _api_call(
105 endpoint=endpoint,
106 data=config_data,
107 headers={'Content-Type': 'application/xml'},
108 **kwargs
109 )
110
111
112def get_ldap_config(name, **kwargs):
113
114 result, config_data = get_config(**kwargs)
115 config = objectify.fromstring(config_data.encode('ascii'))
116
117 # Find existing LDAP settings with specified key ...
118 ldap_config = None
119 for ldap_setting_iter in config.security.ldapSettings.getchildren():
120 if ldap_setting_iter.key.text == name:
121 ldap_config = ldap_setting_iter
122 break
123
124 # ... and create new one if not exists
125 if ldap_config is None:
126 ldap_config = objectify.SubElement(
127 config.security.ldapSettings, 'ldapSetting')
128 objectify.SubElement(ldap_config, 'key')._setText(name)
129
130 return result, etree.tostring(ldap_config)
131
132def set_ldap_config(name, uri, base=None, enabled=True, dn_pattern=None,
133 manager_dn=None, manager_pass=None, search_subtree=True,
134 search_filter='(&(objectClass=inetOrgPerson)(uid={0}))',
135 attr_mail='mail', create_users=True, safe_search=True,
136 **kwargs):
137
138 result, config_data = get_config(**kwargs)
139 config = objectify.fromstring(config_data.encode('ascii'))
140
141 # NOTE! Elements must ber sorted in exact order!
142 key_map = OrderedDict([
143 ('enabled', 'enabled'),
144 ('ldapUrl', 'uri'),
145 ('userDnPattern', 'dn_pattern'),
146 ('search', ''),
147 ('autoCreateUser', 'create_users'),
148 ('emailAttribute', 'attr_mail'),
149 ('ldapPoisoningProtection', 'safe_search'),
150 ])
151
152 key_map_search = OrderedDict([
153 ('searchFilter', 'search_filter'),
154 ('searchBase', 'base'),
155 ('searchSubTree', 'search_subtree'),
156 ('managerDn', 'manager_dn'),
157 ('managerPassword', 'manager_pass'),
158 ])
159
160 # Find existing LDAP settings with specified key ...
161 ldap_config = None
162 for ldap_setting_iter in config.security.ldapSettings.getchildren():
163 if ldap_setting_iter.key.text == name:
164 ldap_config = ldap_setting_iter
165 search_config = ldap_config.search
166 break
167
168 # ... and create new one if not exists
169 if ldap_config is None:
170 ldap_config = objectify.SubElement(
171 config.security.ldapSettings, 'ldapSetting')
172 objectify.SubElement(ldap_config, 'key')._setText(name)
173
174 # LDAP options
175 for xml_key, var_name in key_map.iteritems():
176
177 # Search subtree must follow element order
178 if xml_key == 'search' and not hasattr(ldap_config, 'search'):
179 search_config = objectify.SubElement(ldap_config, 'search')
180 break
181
182 if var_name in locals():
183 # Replace None with empty strings
184 var_value = locals()[var_name] or ''
185 if isinstance(var_value, bool):
186 # Boolean values should be lowercased
187 xml_text = str(var_value).lower()
188 else:
189 xml_text = str(var_value)
190
191 if hasattr(ldap_config, xml_key):
192 ldap_config[xml_key]._setText(xml_text)
193 else:
194 objectify.SubElement(ldap_config, xml_key)._setText(
195 xml_text)
196
197 # Search options (same code as above but using search_config)
198 for xml_key, var_name in key_map_search.iteritems():
199 if var_name in locals():
200 # Replace None with empty strings
201 var_value = locals()[var_name] or ''
202 if isinstance(var_value, bool):
203 # Boolean values should be lowercased
204 xml_text = str(var_value).lower()
205 else:
206 xml_text = str(var_value)
207
208 if hasattr(search_config, xml_key):
209 search_config[xml_key]._setText(xml_text)
210 else:
211 objectify.SubElement(search_config, xml_key)._setText(
212 xml_text)
213
214 change_data = etree.tostring(config)
215
216 return set_config(change_data, **kwargs)
217
218
219def list_repos(**kwargs):
220 endpoint = '/repositories'
221
222 return _api_call(endpoint, **kwargs)
223
224
225def get_repo(name, **kwargs):
226 result, repo_list = list_repos(**kwargs)
227 if name in [r['key'] for r in json.loads(repo_list)]:
228 endpoint = '/repositories/' + name
229 return _api_call(endpoint, **kwargs)
230 else:
231 return True, {}
232
233
234def set_repo(name, repo_config, **kwargs):
235 log.debug('Got repo parameters: {}'.format(repo_config))
236
237 result, repo_list = list_repos(**kwargs)
238 if name in [r['key'] for r in json.loads(repo_list)]:
239 method = 'POST'
240 else:
241 method = 'PUT'
242
243 endpoint = '/repositories/' + name
244
245 return _api_call(
246 endpoint=endpoint,
247 method=method,
248 data=json.dumps(repo_config),
249 headers={'Content-Type': 'application/json'},
250 **kwargs
251 )