blob: f0ecbfcc6b74c4e119ee0253d7fb68cf17307cf6 [file] [log] [blame]
Ales Komarek2675e842016-10-05 00:10:44 +02001#!/usr/bin/python
2# Copyright 2015 Mirantis, Inc.
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15
16import datetime
17import dateutil.parser
18import dateutil.tz
19import requests
20import simplejson as json
Ales Komarek2675e842016-10-05 00:10:44 +020021
22import collectd_base as base
23
24from collections import defaultdict
25
# Default interval (in seconds) between two queries of the OpenStack API
# endpoints. The value is deliberately lower than the default Grafana "group
# by" interval (60 seconds) so that the graphs do not show gaps.
INTERVAL = 50
30
31
class KeystoneException(Exception):
    """Raised when a token cannot be obtained from Keystone."""
34
35
class PluginConfigurationException(Exception):
    """Raised when a mandatory plugin configuration parameter is missing."""
38
39
class OSClient(object):
    """ Base class for querying the OpenStack API endpoints.

    It authenticates against the Keystone ``tokens`` API (v2.0 payload
    format) and uses the returned service catalog to discover the API
    endpoints.
    """
    # Safety margin subtracted from the token expiry date, so that the token
    # is considered invalid (and renewed) 30 seconds before it expires.
    EXPIRATION_TOKEN_DELTA = datetime.timedelta(0, 30)

    def __init__(self, username, password, tenant, keystone_url, timeout,
                 logger, max_retries):
        """Store the credentials and build the HTTP session.

        :param username: Keystone user name.
        :param password: Keystone password.
        :param tenant: name of the tenant the token is scoped to.
        :param keystone_url: Keystone base URL ('/tokens' is appended to it).
        :param timeout: timeout passed to every HTTP request.
        :param logger: logger used for all messages.
        :param max_retries: max_retries of the HTTP(S) transport adapters.
        """
        self.logger = logger
        self.username = username
        self.password = password
        self.tenant_name = tenant
        self.keystone_url = keystone_url
        self.service_catalog = []
        self.tenant_id = None
        self.timeout = timeout
        self.token = None
        self.valid_until = None

        # Note: prior to urllib3 v1.9, retries are made on failed connections
        # but not on timeout and backoff time is not supported.
        # (at this time we ship requests 2.2.1 and urllib3 1.6.1 or 1.7.1)
        self.session = requests.Session()
        self.session.mount(
            'http://', requests.adapters.HTTPAdapter(max_retries=max_retries))
        self.session.mount(
            'https://', requests.adapters.HTTPAdapter(max_retries=max_retries))

    def is_valid_token(self):
        """Return a truthy value if a token is held and not (nearly) expired."""
        now = datetime.datetime.now(tz=dateutil.tz.tzutc())
        return self.token and self.valid_until and self.valid_until > now

    def clear_token(self):
        """Forget the current token and its expiry date."""
        self.token = None
        self.valid_until = None

    def get_token(self):
        """Request a new token from Keystone and refresh the service catalog.

        Any previously held token is discarded first.

        :returns: the new token id.
        :raises KeystoneException: if Keystone cannot be reached or answers
            with a non-2xx status code.
        """
        self.clear_token()
        data = json.dumps({
            "auth": {
                'tenantName': self.tenant_name,
                'passwordCredentials': {
                    'username': self.username,
                    'password': self.password,
                },
            },
        })
        self.logger.info("Trying to get token from '%s'" % self.keystone_url)
        r = self.make_request('post',
                              '%s/tokens' % self.keystone_url, data=data,
                              token_required=False)
        # Compare with None explicitly: a requests Response is falsy for
        # 4xx/5xx answers, which must be reported with their status code
        # below instead of being mistaken for a connection failure.
        if r is None:
            raise KeystoneException("Cannot get a valid token from %s" %
                                    self.keystone_url)

        if not 200 <= r.status_code <= 299:
            raise KeystoneException("%s responded with code %d" %
                                    (self.keystone_url, r.status_code))

        data = r.json()
        self.logger.debug("Got response from Keystone: '%s'" % data)
        token = data['access']['token']
        self.token = token['id']
        self.tenant_id = token['tenant']['id']
        self.valid_until = dateutil.parser.parse(
            token['expires']) - self.EXPIRATION_TOKEN_DELTA
        self.service_catalog = []
        for item in data['access']['serviceCatalog']:
            endpoints = item.get('endpoints') or []
            if not endpoints:
                # A service can be registered without any endpoint: skip it
                # rather than failing the whole authentication.
                self.logger.warning(
                    "No endpoint found for service '%s'" % item.get('name'))
                continue
            # Only the first endpoint of each service is kept.
            endpoint = endpoints[0]
            self.service_catalog.append({
                'name': item['name'],
                'region': endpoint['region'],
                'service_type': item['type'],
                'url': endpoint['internalURL'],
                'admin_url': endpoint['adminURL'],
            })

        self.logger.debug("Got token '%s'" % self.token)
        return self.token

    def make_request(self, verb, url, data=None, token_required=True,
                     params=None):
        """Send an HTTP request through the shared session.

        :param verb: HTTP method name, case-insensitive (eg 'get', 'post').
        :param url: absolute URL to query.
        :param data: optional request body.
        :param token_required: when True, a token is obtained if needed and
            sent in the X-Auth-Token header.
        :param params: optional dict of query string parameters.
        :returns: the response object, or None if the request raised an
            exception or no token was available.
        :raises KeystoneException: propagated from get_token() when a new
            token is needed and cannot be obtained.
        """
        kwargs = {
            'url': url,
            'timeout': self.timeout,
            'headers': {'Content-type': 'application/json'},
        }
        if token_required:
            if not self.is_valid_token() and not self.get_token():
                self.logger.error("Aborting request, no valid token")
                return None
            kwargs['headers']['X-Auth-Token'] = self.token

        if data is not None:
            kwargs['data'] = data

        if params is not None:
            kwargs['params'] = params

        func = getattr(self.session, verb.lower())

        try:
            r = func(**kwargs)
        except Exception as e:
            # Best effort: the failure is logged and reported as None, which
            # callers check for.
            self.logger.error("Got exception for '%s': '%s'" %
                              (kwargs['url'], e))
            return None

        self.logger.info("%s responded with status code %d" %
                         (kwargs['url'], r.status_code))
        if r.status_code == 401:
            # Clear token in case it is revoked or invalid
            self.clear_token()

        return r
158
159
class CollectdPlugin(base.Base):
    """Base class for collectd plugins collecting metrics from OpenStack.

    It reads the Keystone credentials from the collectd configuration,
    queries the OpenStack APIs through an OSClient instance and fetches
    collections of objects in background pollers (see get_objects()).
    """

    def __init__(self, *args, **kwargs):
        super(CollectdPlugin, self).__init__(*args, **kwargs)
        # The timeout/max_retries are defined according to the observations on
        # 200 nodes environments with 600 VMs. See #1554502 for details.
        self.timeout = 20
        self.max_retries = 2
        self.username = None
        self.password = None
        self.tenant_name = None
        self.keystone_url = None
        self.os_client = None
        self.extra_config = {}
        # Background pollers, indexed by '<project>:<resource>'.
        self._threads = {}
        self.pagination_limit = None
        self.polling_interval = 60
        # Start time of the last successful run of each poller (indexed like
        # self._threads), used for the 'changes-since' query parameter.
        self._last_runs = {}
        # Kept for backward compatibility: start time of the most recent
        # successful run of any poller.
        self._last_run = None
        # Value of the 'since' argument of the latest get_objects() call.
        self.changes_since = False

    def _build_url(self, service, resource):
        """Return the URL of *resource* for *service*.

        The base URL comes from the service catalog. It returns the
        catalog value unchanged (None or empty) and logs an error when the
        service has no URL.
        """
        s = (self.get_service(service) or {})
        url = s.get('url')
        if not url:
            self.logger.error("Service '%s' not found in catalog" % service)
            return url

        # v3 API must be used in order to obtain tenants in multi-domain envs
        if service == 'keystone' and resource in ('projects', 'users',
                                                  'roles'):
            url = url.replace('v2.0', 'v3')

        if not url.endswith('/'):
            url += '/'
        return "%s%s" % (url, resource)

    def raw_get(self, url, token_required=False):
        """GET an absolute URL; no token is sent unless token_required."""
        return self.os_client.make_request('get', url,
                                           token_required=token_required)

    def iter_workers(self, service):
        """ Yield the workers of a service and their state

        Here is an example of yielded dictionary:
        {
            'host': 'node.example.com',
            'service': 'nova-compute',
            'state': 'up'
        }

        where 'state' can be 'up', 'down' or 'disabled'. Neutron agents are
        read from 'v2.0/agents', other services from 'os-services'. Nothing
        is yielded (and a warning is logged) if the query fails.
        """

        if service == 'neutron':
            endpoint = 'v2.0/agents'
            entry = 'agents'
        else:
            endpoint = 'os-services'
            entry = 'services'

        ost_services_r = self.get(service, endpoint)

        msg = "Cannot get state of {} workers".format(service)
        if ost_services_r is None:
            self.logger.warning(msg)
        elif ost_services_r.status_code != 200:
            msg = "{}: Got {} ({})".format(
                msg, ost_services_r.status_code, ost_services_r.content)
            self.logger.warning(msg)
        else:
            try:
                r_json = ost_services_r.json()
            except ValueError:
                r_json = {}

            if entry not in r_json:
                msg = "{}: couldn't find '{}' key".format(msg, entry)
                self.logger.warning(msg)
            else:
                for val in r_json[entry]:
                    data = {'host': val['host'], 'service': val['binary']}

                    if service == 'neutron':
                        if not val['admin_state_up']:
                            data['state'] = 'disabled'
                        else:
                            data['state'] = 'up' if val['alive'] else 'down'
                    else:
                        if val['status'] == 'disabled':
                            data['state'] = 'disabled'
                        elif val['state'] == 'up' or val['state'] == 'down':
                            data['state'] = val['state']
                        else:
                            msg = "Unknown state for {} workers:{}".format(
                                service, val['state'])
                            self.logger.warning(msg)
                            continue

                    yield data

    def get(self, service, resource, params=None):
        """GET *resource* of *service* with a token; None if no URL."""
        url = self._build_url(service, resource)
        if not url:
            return
        self.logger.info('GET({}) {}'.format(url, params))
        return self.os_client.make_request('get', url, params=params)

    @property
    def service_catalog(self):
        """The Keystone service catalog held by the OSClient."""
        if not self.os_client.service_catalog:
            # In case the service catalog is empty (eg Keystone was down when
            # collectd started), we should try to get a new token
            self.os_client.get_token()
        return self.os_client.service_catalog

    def get_service(self, service_name):
        """Return the first catalog entry named *service_name*, or None."""
        return next((x for x in self.service_catalog
                     if x['name'] == service_name), None)

    def config_callback(self, config):
        """Read the plugin configuration and create the OSClient.

        :raises PluginConfigurationException: if Username, Password, Tenant
            or KeystoneUrl is missing.
        """
        super(CollectdPlugin, self).config_callback(config)
        for node in config.children:
            if node.key == 'Username':
                self.username = node.values[0]
            elif node.key == 'Password':
                self.password = node.values[0]
            elif node.key == 'Tenant':
                self.tenant_name = node.values[0]
            elif node.key == 'KeystoneUrl':
                self.keystone_url = node.values[0]
            elif node.key == 'PaginationLimit':
                self.pagination_limit = int(node.values[0])
            elif node.key == 'PollingInterval':
                self.polling_interval = int(node.values[0])

        if self.username is None:
            raise PluginConfigurationException('Username parameter is missing')
        if self.password is None:
            raise PluginConfigurationException('Password parameter is missing')
        if self.tenant_name is None:
            raise PluginConfigurationException('Tenant parameter is missing')
        if self.keystone_url is None:
            raise PluginConfigurationException('KeystoneUrl parameter is missing')

        self.os_client = OSClient(self.username, self.password,
                                  self.tenant_name, self.keystone_url,
                                  self.timeout, self.logger, self.max_retries)

    def get_objects(self, project, object_name, api_version='',
                    params=None, detail=False, since=False):
        """ Return a list of OpenStack objects

        The API version is not always included in the URL endpoint
        registered in Keystone (eg Glance). In this case, use the
        api_version parameter to specify which version should be used.

        The objects are fetched by a base.AsyncPoller thread, created on the
        first call for a given project/resource pair with
        self.polling_interval. This method returns that thread's results,
        or an empty list if the thread has died (a new thread is then
        created on the next call).

        :param project: service name in the Keystone catalog.
        :param object_name: resource name, also the key of the objects list
            in the JSON response.
        :param api_version: optional version prefix of the resource path.
        :param params: optional dict of query string parameters.
        :param detail: when True, query '<resource>/detail'.
        :param since: when True, send 'changes-since' set to the start time
            of this poller's previous successful run.
        """
        self.changes_since = since
        if params is None:
            params = {}

        if api_version:
            resource = '%s/%s' % (api_version, object_name)
        else:
            resource = '%s' % (object_name,)

        if detail:
            resource = '{}/detail'.format(resource)

        opts = {}
        if self.pagination_limit:
            opts['limit'] = self.pagination_limit

        opts.update(params)

        poller_id = '{}:{}'.format(project, resource)

        def openstack_api_poller():
            _objects = []
            _opts = dict(opts)

            # Use this call's 'since' (self.changes_since is overwritten by
            # every get_objects() call) and this poller's own last run.
            previous_run = self._last_runs.get(poller_id)
            if since and previous_run:
                _opts['changes-since'] = previous_run.isoformat()

            # Keep track of the initial request time
            last_run = datetime.datetime.now(tz=dateutil.tz.tzutc())
            has_failure = False

            while True:
                r = self.get(project, resource, params=_opts)
                resp = None
                if r:
                    try:
                        resp = r.json()
                    except ValueError:
                        # Non-JSON body: handled as a failure below.
                        resp = None
                if not isinstance(resp, dict) or object_name not in resp:
                    has_failure = True
                    if r is None:
                        err = ''
                    else:
                        err = r.text
                    self.collectd.warning('Could not find {}: {} {}'.format(
                        project, object_name, err
                    ))
                    # Discard the objects gathered so far to avoid providing
                    # incomplete data.
                    _objects = []
                    break

                bulk_objs = resp.get(object_name)
                if not bulk_objs:
                    # empty list
                    break

                _objects.extend(bulk_objs)

                links = resp.get('{}_links'.format(object_name))
                if links is None or self.pagination_limit is None:
                    # Either the pagination is not supported or there is
                    # no more data. In both cases, we got at this stage all
                    # the data we can have.
                    break

                # if there is no 'next' link in the response, all data has
                # been read.
                if not any(i.get('rel') == 'next' for i in links):
                    break

                _opts['marker'] = bulk_objs[-1]['id']

            if not has_failure:
                self._last_runs[poller_id] = last_run
                self._last_run = last_run

            return _objects

        if poller_id not in self._threads:
            t = base.AsyncPoller(self.collectd,
                                 openstack_api_poller,
                                 self.polling_interval,
                                 poller_id, since)
            t.start()
            self._threads[poller_id] = t

        t = self._threads[poller_id]
        if not t.is_alive():
            self.logger.warning("Unexpected end of the thread {}".format(
                t.name))
            del self._threads[poller_id]
            return []

        return t.results

    def count_objects_group_by(self,
                               list_object,
                               group_by_func,
                               count_func=None):

        """ Count the number of items grouped by arbitrary criteria.

        Each object adds count_func(obj) (or 1 without count_func) to the
        group group_by_func(obj); objects whose count is not a number are
        ignored.
        """

        counts = defaultdict(int)
        for obj in list_object:
            s = group_by_func(obj)
            try:
                counts[s] += count_func(obj) if count_func else 1
            except TypeError:
                # Ignore when count_func() doesn't return a number
                pass
        return counts

    def shutdown_callback(self):
        """Stop every live poller thread and wait for it to finish."""
        for tid, t in self._threads.items():
            if t.is_alive():
                self.logger.info('Waiting for {} thread to finish'.format(tid))
                t.stop()
                t.join()