| #!/usr/bin/python |
| # Copyright 2015 Mirantis, Inc. |
| # |
| # Licensed under the Apache License, Version 2.0 (the "License"); |
| # you may not use this file except in compliance with the License. |
| # You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| |
| import datetime |
| import dateutil.parser |
| import dateutil.tz |
| import requests |
| import simplejson as json |
| |
| import collectd_base as base |
| |
| from collections import defaultdict |
| |
# Default polling period for the OpenStack API endpoints, in seconds. It is
# deliberately shorter than the default group-by interval (60 seconds) so
# that the Grafana graphs do not show gaps.
INTERVAL = 50
| |
| |
class KeystoneException(Exception):
    """Raised when no valid token can be obtained from Keystone."""
| |
| |
class PluginConfigurationException(Exception):
    """Raised when a mandatory plugin configuration parameter is missing."""
| |
| |
class OSClient(object):
    """ Base class for querying the OpenStack API endpoints.

    It authenticates against the Keystone v2.0 ``tokens`` API and uses the
    service catalog returned along with the token to discover the API
    endpoints.
    """
    # Safety margin: a token is considered expired this long before the
    # expiry time reported by Keystone, so that it is renewed ahead of time.
    EXPIRATION_TOKEN_DELTA = datetime.timedelta(0, 30)

    def __init__(self, username, password, tenant, keystone_url, timeout,
                 logger, max_retries):
        self.logger = logger
        self.username = username
        self.password = password
        self.tenant_name = tenant
        self.keystone_url = keystone_url
        # List of dicts with 'name', 'region', 'service_type', 'url' and
        # 'admin_url' keys, (re)built by get_token().
        self.service_catalog = []
        self.tenant_id = None
        self.timeout = timeout
        self.token = None
        self.valid_until = None

        # Note: prior to urllib3 v1.9, retries are made on failed connections
        # but not on timeout and backoff time is not supported.
        # (at this time we ship requests 2.2.1 and urllib3 1.6.1 or 1.7.1)
        self.session = requests.Session()
        self.session.mount(
            'http://', requests.adapters.HTTPAdapter(max_retries=max_retries))
        self.session.mount(
            'https://', requests.adapters.HTTPAdapter(max_retries=max_retries))

    def is_valid_token(self):
        """Return a truthy value when a token is held and not yet expired."""
        now = datetime.datetime.now(tz=dateutil.tz.tzutc())
        return self.token and self.valid_until and self.valid_until > now

    def clear_token(self):
        """Forget the current token and its expiry time."""
        self.token = None
        self.valid_until = None

    def get_token(self):
        """Request a new token from Keystone and refresh the service catalog.

        Returns the token id.

        Raises KeystoneException when the request fails or when Keystone
        answers with a non-2xx status code.
        """
        self.clear_token()
        data = json.dumps({
            "auth":
            {
                'tenantName': self.tenant_name,
                'passwordCredentials':
                {
                    'username': self.username,
                    'password': self.password
                }
            }
        })
        self.logger.info("Trying to get token from '%s'" % self.keystone_url)
        r = self.make_request('post',
                              '%s/tokens' % self.keystone_url, data=data,
                              token_required=False)
        # make_request() returns None when the request itself failed. Do not
        # rely on the response's truthiness here: a requests Response is
        # falsy for 4xx/5xx codes, which would hide the actual status code
        # reported just below.
        if r is None:
            raise KeystoneException("Cannot get a valid token from %s" %
                                    self.keystone_url)

        if r.status_code < 200 or r.status_code > 299:
            raise KeystoneException("%s responded with code %d" %
                                    (self.keystone_url, r.status_code))

        data = r.json()
        self.logger.debug("Got response from Keystone: '%s'" % data)
        self.token = data['access']['token']['id']
        self.tenant_id = data['access']['token']['tenant']['id']
        self.valid_until = dateutil.parser.parse(
            data['access']['token']['expires']) - self.EXPIRATION_TOKEN_DELTA
        self.service_catalog = []
        for item in data['access']['serviceCatalog']:
            endpoints = item.get('endpoints')
            if not endpoints:
                # A service may be registered without any endpoint: skip it
                # rather than failing the whole authentication.
                self.logger.warning("Service '%s' has no endpoint, skipping"
                                    % item.get('name'))
                continue
            # Only the first endpoint of each service is used.
            endpoint = endpoints[0]
            self.service_catalog.append({
                'name': item['name'],
                'region': endpoint['region'],
                'service_type': item['type'],
                'url': endpoint['internalURL'],
                'admin_url': endpoint['adminURL'],
            })

        self.logger.debug("Got token '%s'" % self.token)
        return self.token

    def make_request(self, verb, url, data=None, token_required=True,
                     params=None):
        """Send an HTTP request through the shared requests session.

        :param verb: HTTP method name (case-insensitive), e.g. 'get', 'post'
        :param url: absolute URL to query
        :param data: optional request body
        :param token_required: when True, attach X-Auth-Token, fetching a new
            token first if the current one is missing or expired (errors
            raised by get_token() propagate to the caller)
        :param params: optional query-string parameters
        :returns: the response object, or None when the request raised or
            no token could be obtained
        """
        kwargs = {
            'url': url,
            'timeout': self.timeout,
            'headers': {'Content-type': 'application/json'}
        }
        if token_required and not self.is_valid_token() and \
           not self.get_token():
            self.logger.error("Aborting request, no valid token")
            return
        elif token_required:
            kwargs['headers']['X-Auth-Token'] = self.token

        if data is not None:
            kwargs['data'] = data

        if params is not None:
            kwargs['params'] = params

        func = getattr(self.session, verb.lower())

        try:
            r = func(**kwargs)
        except Exception as e:
            self.logger.error("Got exception for '%s': '%s'" %
                              (kwargs['url'], e))
            return

        self.logger.info("%s responded with status code %d" %
                         (kwargs['url'], r.status_code))
        if r.status_code == 401:
            # Clear token in case it is revoked or invalid, so that the next
            # token-requiring call fetches a new one.
            self.clear_token()

        return r
| |
| |
class CollectdPlugin(base.Base):
    """Base class for collectd plugins that poll the OpenStack APIs.

    It builds an OSClient from the Username, Password, Tenant and KeystoneUrl
    configuration parameters and provides helpers to query the services
    listed in the Keystone service catalog.
    """

    def __init__(self, *args, **kwargs):
        super(CollectdPlugin, self).__init__(*args, **kwargs)
        # The timeout/max_retries are defined according to the observations on
        # 200 nodes environments with 600 VMs. See #1554502 for details.
        self.timeout = 20
        self.max_retries = 2
        self.username = None
        self.password = None
        self.tenant_name = None
        self.keystone_url = None
        self.os_client = None
        self.extra_config = {}
        # Background pollers started by get_objects(), keyed by
        # '<project>:<resource>'.
        self._threads = {}
        self.pagination_limit = None
        self.polling_interval = 60
        # Time of the last successful poll (kept for backward compatibility;
        # each poller tracks its own value internally).
        self._last_run = None
        self.changes_since = False

    def _build_url(self, service, resource):
        """Return the URL of *resource* on *service*.

        The base URL is the service's 'url' from the catalog. When the service
        is not in the catalog, an error is logged and the missing value
        (None) is returned.
        """
        s = (self.get_service(service) or {})
        url = s.get('url')
        if not url:
            self.logger.error("Service '%s' not found in catalog" % service)
            return url

        # v3 API must be used in order to obtain tenants in multi-domain envs
        if service == 'keystone' and resource in ('projects', 'users',
                                                  'roles'):
            url = url.replace('v2.0', 'v3')

        if not url.endswith('/'):
            url += '/'
        return "%s%s" % (url, resource)

    def raw_get(self, url, token_required=False):
        """GET an absolute URL (no token attached by default)."""
        return self.os_client.make_request('get', url,
                                           token_required=token_required)

    def iter_workers(self, service):
        """ Return the list of workers and their state

        Here is an example of returned dictionary:
        {
            'host': 'node.example.com',
            'service': 'nova-compute',
            'state': 'up'
        }

        where 'state' can be 'up', 'down' or 'disabled'.

        Neutron agents are read from 'v2.0/agents'; any other service is
        queried on 'os-services'. Nothing is yielded (and a warning is
        logged) when the API call fails or the payload is unexpected.
        """

        if service == 'neutron':
            endpoint = 'v2.0/agents'
            entry = 'agents'
        else:
            endpoint = 'os-services'
            entry = 'services'

        ost_services_r = self.get(service, endpoint)

        msg = "Cannot get state of {} workers".format(service)
        if ost_services_r is None:
            self.logger.warning(msg)
        elif ost_services_r.status_code != 200:
            msg = "{}: Got {} ({})".format(
                msg, ost_services_r.status_code, ost_services_r.content)
            self.logger.warning(msg)
        else:
            try:
                r_json = ost_services_r.json()
            except ValueError:
                r_json = {}

            if entry not in r_json:
                msg = "{}: couldn't find '{}' key".format(msg, entry)
                self.logger.warning(msg)
            else:
                for val in r_json[entry]:
                    data = {'host': val['host'], 'service': val['binary']}

                    if service == 'neutron':
                        if not val['admin_state_up']:
                            data['state'] = 'disabled'
                        else:
                            data['state'] = 'up' if val['alive'] else 'down'
                    else:
                        if val['status'] == 'disabled':
                            data['state'] = 'disabled'
                        elif val['state'] == 'up' or val['state'] == 'down':
                            data['state'] = val['state']
                        else:
                            msg = "Unknown state for {} workers:{}".format(
                                service, val['state'])
                            self.logger.warning(msg)
                            continue

                    yield data

    def get(self, service, resource, params=None):
        """GET *resource* on *service*; return None if the URL is unknown."""
        url = self._build_url(service, resource)
        if not url:
            return
        self.logger.info('GET({}) {}'.format(url, params))
        return self.os_client.make_request('get', url, params=params)

    @property
    def service_catalog(self):
        """Service catalog of the OS client, fetching a token if empty."""
        if not self.os_client.service_catalog:
            # In case the service catalog is empty (eg Keystone was down when
            # collectd started), we should try to get a new token
            self.os_client.get_token()
        return self.os_client.service_catalog

    def get_service(self, service_name):
        """Return the first catalog entry named *service_name*, or None."""
        return next((x for x in self.service_catalog
                     if x['name'] == service_name), None)

    def config_callback(self, config):
        """Read the plugin configuration and create the OS client.

        Raises PluginConfigurationException when Username, Password, Tenant
        or KeystoneUrl is missing.
        """
        super(CollectdPlugin, self).config_callback(config)
        for node in config.children:
            if node.key == 'Username':
                self.username = node.values[0]
            elif node.key == 'Password':
                self.password = node.values[0]
            elif node.key == 'Tenant':
                self.tenant_name = node.values[0]
            elif node.key == 'KeystoneUrl':
                self.keystone_url = node.values[0]
            elif node.key == 'PaginationLimit':
                self.pagination_limit = int(node.values[0])
            elif node.key == 'PollingInterval':
                self.polling_interval = int(node.values[0])

        if self.username is None:
            raise PluginConfigurationException('Username parameter is missing')
        if self.password is None:
            raise PluginConfigurationException('Password parameter is missing')
        if self.tenant_name is None:
            raise PluginConfigurationException('Tenant parameter is missing')
        if self.keystone_url is None:
            raise PluginConfigurationException('KeystoneUrl parameter is missing')

        self.os_client = OSClient(self.username, self.password,
                                  self.tenant_name, self.keystone_url,
                                  self.timeout, self.logger, self.max_retries)

    def get_objects(self, project, object_name, api_version='',
                    params=None, detail=False, since=False):
        """ Return a list of OpenStack objects

        The API version is not always included in the URL endpoint
        registered in Keystone (eg Glance). In this case, use the
        api_version parameter to specify which version should be used.

        The objects are fetched by a background poller (one per
        project/resource pair) every ``polling_interval`` seconds. This call
        returns the poller's ``results`` attribute (see base.AsyncPoller), or
        an empty list if the poller thread has died. Only the arguments of
        the first call for a given project/resource configure its poller.

        :param project: service name in the Keystone catalog
        :param object_name: collection name, also the JSON key of the items
        :param params: extra query-string parameters
        :param detail: query the '<resource>/detail' variant
        :param since: send 'changes-since' set to the time of this poller's
            last successful run
        """
        self.changes_since = since
        if params is None:
            params = {}

        if api_version:
            resource = '%s/%s' % (api_version, object_name)
        else:
            resource = '%s' % (object_name)

        if detail:
            resource = '{}/detail'.format(resource)

        opts = {}
        if self.pagination_limit:
            opts['limit'] = self.pagination_limit

        opts.update(params)

        # State private to this poller. A dict is used so the closure can
        # update it without 'nonlocal' (Python 2 compatible). Capturing
        # 'since' and the last run time here (instead of reading shared
        # instance attributes) keeps several pollers from interfering.
        state = {'last_run': None}

        def openstack_api_poller():
            _objects = []
            _opts = dict(opts)

            if since and state['last_run']:
                _opts['changes-since'] = state['last_run'].isoformat()

            # Keep track of the initial request time
            last_run = datetime.datetime.now(tz=dateutil.tz.tzutc())
            has_failure = False

            while True:
                r = self.get(project, resource, params=_opts)
                resp = None
                if r:
                    try:
                        resp = r.json()
                    except ValueError:
                        # Non-JSON body: handled as a failure below.
                        resp = None

                if not isinstance(resp, dict) or object_name not in resp:
                    has_failure = True
                    err = '' if r is None else r.text
                    self.collectd.warning('Could not find {}: {} {}'.format(
                        project, object_name, err
                    ))
                    # Avoid providing incomplete data by resetting the
                    # current set.
                    _objects = []
                    break

                bulk_objs = resp.get(object_name)
                if not bulk_objs:
                    # empty list
                    break

                _objects.extend(bulk_objs)

                links = resp.get('{}_links'.format(object_name))
                if links is None or self.pagination_limit is None:
                    # Either the pagination is not supported or there is
                    # no more data: all the available data has been read.
                    break

                # Without a 'next' link, all data has been read.
                if not any(link.get('rel') == 'next' for link in links):
                    break

                _opts['marker'] = bulk_objs[-1]['id']

            if not has_failure:
                state['last_run'] = last_run
                self._last_run = last_run

            return _objects

        poller_id = '{}:{}'.format(project, resource)
        if poller_id not in self._threads:
            t = base.AsyncPoller(self.collectd,
                                 openstack_api_poller,
                                 self.polling_interval,
                                 poller_id, since)
            t.start()
            self._threads[poller_id] = t

        t = self._threads[poller_id]
        if not t.is_alive():
            self.logger.warning("Unexpected end of the thread {}".format(
                t.name))
            del self._threads[poller_id]
            return []

        return t.results

    def count_objects_group_by(self,
                               list_object,
                               group_by_func,
                               count_func=None):

        """ Count the number of items grouped by arbitrary criteria.

        Each item adds count_func(item) (or 1 without count_func) to the
        bucket group_by_func(item); items whose count is not a number are
        ignored.
        """

        counts = defaultdict(int)
        for obj in list_object:
            s = group_by_func(obj)
            try:
                counts[s] += count_func(obj) if count_func else 1
            except TypeError:
                # Ignore when count_func() doesn't return a number
                pass
        return counts

    def shutdown_callback(self):
        """Stop every running poller thread and wait for it to finish."""
        for tid, t in self._threads.items():
            if t.is_alive():
                self.logger.info('Waiting for {} thread to finish'.format(tid))
                t.stop()
                t.join()