Ales Komarek | 2675e84 | 2016-10-05 00:10:44 +0200 | [diff] [blame] | 1 | #!/usr/bin/python |
| 2 | # Copyright 2015 Mirantis, Inc. |
| 3 | # |
| 4 | # Licensed under the Apache License, Version 2.0 (the "License"); |
| 5 | # you may not use this file except in compliance with the License. |
| 6 | # You may obtain a copy of the License at |
| 7 | # |
| 8 | # http://www.apache.org/licenses/LICENSE-2.0 |
| 9 | # |
| 10 | # Unless required by applicable law or agreed to in writing, software |
| 11 | # distributed under the License is distributed on an "AS IS" BASIS, |
| 12 | # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 13 | # See the License for the specific language governing permissions and |
| 14 | # limitations under the License. |
| 15 | |
| 16 | import datetime |
| 17 | import dateutil.parser |
| 18 | import dateutil.tz |
| 19 | import requests |
| 20 | import simplejson as json |
Ales Komarek | 2675e84 | 2016-10-05 00:10:44 +0200 | [diff] [blame] | 21 | |
| 22 | import collectd_base as base |
| 23 | |
| 24 | from collections import defaultdict |
| 25 | |
# By default, query the OpenStack API endpoints every 50 seconds. The value is
# deliberately lower than the default group-by interval (60 seconds) so that
# every interval holds at least one data point and the Grafana graphs show no
# gaps.
INTERVAL = 50
| 30 | |
| 31 | |
class KeystoneException(Exception):
    """Raised when a token cannot be obtained from Keystone."""
| 34 | |
| 35 | |
class PluginConfigurationException(Exception):
    """Raised when a mandatory plugin configuration parameter is missing."""
| 38 | |
| 39 | |
class OSClient(object):
    """ Base class for querying the OpenStack API endpoints.

    It uses the Keystone service catalog to discover the API endpoints.
    A token is requested by posting the tenant name and the password
    credentials to ``<keystone_url>/tokens``; it is then sent in the
    ``X-Auth-Token`` header of every request that requires authentication.
    """
    # Safety margin (30 seconds) subtracted from the expiration time returned
    # by Keystone, so that a token about to expire is renewed beforehand.
    EXPIRATION_TOKEN_DELTA = datetime.timedelta(0, 30)

    def __init__(self, username, password, tenant, keystone_url, timeout,
                 logger, max_retries):
        """ Store the credentials and prepare the HTTP session.

        No request is sent here: the token is fetched lazily by
        make_request() or explicitly through get_token().
        """
        self.logger = logger
        self.username = username
        self.password = password
        self.tenant_name = tenant
        self.keystone_url = keystone_url
        self.service_catalog = []
        self.tenant_id = None
        self.timeout = timeout
        self.token = None
        self.valid_until = None

        # Note: prior to urllib3 v1.9, retries are made on failed connections
        # but not on timeout and backoff time is not supported.
        # (at this time we ship requests 2.2.1 and urllib3 1.6.1 or 1.7.1)
        self.session = requests.Session()
        self.session.mount(
            'http://', requests.adapters.HTTPAdapter(max_retries=max_retries))
        self.session.mount(
            'https://', requests.adapters.HTTPAdapter(max_retries=max_retries))

    def is_valid_token(self):
        """ Return a truthy value if a token is held and not yet expired.

        The result is the value of the boolean expression itself, hence it
        may be None or an empty value rather than strictly False.
        """
        now = datetime.datetime.now(tz=dateutil.tz.tzutc())
        return self.token and self.valid_until and self.valid_until > now

    def clear_token(self):
        """ Forget the current token and its expiration time."""
        self.token = None
        self.valid_until = None

    def get_token(self):
        """ Request a new token from Keystone and refresh the catalog.

        On success, the token, the tenant id, the expiration time and the
        service catalog are stored on the instance and the token is returned.

        Raises KeystoneException when the request fails or when Keystone
        answers with a status code outside the 2xx range.
        """
        self.clear_token()
        data = json.dumps({
            "auth":
            {
                'tenantName': self.tenant_name,
                'passwordCredentials':
                {
                    'username': self.username,
                    'password': self.password
                }
            }
        })
        self.logger.info("Trying to get token from '%s'" % self.keystone_url)
        r = self.make_request('post',
                              '%s/tokens' % self.keystone_url, data=data,
                              token_required=False)
        # make_request() returns None when the request could not be sent.
        # Compare against None explicitly: a requests.Response evaluates to
        # False for any 4xx/5xx status, which would hide the status code
        # reported by the check below.
        if r is None:
            raise KeystoneException("Cannot get a valid token from %s" %
                                    self.keystone_url)

        if r.status_code < 200 or r.status_code > 299:
            raise KeystoneException("%s responded with code %d" %
                                    (self.keystone_url, r.status_code))

        data = r.json()
        # NOTE(review): the raw response (which contains the token) is
        # written to the debug log; confirm this is acceptable.
        self.logger.debug("Got response from Keystone: '%s'" % data)
        self.token = data['access']['token']['id']
        self.tenant_id = data['access']['token']['tenant']['id']
        self.valid_until = dateutil.parser.parse(
            data['access']['token']['expires']) - self.EXPIRATION_TOKEN_DELTA
        self.service_catalog = []
        for item in data['access']['serviceCatalog']:
            endpoints = item.get('endpoints')
            if not endpoints:
                # A service without any endpoint cannot be queried; skip it
                # instead of failing the whole token request.
                self.logger.warning(
                    "Service '%s' has no endpoint in the catalog" %
                    item.get('name'))
                continue
            # Only the first endpoint of each service is kept.
            endpoint = endpoints[0]
            self.service_catalog.append({
                'name': item['name'],
                'region': endpoint['region'],
                'service_type': item['type'],
                'url': endpoint['internalURL'],
                'admin_url': endpoint['adminURL'],
            })

        self.logger.debug("Got token '%s'" % self.token)
        return self.token

    def make_request(self, verb, url, data=None, token_required=True,
                     params=None):
        """ Send an HTTP request and return the response.

        verb is the name of the HTTP method (case insensitive), data the
        request body and params the query string parameters. When
        token_required is true, a token is fetched if none is valid and it
        is sent in the X-Auth-Token header.

        Return the requests.Response object whatever its status code, or
        None when no token is available or when the request raised an
        exception. A 401 response clears the stored token so that the next
        call requests a new one.
        """
        kwargs = {
            'url': url,
            'timeout': self.timeout,
            'headers': {'Content-type': 'application/json'}
        }
        if token_required and not self.is_valid_token() and \
                not self.get_token():
            self.logger.error("Aborting request, no valid token")
            return
        elif token_required:
            kwargs['headers']['X-Auth-Token'] = self.token

        if data is not None:
            kwargs['data'] = data

        if params is not None:
            kwargs['params'] = params

        func = getattr(self.session, verb.lower())

        try:
            r = func(**kwargs)
        except Exception as e:
            # Best effort: any failure is logged and reported as None.
            self.logger.error("Got exception for '%s': '%s'" %
                              (kwargs['url'], e))
            return

        self.logger.info("%s responded with status code %d" %
                         (kwargs['url'], r.status_code))
        if r.status_code == 401:
            # Clear token in case it is revoked or invalid
            self.clear_token()

        return r
| 158 | |
| 159 | |
class CollectdPlugin(base.Base):
    """ Base class for the collectd plugins querying the OpenStack APIs.

    It reads the Keystone credentials from the plugin configuration, builds
    the OSClient instance and provides helpers to retrieve (possibly
    paginated) lists of objects through background poller threads.
    """

    def __init__(self, *args, **kwargs):
        super(CollectdPlugin, self).__init__(*args, **kwargs)
        # The timeout/max_retries are defined according to the observations on
        # 200 nodes environments with 600 VMs. See #1554502 for details.
        self.timeout = 20
        self.max_retries = 2
        self.username = None
        self.password = None
        self.tenant_name = None
        self.keystone_url = None
        self.os_client = None
        self.extra_config = {}
        # Poller threads indexed by '<service>:<resource>'
        self._threads = {}
        self.pagination_limit = None
        self.polling_interval = 60
        # Start time of the last poll that completed without failure
        self._last_run = None
        self.changes_since = False

    def _build_url(self, service, resource):
        """ Return the URL of a resource for a service of the catalog.

        Return a falsy value (and log an error) when the service has no URL
        in the catalog.
        """
        s = (self.get_service(service) or {})
        url = s.get('url')
        if not url:
            # Check this first: the service may be missing from the catalog,
            # in which case there is no URL to rewrite below.
            self.logger.error("Service '%s' not found in catalog" % service)
            return url

        # v3 API must be used in order to obtain tenants in multi-domain envs
        if service == 'keystone' and (resource in ['projects',
                                                   'users', 'roles']):
            url = url.replace('v2.0', 'v3')

        if url[-1] != '/':
            url += '/'
        return "%s%s" % (url, resource)

    def raw_get(self, url, token_required=False):
        """ Send a GET request to an arbitrary URL.

        Unlike get(), the URL is used as is and no token is sent unless
        token_required is true.
        """
        return self.os_client.make_request('get', url,
                                           token_required=token_required)

    def iter_workers(self, service):
        """ Return the list of workers and their state

        Here is an example of returned dictionary:
        {
          'host': 'node.example.com',
          'service': 'nova-compute',
          'state': 'up'
        }

        where 'state' can be 'up', 'down' or 'disabled'

        This is a generator: nothing is yielded (and a warning is logged)
        when the API cannot be queried or returns an unexpected payload.
        """

        if service == 'neutron':
            endpoint = 'v2.0/agents'
            entry = 'agents'
        else:
            endpoint = 'os-services'
            entry = 'services'

        ost_services_r = self.get(service, endpoint)

        msg = "Cannot get state of {} workers".format(service)
        if ost_services_r is None:
            self.logger.warning(msg)
        elif ost_services_r.status_code != 200:
            msg = "{}: Got {} ({})".format(
                msg, ost_services_r.status_code, ost_services_r.content)
            self.logger.warning(msg)
        else:
            try:
                r_json = ost_services_r.json()
            except ValueError:
                # The body is not valid JSON, handled as a missing entry
                r_json = {}

            if entry not in r_json:
                msg = "{}: couldn't find '{}' key".format(msg, entry)
                self.logger.warning(msg)
            else:
                for val in r_json[entry]:
                    data = {'host': val['host'], 'service': val['binary']}

                    if service == 'neutron':
                        if not val['admin_state_up']:
                            data['state'] = 'disabled'
                        else:
                            data['state'] = 'up' if val['alive'] else 'down'
                    else:
                        if val['status'] == 'disabled':
                            data['state'] = 'disabled'
                        elif val['state'] == 'up' or val['state'] == 'down':
                            data['state'] = val['state']
                        else:
                            msg = "Unknown state for {} workers:{}".format(
                                service, val['state'])
                            self.logger.warning(msg)
                            continue

                    yield data

    def get(self, service, resource, params=None):
        """ Send an authenticated GET request to a service of the catalog.

        Return the response of OSClient.make_request(), or None when the
        URL of the service cannot be determined.
        """
        url = self._build_url(service, resource)
        if not url:
            return
        self.logger.info('GET({}) {}'.format(url, params))
        return self.os_client.make_request('get', url, params=params)

    @property
    def service_catalog(self):
        """ The service catalog held by the OpenStack client."""
        if not self.os_client.service_catalog:
            # In case the service catalog is empty (eg Keystone was down when
            # collectd started), we should try to get a new token
            self.os_client.get_token()
        return self.os_client.service_catalog

    def get_service(self, service_name):
        """ Return the catalog entry named service_name, None if absent."""
        return next((x for x in self.service_catalog
                     if x['name'] == service_name), None)

    def config_callback(self, config):
        """ Read the plugin parameters and create the OpenStack client.

        Raises PluginConfigurationException when one of the Username,
        Password, Tenant or KeystoneUrl parameters is missing.
        """
        super(CollectdPlugin, self).config_callback(config)
        for node in config.children:
            if node.key == 'Username':
                self.username = node.values[0]
            elif node.key == 'Password':
                self.password = node.values[0]
            elif node.key == 'Tenant':
                self.tenant_name = node.values[0]
            elif node.key == 'KeystoneUrl':
                self.keystone_url = node.values[0]
            elif node.key == 'PaginationLimit':
                self.pagination_limit = int(node.values[0])
            elif node.key == 'PollingInterval':
                self.polling_interval = int(node.values[0])

        if self.username is None:
            raise PluginConfigurationException('Username parameter is missing')
        if self.password is None:
            raise PluginConfigurationException('Password parameter is missing')
        if self.tenant_name is None:
            raise PluginConfigurationException('Tenant parameter is missing')
        if self.keystone_url is None:
            raise PluginConfigurationException('KeystoneUrl parameter is missing')

        self.os_client = OSClient(self.username, self.password,
                                  self.tenant_name, self.keystone_url,
                                  self.timeout, self.logger, self.max_retries)

    def get_objects(self, project, object_name, api_version='',
                    params=None, detail=False, since=False):
        """ Return a list of OpenStack objects

        The API version is not always included in the URL endpoint
        registered in Keystone (eg Glance). In this case, use the
        api_version parameter to specify which version should be used.

        project is the name of the service in the catalog and object_name
        both the name of the resource and the key of the list in the JSON
        response. When detail is true, '/detail' is appended to the
        resource. When since is true, the 'changes-since' parameter is added
        once a poll has completed without failure. The extra query string
        parameters given in params take precedence over the pagination limit.

        The objects are collected by a background poller thread, started on
        the first call for a given project and resource; this method returns
        the 'results' attribute of that poller, or an empty list when the
        thread has ended unexpectedly.
        """
        # NOTE(review): changes_since and _last_run are stored on the plugin
        # and are therefore shared by all the pollers it starts; confirm that
        # a plugin never mixes different 'since' values.
        self.changes_since = since
        if params is None:
            params = {}

        if api_version:
            resource = '%s/%s' % (api_version, object_name)
        else:
            resource = '%s' % (object_name)

        if detail:
            resource = '{}/detail'.format(resource)

        opts = {}
        if self.pagination_limit:
            opts['limit'] = self.pagination_limit

        opts.update(params)

        def openstack_api_poller():
            _objects = []
            # Work on a copy because the marker is added while paginating
            _opts = dict(opts)

            if self.changes_since and self._last_run:
                _opts['changes-since'] = self._last_run.isoformat()

            # Keep track of the initial request time
            last_run = datetime.datetime.now(tz=dateutil.tz.tzutc())
            has_failure = False

            while True:
                r = self.get(project, resource, params=_opts)

                # Decode the body only once. The response is falsy when the
                # request failed (None) or got a 4xx/5xx status code; a body
                # which is not valid JSON is handled as a failure too.
                resp = None
                if r:
                    try:
                        resp = r.json()
                    except ValueError:
                        resp = None

                if resp is None or object_name not in resp:
                    has_failure = True
                    if r is None:
                        err = ''
                    else:
                        err = r.text
                    self.collectd.warning('Could not find {}: {} {}'.format(
                        project, object_name, err
                    ))
                    # Avoid providing incomplete data by resetting the
                    # current set.
                    _objects = []
                    break

                bulk_objs = resp.get(object_name)
                if not bulk_objs:
                    # empty list
                    break

                _objects.extend(bulk_objs)

                links = resp.get('{}_links'.format(object_name))
                if links is None or self.pagination_limit is None:
                    # Either the pagination is not supported or there is
                    # no more data
                    # In both cases, we got at this stage all the data we
                    # can have.
                    break

                # if there is no 'next' link in the response, all data has
                # been read.
                if not any(i.get('rel') == 'next' for i in links):
                    break

                # Ask for the page following the last object received
                _opts['marker'] = bulk_objs[-1]['id']

            if not has_failure:
                self._last_run = last_run

            return _objects

        poller_id = '{}:{}'.format(project, resource)
        if poller_id not in self._threads:
            t = base.AsyncPoller(self.collectd,
                                 openstack_api_poller,
                                 self.polling_interval,
                                 poller_id, self.changes_since)
            t.start()
            self._threads[poller_id] = t

        t = self._threads[poller_id]
        if not t.is_alive():
            self.logger.warning("Unexpected end of the thread {}".format(
                t.name))
            # Forget the thread so that the next call starts a new one
            del self._threads[poller_id]
            return []

        return t.results

    def count_objects_group_by(self,
                               list_object,
                               group_by_func,
                               count_func=None):

        """ Count the number of items grouped by arbitrary criteria.

        group_by_func(obj) returns the key of the group and count_func(obj),
        when provided, the amount to add for the object (1 otherwise).
        Return a defaultdict(int) mapping each key to its total.
        """

        counts = defaultdict(int)
        for obj in list_object:
            s = group_by_func(obj)
            try:
                counts[s] += count_func(obj) if count_func else 1
            except TypeError:
                # Ignore when count_func() doesn't return a number
                pass
        return counts

    def shutdown_callback(self):
        """ Stop the poller threads and wait for their termination."""
        for tid, t in self._threads.items():
            if t.is_alive():
                self.logger.info('Waiting for {} thread to finish'.format(tid))
                t.stop()
                t.join()