Lots of fixes: log formatting, test run flow, and OpenStack credentials

- Pad level names inside ColoredFormatter instead of relying on
  '%(levelname)-6s', so ANSI color codes no longer break column
  alignment; route the "io-perf-tool.fuel_api" logger to the stream
  handler at WARNING level.
- run_tests() now runs a single test block instead of looping over the
  whole 'tests' config section.
- Add get_os_credentials() to resolve OpenStack credentials from the
  'clouds' section, fuel, the environment, or a creds file.
- Replace run_all_test() with a reworked run_tests_stage() that walks
  the test groups, launches test VMs for 'start_test_nodes' groups and
  runs their nested tests inside try/finally.
- Drop the unused log_sensors_config(), move parse_args() next to
  main(), pass post_process_only into load_config(), pass ctx into
  discover.discover(), remove a stray debug print, and fix the
  "stotored" typo.
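
For reference, a rough sketch of the 'tests' layout the reworked
run_tests_stage() walks: the keys and nesting follow this diff, while
the concrete parameter names and values are made-up placeholders.

    # Hypothetical example only: parameter names and values are invented
    # for illustration; the structure mirrors what run_tests_stage() expects.
    cfg = {
        'tests': [
            {'start_test_nodes': {
                'creds': 'clouds',                  # or 'ENV', or a path to a creds file
                'vm_params': {'count': 2},          # forwarded to start_vms.launch_vms()
                'tests': [{'io': {'tool': 'fio'}}]  # nested groups handed to run_tests()
            }},
            {'io_tests': {'io': {'tool': 'fio'}}},  # other keys containing 'tests' run directly
        ]
    }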
diff --git a/run_test.py b/run_test.py
index 4e3f6ef..b424b76 100755
--- a/run_test.py
+++ b/run_test.py
@@ -54,8 +54,11 @@
def format(self, record):
levelname = record.levelname
+ prn_name = ' ' * (6 - len(levelname)) + levelname
if levelname in self.colors:
- record.levelname = self.colors[levelname](levelname)
+ record.levelname = self.colors[levelname](prn_name)
+ else:
+ record.levelname = prn_name
return logging.Formatter.format(self, record)
@@ -65,7 +68,7 @@
sh = logging.StreamHandler()
sh.setLevel(level)
- log_format = '%(asctime)s - %(levelname)-6s - %(name)s - %(message)s'
+ log_format = '%(asctime)s - %(levelname)s - %(name)s - %(message)s'
colored_formatter = ColoredFormatter(log_format,
"%H:%M:%S")
@@ -80,6 +83,10 @@
fh.setLevel(logging.DEBUG)
logger.addHandler(fh)
+ logger_api = logging.getLogger("io-perf-tool.fuel_api")
+ logger_api.addHandler(sh)
+ logger_api.setLevel(logging.WARNING)
+
def format_result(res, formatter):
data = "\n{0}\n".format("=" * 80)
@@ -140,7 +147,7 @@
res_q.put(exc)
-def run_tests(config, nodes):
+def run_tests(test_block, nodes):
tool_type_mapper = {
"io": IOPerfTest,
"pgbench": PgBenchTest,
@@ -151,64 +158,43 @@
res_q = Queue.Queue()
- for test_block in config:
- for name, params in test_block.items():
- logger.info("Starting {0} tests".format(name))
+ for name, params in test_block.items():
+ logger.info("Starting {0} tests".format(name))
- threads = []
- barrier = utils.Barrier(len(test_nodes))
- for node in test_nodes:
- msg = "Starting {0} test on {1} node"
- logger.debug(msg.format(name, node.conn_url))
- test = tool_type_mapper[name](params, res_q.put)
- th = threading.Thread(None, test_thread, None,
- (test, node, barrier, res_q))
- threads.append(th)
- th.daemon = True
- th.start()
+ threads = []
+ barrier = utils.Barrier(len(test_nodes))
+ for node in test_nodes:
+ msg = "Starting {0} test on {1} node"
+ logger.debug(msg.format(name, node.conn_url))
+ test = tool_type_mapper[name](params, res_q.put)
+ th = threading.Thread(None, test_thread, None,
+ (test, node, barrier, res_q))
+ threads.append(th)
+ th.daemon = True
+ th.start()
- def gather_results(res_q, results):
- while not res_q.empty():
- val = res_q.get()
+ def gather_results(res_q, results):
+ while not res_q.empty():
+ val = res_q.get()
- if isinstance(val, Exception):
- msg = "Exception during test execution: {0}"
- raise ValueError(msg.format(val.message))
+ if isinstance(val, Exception):
+ msg = "Exception during test execution: {0}"
+ raise ValueError(msg.format(val.message))
- results.append(val)
+ results.append(val)
- results = []
+ results = []
- while True:
- for th in threads:
- th.join(1)
- gather_results(res_q, results)
+ while True:
+ for th in threads:
+ th.join(1)
+ gather_results(res_q, results)
- if all(not th.is_alive() for th in threads):
- break
+ if all(not th.is_alive() for th in threads):
+ break
- gather_results(res_q, results)
- yield name, test.merge_results(results)
-
-
-def parse_args(argv):
- parser = argparse.ArgumentParser(
- description="Run disk io performance test")
-
- parser.add_argument("-l", dest='extra_logs',
- action='store_true', default=False,
- help="print some extra log info")
-
- parser.add_argument("-b", '--build_description',
- type=str, default="Build info")
- parser.add_argument("-i", '--build_id', type=str, default="id")
- parser.add_argument("-t", '--build_type', type=str, default="GA")
- parser.add_argument("-u", '--username', type=str, default="admin")
- parser.add_argument("-p", '--post-process-only', default=None)
- parser.add_argument("-o", '--output-dest', nargs="*")
- parser.add_argument("config_file", nargs="?", default="config.yaml")
-
- return parser.parse_args(argv[1:])
+ gather_results(res_q, results)
+ yield name, test.merge_results(results)
def log_nodes_statistic(_, ctx):
@@ -223,10 +209,6 @@
logger.debug("Found {0} nodes with role {1}".format(count, role))
-def log_sensors_config(cfg):
- pass
-
-
def connect_stage(cfg, ctx):
ctx.clear_calls_stack.append(disconnect_stage)
connect_all(ctx.nodes)
@@ -235,7 +217,7 @@
def discover_stage(cfg, ctx):
if cfg.get('discover') is not None:
discover_objs = [i.strip() for i in cfg['discover'].strip().split(",")]
- ctx.nodes.extend(discover.discover(discover_objs, cfg['clouds']))
+ ctx.nodes.extend(discover.discover(ctx, discover_objs, cfg['clouds']))
for url, roles in cfg.get('explicit_nodes', {}).items():
ctx.nodes.append(Node(url, roles.split(",")))
@@ -258,8 +240,6 @@
if role in node.roles:
sens_cfg.append((node.connection, collect_cfg))
- log_sensors_config(sens_cfg)
-
ctx.sensor_cm = start_monitoring(cfg["receiver_uri"], None,
connected_config=sens_cfg)
@@ -278,53 +258,84 @@
ctx.sensor_data = ctx.sensors_control_queue.get()
-def run_all_test(cfg, ctx):
- ctx.results = []
+def get_os_credentials(cfg, ctx, creds_type):
+ creds = None
- if 'start_test_nodes' in cfg['tests']:
- params = cfg['tests']['start_test_nodes']['openstack']
- os_nodes_ids = []
-
- os_creds = params['creds']
-
- if os_creds == 'fuel':
- raise NotImplementedError()
-
- elif os_creds == 'clouds':
+ if creds_type == 'clouds':
+ if 'openstack' in cfg['clouds']:
os_cfg = cfg['clouds']['openstack']
+
tenant = os_cfg['OS_TENANT_NAME'].strip()
user = os_cfg['OS_USERNAME'].strip()
passwd = os_cfg['OS_PASSWORD'].strip()
auth_url = os_cfg['OS_AUTH_URL'].strip()
- elif os_creds == 'ENV':
- tenant = None
- user = None
- passwd = None
- auth_url = None
+ elif 'fuel' in cfg['clouds'] and \
+ 'openstack_env' in cfg['clouds']['fuel']:
+ creds = ctx.fuel_openstack_creds
- else:
- raise ValueError("Only 'ENV' creds are supported")
+ elif creds_type == 'ENV':
+ user, passwd, tenant, auth_url = start_vms.ostack_get_creds()
+ elif os.path.isfile(creds_type):
+ user, passwd, tenant, auth_url = start_vms.ostack_get_creds()
+ else:
+ msg = "Creds {0!r} isn't supported".format(creds_type)
+ raise ValueError(msg)
- start_vms.nova_connect(user, passwd, tenant, auth_url)
+ if creds is None:
+ creds = {'name': user,
+ 'passwd': passwd,
+ 'tenant': tenant,
+ 'auth_url': auth_url}
- logger.info("Preparing openstack")
- start_vms.prepare_os(user, passwd, tenant, auth_url)
+ return creds
- new_nodes = []
- for new_node, node_id in start_vms.launch_vms(params):
- new_node.roles.append('testnode')
- ctx.nodes.append(new_node)
- os_nodes_ids.append(node_id)
- new_nodes.append(new_node)
- store_nodes_in_log(cfg, os_nodes_ids)
- ctx.openstack_nodes_ids = os_nodes_ids
+def run_tests_stage(cfg, ctx):
+ ctx.results = []
- connect_all(new_nodes)
+ if 'tests' not in cfg:
+ return
- if 'tests' in cfg:
- ctx.results.extend(run_tests(cfg['tests'], ctx.nodes))
+ for group in cfg['tests']:
+
+ assert len(group.items()) == 1
+ key, config = group.items()[0]
+
+ if 'start_test_nodes' == key:
+ params = config['vm_params']
+ os_nodes_ids = []
+
+ os_creds_type = config['creds']
+ os_creds = get_os_credentials(cfg, ctx, os_creds_type)
+
+ start_vms.nova_connect(**os_creds)
+
+ # logger.info("Preparing openstack")
+ # start_vms.prepare_os(**os_creds)
+
+ new_nodes = []
+ try:
+ for new_node, node_id in start_vms.launch_vms(params):
+ new_node.roles.append('testnode')
+ ctx.nodes.append(new_node)
+ os_nodes_ids.append(node_id)
+ new_nodes.append(new_node)
+
+ store_nodes_in_log(cfg, os_nodes_ids)
+ ctx.openstack_nodes_ids = os_nodes_ids
+
+ connect_all(new_nodes)
+
+ for test_group in config.get('tests', []):
+ ctx.results.extend(run_tests(test_group, ctx.nodes))
+
+ finally:
+ # shut_down_vms_stage(cfg, ctx)
+ pass
+
+ elif 'tests' in key:
+ ctx.results.extend(run_tests(config, ctx.nodes))
def shut_down_vms_stage(cfg, ctx):
@@ -353,13 +364,6 @@
shut_down_vms_stage(cfg, ctx)
-def run_tests_stage(cfg, ctx):
- # clear nodes that possible were created on previous test running
- # clear_enviroment(cfg, ctx) << fix OS connection
- ctx.clear_calls_stack.append(shut_down_vms_stage)
- run_all_test(cfg, ctx)
-
-
def disconnect_stage(cfg, ctx):
ssh_utils.close_all_sessions()
@@ -433,11 +437,30 @@
def load_data_from(var_dir):
def load_data_from_file(cfg, ctx):
raw_results = os.path.join(var_dir, 'raw_results.yaml')
- print "load data from", raw_results
ctx.results = yaml.load(open(raw_results).read())
return load_data_from_file
+def parse_args(argv):
+ parser = argparse.ArgumentParser(
+ description="Run disk io performance test")
+
+ parser.add_argument("-l", dest='extra_logs',
+ action='store_true', default=False,
+ help="print some extra log info")
+
+ parser.add_argument("-b", '--build_description',
+ type=str, default="Build info")
+ parser.add_argument("-i", '--build_id', type=str, default="id")
+ parser.add_argument("-t", '--build_type', type=str, default="GA")
+ parser.add_argument("-u", '--username', type=str, default="admin")
+ parser.add_argument("-p", '--post-process-only', default=None)
+ parser.add_argument("-o", '--output-dest', nargs="*")
+ parser.add_argument("config_file", nargs="?", default="config.yaml")
+
+ return parser.parse_args(argv[1:])
+
+
def main(argv):
opts = parse_args(argv)
@@ -459,7 +482,7 @@
report_stage
]
- load_config(opts.config_file)
+ load_config(opts.config_file, opts.post_process_only)
level = logging.DEBUG if opts.extra_logs else logging.WARNING
setup_logger(logger, level, cfg_dict['log_file'])
@@ -489,7 +512,7 @@
if exc is not None:
raise exc, cls, tb
- logger.info("All info stotored into {0}".format(cfg_dict['var_dir']))
+ logger.info("All info stored into {0}".format(cfg_dict['var_dir']))
return 0