| # Copyright 2011 Quanta Research Cambridge, Inc. |
| # |
| # Licensed under the Apache License, Version 2.0 (the "License"); |
| # you may not use this file except in compliance with the License. |
| # You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
"""The entry point for the execution of a workload.

To execute a workload, users pass in a description of the workload and a
nova manager object to the bash_openstack function call.
"""
| |
| import datetime |
| import random |
| import time |
| from urlparse import urlparse |
| |
| from config import StressConfig |
| from state import ClusterState |
| from state import FloatingIpState |
| from state import KeyPairState |
| from state import VolumeState |
| import stress.utils |
| from test_case import logging |
| |
| from tempest.common.utils.data_utils import rand_name |
| |
# Root logger configuration: the complete DEBUG-level trace is written to
# "stress.debug.log", which is truncated (filemode "w") whenever this module
# is imported.
logging.basicConfig(
    level=logging.DEBUG,
    filename="stress.debug.log",
    filemode="w",
    format='%(asctime)s %(name)-20s %(levelname)-8s %(message)s',
    datefmt='%m-%d %H:%M:%S',
)

# Console output: a second handler limited to INFO and above, using a shorter
# format without timestamps.  Assuming `logging` is the standard library
# module, a StreamHandler created without a stream writes to sys.stderr.
_formatter = logging.Formatter('%(name)-20s: %(levelname)-8s %(message)s')
_console = logging.StreamHandler()
_console.setFormatter(_formatter)
_console.setLevel(logging.INFO)
# Attach the console handler to the root logger so every logger inherits it.
logging.getLogger('').addHandler(_console)
| |
| |
def _create_cases(choice_spec):
    """
    Generate a workload of tests from workload description

    Each entry of `choice_spec` is repeated `probability` times in the
    returned list, so that a uniform random pick from the list selects an
    entry with that percentage likelihood.

    `choice_spec` : iterable of objects exposing an integer `probability`
                    attribute; the probabilities must sum to 100

    Returns the expanded list of choices.
    Raises AssertionError if the probabilities do not add up to 100.
    """
    cases = []
    total = 0
    for choice in choice_spec:
        weight = choice.probability
        cases.extend([choice] * weight)
        total += weight
    # Raised explicitly (rather than via an `assert` statement) so that the
    # validation is still performed when Python runs with -O.
    if total != 100:
        raise AssertionError(
            'choice probabilities must sum to 100, got %s' % total)
    return cases
| |
| |
def _get_compute_nodes(keypath, user, controller):
    """
    Returns a list of active compute nodes. List is generated by running
    nova-manage on the controller.

    `keypath`    : private key path handed to stress.utils.ssh
    `user`       : user name handed to stress.utils.ssh
    `controller` : host on which nova-manage is run

    An empty list is returned, without contacting the controller, when
    either `keypath` or `user` is None.
    """
    if keypath is None or user is None:
        return []
    cmd = "nova-manage service list | grep ^nova-compute"
    output = stress.utils.ssh(keypath, user, controller, cmd)
    nodes = []
    # For example: nova-compute xg11eth0 nova enabled :-) 2011-10-31 18:57:46
    # This is fragile but there is, at present, no other way to get this info.
    for line in output.split('\n'):
        words = line.split()
        # The state marker is the fifth column; skip lines that are too
        # short instead of failing with an IndexError on words[4].
        if len(words) > 4 and words[4] == ":-)":
            nodes.append(words[1])
    return nodes
| |
| |
def _error_in_logs(keypath, logdir, user, nodes):
    """
    Detect errors in the nova log files on the controller and compute nodes.

    Runs egrep over `logdir`/*.log on each host in `nodes` via
    stress.utils.ssh, looking for lines containing ERROR or TRACE.

    Returns True, after logging the matching output, as soon as one node
    reports a match; returns False when no node does (including when `nodes`
    is empty).
    """
    # With egrep (extended regular expressions) alternation is a plain "|";
    # the escaped form "\|" would only match a literal pipe character.
    grep = 'egrep "ERROR|TRACE" %s/*.log' % logdir
    for node in nodes:
        # check=False: egrep exits non-zero when nothing matches, which is
        # the expected outcome on a healthy node.
        errors = stress.utils.ssh(keypath, user, node, grep, check=False)
        if errors:
            logging.error('%s: %s' % (node, errors))
            return True
    return False
| |
| |
def create_initial_vms(manager, state, count):
    """
    Boot `count` servers and record them in `state` once they are ACTIVE.

    All create requests are issued first; only afterwards does the function
    wait for each server in turn, so the servers can build concurrently.

    `manager` : Manager object providing `config` and `servers_client`
    `state`   : cluster state; receives (server, 'ACTIVE') per server id
    `count`   : number of servers to create
    """
    image = manager.config.compute.image_ref
    flavor = manager.config.compute.flavor_ref
    servers = []
    logging.info('Creating %d vms' % count)
    for _ in xrange(count):
        name = rand_name('initial_vm-')
        _, server = manager.servers_client.create_server(name, image, flavor)
        # Keep the requested name with its server so that the log line below
        # reports the right name rather than the last one generated.
        servers.append((name, server))
    for name, server in servers:
        manager.servers_client.wait_for_server_status(server['id'], 'ACTIVE')
        logging.info('Server Name: %s Id: %s' % (name, server['id']))
        state.set_instance_state(server['id'], (server, 'ACTIVE'))
| |
| |
def create_initial_floating_ips(manager, state, count):
    """
    Allocate `count` floating ips through `manager.floating_ips_client` and
    register each one with `state`, wrapped in a FloatingIpState.
    """
    banner = 'Creating %d floating ips' % count
    logging.info(banner)
    for _iteration in xrange(count):
        _resp, floating_ip = manager.floating_ips_client.create_floating_ip()
        logging.info('Ip: %s' % floating_ip['ip'])
        tracked = FloatingIpState(floating_ip)
        state.add_floating_ip(tracked)
| |
| |
def create_initial_keypairs(manager, state, count):
    """
    Create `count` keypairs through `manager.keypairs_client`, each with a
    name produced by rand_name('keypair-'), and register each one with
    `state`, wrapped in a KeyPairState.
    """
    banner = 'Creating %d keypairs' % count
    logging.info(banner)
    for _iteration in xrange(count):
        keypair_name = rand_name('keypair-')
        _resp, new_keypair = manager.keypairs_client.create_keypair(
            keypair_name)
        logging.info('Keypair: %s' % keypair_name)
        tracked = KeyPairState(new_keypair)
        state.add_keypair(tracked)
| |
| |
def create_initial_volumes(manager, state, count):
    """
    Create `count` volumes of size 1 and record them in `state` once each
    one reaches the 'available' status.

    All create requests are issued first; only afterwards does the function
    wait for each volume in turn.

    `manager` : Manager object providing `volumes_client`
    `state`   : cluster state; receives a VolumeState per volume
    `count`   : number of volumes to create
    """
    volumes = []
    logging.info('Creating %d volumes' % count)
    for _ in xrange(count):
        name = rand_name('volume-')
        _, volume = manager.volumes_client.create_volume(size=1,
                                                         display_name=name)
        # Keep the requested name with its volume so that the log line below
        # reports the right name rather than the last one generated.
        volumes.append((name, volume))
    for name, volume in volumes:
        manager.volumes_client.wait_for_volume_status(volume['id'],
                                                      'available')
        logging.info('Volume Name: %s Id: %s' % (name, volume['id']))
        state.add_volume(VolumeState(volume))
| |
| |
def _terminate_instances(manager, state, keypath, logdir, user, computes):
    """
    Delete every tracked instance that is not already terminating and wait
    until each one can no longer be fetched.

    Raises Exception("Cleanup timed out") when a server is still retrievable
    after more than 60 polls, one second apart.
    """
    vms = state.get_instances()
    active_vms = [v for _k, v in vms.iteritems()
                  if v and v[1] != 'TERMINATING']
    # Issue all deletes first so the servers are torn down concurrently.
    for target in active_vms:
        manager.servers_client.delete_server(target[0]['id'])
    # check to see that the server was actually killed
    for target in active_vms:
        kill_id = target[0]['id']
        attempts = 0
        while True:
            try:
                manager.servers_client.get_server(kill_id)
            except Exception:
                # Best effort: any failure to fetch the server is taken to
                # mean that it is gone.
                break
            attempts += 1
            if attempts > 60:
                # Dump any errors found in the node logs before giving up.
                _error_in_logs(keypath, logdir, user, computes)
                raise Exception("Cleanup timed out")
            time.sleep(1)
        logging.info('killed %s' % kill_id)
        state.delete_instance_state(kill_id)


def _release_resources(manager, state):
    """
    Delete the floating ips, keypairs and volumes tracked in `state`.
    """
    for floating_ip_state in state.get_floating_ips():
        manager.floating_ips_client.delete_floating_ip(
            floating_ip_state.resource_id)
    for keypair_state in state.get_keypairs():
        manager.keypairs_client.delete_keypair(keypair_state.name)
    for volume_state in state.get_volumes():
        manager.volumes_client.delete_volume(volume_state.resource_id)


def bash_openstack(manager,
                   choice_spec,
                   **kwargs):
    """
    Workload driver. Executes a workload as specified by the `choice_spec`
    parameter against a nova-cluster.

    `manager`      : Manager object
    `choice_spec`  : list of BasherChoice actions to run on the cluster;
                     their probabilities must sum to 100
    `kwargs`       : keyword arguments controlling the run
        `duration`   = datetime.timedelta, how long actions are issued
                       (default: 10 seconds)
        `sleep_time` = time to sleep between actions (in msec)
                       (default: 3000)
        `test_name`  = human readable workload description
                       (default: unnamed test)
        `max_vms`    = maximum number of instances to launch
                       (default: max_instances from the stress config)
        `seed`       = random seed (default: None)
        `initial_keypairs`, `initial_vms`, `initial_floating_ips`,
        `initial_volumes`
                     = number of resources of each kind to create before
                       the workload starts (default: 0 each)

    Before the run, `rm -f <nova_logdir>/*.log` is executed on the compute
    nodes.  During the run the node logs are checked for errors
    periodically, and once more when the workload has finished; the first
    error found ends the run.  All instances, floating ips, keypairs and
    volumes tracked in the cluster state are deleted afterwards.

    Returns True if no errors were found in the logs, False otherwise.
    Raises Exception if an instance cannot be deleted during cleanup.
    """
    stress_config = StressConfig(manager.config)
    # get keyword arguments
    duration = kwargs.get('duration', datetime.timedelta(seconds=10))
    seed = kwargs.get('seed', None)
    sleep_time = float(kwargs.get('sleep_time', 3000)) / 1000
    max_vms = int(kwargs.get('max_vms', stress_config.max_instances))
    test_name = kwargs.get('test_name', 'unnamed test')

    keypath = stress_config.host_private_key_path
    user = stress_config.host_admin_user
    logdir = stress_config.nova_logdir
    host = urlparse(manager.config.identity.uri).hostname
    computes = _get_compute_nodes(keypath, user, host)
    # Start from empty logs so that only errors from this run are detected.
    stress.utils.execute_on_all(keypath, user, computes,
                                "rm -f %s/*.log" % logdir)
    random.seed(seed)
    cases = _create_cases(choice_spec)
    state = ClusterState(max_vms=max_vms)
    create_initial_keypairs(manager, state,
                            int(kwargs.get('initial_keypairs', 0)))
    create_initial_vms(manager, state,
                       int(kwargs.get('initial_vms', 0)))
    create_initial_floating_ips(manager, state,
                                int(kwargs.get('initial_floating_ips', 0)))
    create_initial_volumes(manager, state,
                           int(kwargs.get('initial_volumes', 0)))
    # total_seconds() rather than .seconds: the latter drops the days
    # component, truncating any duration of one day or longer.
    test_end_time = time.time() + duration.total_seconds()

    retry_list = []
    last_retry = time.time()
    cooldown = False
    logcheck_count = 0
    test_succeeded = True
    logging.debug('=== Test "%s" on %s ===' %
                  (test_name, time.asctime(time.localtime())))
    for kw in kwargs:
        logging.debug('\t%s = %s', kw, kwargs[kw])

    while True:
        if not cooldown:
            if time.time() < test_end_time:
                case = random.choice(cases)
                logging.debug('Chose %s' % case)
                # invoke() may hand back a pending verification, which is
                # polled from the retry list below.
                retry = case.invoke(manager, state)
                if retry is not None:
                    retry_list.append(retry)
            else:
                logging.info('Cooling down...')
                cooldown = True
        # Done once time is up and every pending verification has finished.
        if cooldown and len(retry_list) == 0:
            if _error_in_logs(keypath, logdir, user, computes):
                test_succeeded = False
            break
        # Retry verifications every 5 seconds.
        if time.time() - last_retry > 5:
            logging.debug('retry verifications for %d tasks', len(retry_list))
            new_retry_list = []
            for v in retry_list:
                v.check_timeout()
                # Keep only the verifications that have not completed yet.
                if not v.retry():
                    new_retry_list.append(v)
            retry_list = new_retry_list
            last_retry = time.time()
        time.sleep(sleep_time)
        # Check error logs after 100 actions
        if logcheck_count > 100:
            if _error_in_logs(keypath, logdir, user, computes):
                test_succeeded = False
                break
            else:
                logcheck_count = 0
        else:
            logcheck_count = logcheck_count + 1
    # Cleanup
    logging.info('Cleaning up: terminating virtual machines...')
    _terminate_instances(manager, state, keypath, logdir, user, computes)
    _release_resources(manager, state)

    if test_succeeded:
        logging.info('*** Test succeeded ***')
    else:
        logging.info('*** Test had errors ***')
    return test_succeeded