# Copyright 2011 Quanta Research Cambridge, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
| """The entry point for the execution of a workloadTo execute a workload. | |
| Users pass in a description of the workload and a nova manager object | |
| to the bash_openstack function call""" | |
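# A hypothetical invocation sketch: the action names and the BasherChoice
# constructor arguments below are illustrative only and are not defined in
# this module; the probabilities must sum to 100 (see _create_cases):
#
#     choices = [BasherChoice(create_kill_vm, probability=70),
#                BasherChoice(kill_active_vm, probability=30)]
#     bash_openstack(manager, choices,
#                    duration=datetime.timedelta(seconds=300),
#                    sleep_time=500,
#                    max_vms=16)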
import logging
import random
import datetime
import time
# local imports
from test_case import *
from state import State
import utils.util
from config import StressConfig

# setup logging to file
logging.basicConfig(
    format='%(asctime)s %(name)-20s %(levelname)-8s %(message)s',
    datefmt='%m-%d %H:%M:%S',
    filename="stress.debug.log",
    filemode="w",
    level=logging.DEBUG,
)

# define a handler which writes INFO messages or higher to sys.stdout
_console = logging.StreamHandler()
_console.setLevel(logging.INFO)
# set a format which is simpler for console use
_formatter = logging.Formatter('%(name)-20s: %(levelname)-8s %(message)s')
# tell the handler to use this format
_console.setFormatter(_formatter)
# add the handler to the root logger
logging.getLogger('').addHandler(_console)


def _create_cases(choice_spec):
    """
    Generate a workload of test cases from the workload description.

    Each choice is repeated `probability` times, so the probabilities of
    all choices must sum to 100.
    """
    cases = []
    count = 0
    for choice in choice_spec:
        p = choice.probability
        for _ in range(p):
            cases.append(choice)
        count = count + p
    assert count == 100
    return cases
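
# For illustration (hypothetical spec): two choices with probabilities 70 and
# 30 expand into a 100-element list, so random.choice() in bash_openstack
# picks the first action about 70% of the time and the second about 30%.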


def _get_compute_nodes(keypath, user, controller):
    """
    Returns a list of active compute nodes. The list is generated by running
    nova-manage on the controller.
    """
    nodes = []
    if keypath is None or user is None:
        return nodes
    lines = utils.util.ssh(keypath, user, controller,
                           "nova-manage service list | grep ^nova-compute"
                           ).split('\n')
    # For example: nova-compute xg11eth0 nova enabled :-) 2011-10-31 18:57:46
    # This is fragile but there is, at present, no other way to get this info.
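    # words[1] is the compute host name and words[4] is the ":-)" marker
    # that nova-manage prints for a service that is up.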
    for line in lines:
        words = line.split()
        if len(words) > 4 and words[4] == ":-)":
            nodes.append(words[1])
    return nodes


def _error_in_logs(keypath, logdir, user, nodes):
    """
    Detect errors in the nova log files on the controller and compute nodes.
    """
    grep = 'egrep "(ERROR|TRACE)" %s/*.log' % logdir
    for node in nodes:
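        # check=False is assumed to tell utils.util.ssh not to treat a
        # non-zero exit status (egrep found no matches) as a failure.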
        errors = utils.util.ssh(keypath, user, node, grep, check=False)
        if len(errors) > 0:
            logging.error('%s: %s' % (node, errors))
            return True
    return False


def bash_openstack(manager, choice_spec, **kwargs):
    """
    Workload driver. Executes a workload as specified by the `choice_spec`
    parameter against a nova cluster.

    `manager`     : Manager object
    `choice_spec` : list of BasherChoice actions to run on the cluster
    `kwargs`      : optional keyword arguments:
        `duration`   = how long the test should run
                       (default: 10 sec)
        `sleep_time` = time to sleep between actions
                       (in msec, default: 3000)
        `test_name`  = human-readable workload description
                       (default: 'unnamed test')
        `max_vms`    = maximum number of instances to launch
                       (default: taken from the stress config)
        `seed`       = random seed (default: None)
    """
    stress_config = StressConfig(manager.config._conf)
    # get keyword arguments
    duration = kwargs.get('duration', datetime.timedelta(seconds=10))
    seed = kwargs.get('seed', None)
    sleep_time = float(kwargs.get('sleep_time', 3000)) / 1000
    max_vms = int(kwargs.get('max_vms', stress_config.max_instances))
    test_name = kwargs.get('test_name', 'unnamed test')

    keypath = stress_config.host_private_key_path
    user = stress_config.host_admin_user
    logdir = stress_config.nova_logdir
    computes = _get_compute_nodes(keypath, user, manager.config.identity.host)
    utils.util.execute_on_all(keypath, user, computes,
                              "rm -f %s/*.log" % logdir)

    random.seed(seed)
    cases = _create_cases(choice_spec)
    test_end_time = time.time() + duration.seconds
    state = State(max_vms=max_vms)

    retry_list = []
    last_retry = time.time()
    cooldown = False
    logcheck_count = 0
    test_succeeded = True
    logging.debug('=== Test "%s" on %s ===' %
                  (test_name, time.asctime(time.localtime())))
    for kw in kwargs:
        logging.debug('\t%s = %s', kw, kwargs[kw])
    while True:
        if not cooldown:
            if time.time() < test_end_time:
                case = random.choice(cases)
                logging.debug('Chose %s' % case)
                retry = case.invoke(manager, state)
                if retry is not None:
                    retry_list.append(retry)
            else:
                logging.info('Cooling down...')
                cooldown = True
        if cooldown and len(retry_list) == 0:
            if _error_in_logs(keypath, logdir, user, computes):
                test_succeeded = False
            break
        # Retry pending verifications every 5 seconds.
        if time.time() - last_retry > 5:
            logging.debug('retry verifications for %d tasks', len(retry_list))
            new_retry_list = []
            for v in retry_list:
                if not v.retry():
                    new_retry_list.append(v)
            retry_list = new_retry_list
            last_retry = time.time()
        time.sleep(sleep_time)
        # Check the error logs every 100 actions.
        if logcheck_count > 100:
            if _error_in_logs(keypath, logdir, user, computes):
                test_succeeded = False
                break
            else:
                logcheck_count = 0
        else:
            logcheck_count = logcheck_count + 1

    # Cleanup
    logging.info('Cleaning up: terminating virtual machines...')
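    # Delete every ACTIVE instance first, then poll each one until
    # get_server() raises (the server is gone) or roughly 60 seconds elapse.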
    vms = state.get_instances()
    active_vms = [v for _k, v in vms.iteritems() if v and v[1] == 'ACTIVE']
    for target in active_vms:
        manager.servers_client.delete_server(target[0]['id'])
    # check to see that the server was actually killed
    for target in active_vms:
        kill_id = target[0]['id']
        i = 0
        while True:
            try:
                manager.servers_client.get_server(kill_id)
            except Exception:
                break
            i += 1
            if i > 60:
                _error_in_logs(keypath, logdir, user, computes)
                raise Exception("Cleanup timed out")
            time.sleep(1)
        logging.info('killed %s' % kill_id)
        state.delete_instance_state(kill_id)

    if test_succeeded:
        logging.info('*** Test succeeded ***')
    else:
        logging.info('*** Test had errors ***')
    return test_succeeded