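Improve sensors installation code and node connection handling

* Replace the in-file sensor code (deploy_sensors_stage,
  remove_sensors_stage, save_sensors_data) with
  wally.tests_sensors.deploy_sensors_stage, and reuse it for the VMs
  started by 'start_test_nodes' groups.
* connect_one no longer re-raises: a node that fails to connect gets
  connection = None. An unreachable 'testnode' aborts the run; other
  unreachable nodes are excluded with a warning.
* VMs are connected with retry_count=24 and log_warns=False; other
  nodes use retry_count=3 and log_warns=True.
* Move VM start/shutdown into the create_vms_ctx context manager; the
  keep_vm option is now stored in cfg_dict['keep_vm'].
* Inline make_undiscover_stage into discover_stage and use
  utils.yamable instead of a local copy.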
diff --git a/wally/run_test.py b/wally/run_test.py
index 7899bae..42e96ff 100755
--- a/wally/run_test.py
+++ b/wally/run_test.py
@@ -2,25 +2,24 @@
import os
import sys
-import time
import Queue
import pprint
import logging
import argparse
+import functools
import threading
+import contextlib
import collections
import yaml
from concurrent.futures import ThreadPoolExecutor
from wally import pretty_yaml
+from wally.tests_sensors import deploy_sensors_stage
from wally.discover import discover, Node, undiscover
from wally import utils, report, ssh_utils, start_vms
from wally.suits.itest import IOPerfTest, PgBenchTest
from wally.config import cfg_dict, load_config, setup_loggers
-from wally.sensors.api import (start_monitoring,
- deploy_and_start_sensors,
- SensorConfig)
logger = logging.getLogger("wally")
@@ -40,54 +39,37 @@
self.nodes = []
self.clear_calls_stack = []
self.openstack_nodes_ids = []
- self.sensor_cm = None
- self.keep_vm = False
- self.sensors_control_queue = None
- self.sensor_listen_thread = None
+ self.sensors_mon_q = None
-def connect_one(node):
+def connect_one(node, vm=False):
try:
ssh_pref = "ssh://"
if node.conn_url.startswith(ssh_pref):
url = node.conn_url[len(ssh_pref):]
- logger.debug("Try connect to " + url)
- node.connection = ssh_utils.connect(url)
+
+ if vm:
+ ret_count = 24
+ log_warns = False
+ else:
+ ret_count = 3
+ log_warns = True
+
+ node.connection = ssh_utils.connect(url,
+ retry_count=ret_count,
+ log_warns=log_warns)
else:
raise ValueError("Unknown url type {0}".format(node.conn_url))
except Exception:
- logger.exception("During connect to {0}".format(node))
- raise
+ logger.exception("During connect to " + node.get_conn_id())
+ node.connection = None
-def connect_all(nodes):
+def connect_all(nodes, vm=False):
logger.info("Connecting to nodes")
with ThreadPoolExecutor(32) as pool:
- list(pool.map(connect_one, nodes))
- logger.info("All nodes connected successfully")
-
-
-def save_sensors_data(q, mon_q, fd):
- logger.info("Start receiving sensors data")
- fd.write("\n")
-
- observed_nodes = set()
-
- try:
- while True:
- val = q.get()
- if val is None:
- break
-
- addr, data = val
- if addr not in observed_nodes:
- mon_q.put(addr)
- observed_nodes.add(addr)
-
- fd.write("{0!s} : {1!r}\n".format(time.time(), repr(val)))
- except Exception:
- logger.exception("Error in sensors thread")
- logger.info("Sensors thread exits")
+ connect_one_f = functools.partial(connect_one, vm=vm)
+ list(pool.map(connect_one_f, nodes))
def test_thread(test, node, barrier, res_q):
@@ -185,11 +167,23 @@
ctx.clear_calls_stack.append(disconnect_stage)
connect_all(ctx.nodes)
+ all_ok = True
-def make_undiscover_stage(clean_data):
- def undiscover_stage(cfg, ctx):
- undiscover(clean_data)
- return undiscover_stage
+ for node in ctx.nodes:
+ if node.connection is None:
+ if 'testnode' in node.roles:
+ msg = "Can't connect to testnode {0}"
+ raise RuntimeError(msg.format(node.get_conn_id()))
+ else:
+ msg = "Node {0} would be excluded - can't connect"
+ logger.warning(msg.format(node.get_conn_id()))
+ all_ok = False
+
+ if all_ok:
+ logger.info("All nodes connected successfully")
+
+ ctx.nodes = [node for node in ctx.nodes
+ if node.connection is not None]
def discover_stage(cfg, ctx):
@@ -198,102 +192,17 @@
nodes, clean_data = discover(ctx, discover_objs,
cfg['clouds'], cfg['var_dir'])
- ctx.clear_calls_stack.append(make_undiscover_stage(clean_data))
+
+ def undiscover_stage(cfg, ctx):
+ undiscover(clean_data)
+
+ ctx.clear_calls_stack.append(undiscover_stage)
ctx.nodes.extend(nodes)
for url, roles in cfg.get('explicit_nodes', {}).items():
ctx.nodes.append(Node(url, roles.split(",")))
-def deploy_sensors_stage(cfg_dict, ctx):
- if 'sensors' not in cfg_dict:
- return
-
- cfg = cfg_dict.get('sensors')
-
- sensors_configs = []
- monitored_nodes = []
-
- for role, sensors_str in cfg["roles_mapping"].items():
- sensors = [sens.strip() for sens in sensors_str.split(",")]
-
- collect_cfg = dict((sensor, {}) for sensor in sensors)
-
- for node in ctx.nodes:
- if role in node.roles:
- monitored_nodes.append(node)
- sens_cfg = SensorConfig(node.connection,
- node.get_ip(),
- collect_cfg)
- sensors_configs.append(sens_cfg)
-
- if len(monitored_nodes) == 0:
- logger.info("Nothing to monitor, no sensors would be installed")
- return
-
- ctx.receiver_uri = cfg["receiver_uri"]
- nodes_ips = [node.get_ip() for node in monitored_nodes]
- if '{ip}' in ctx.receiver_uri:
- ips = set(map(utils.get_ip_for_target, nodes_ips))
-
- if len(ips) > 1:
- raise ValueError("Can't select external ip for sensors server")
-
- if len(ips) == 0:
- raise ValueError("Can't find any external ip for sensors server")
-
- ext_ip = list(ips)[0]
- ctx.receiver_uri = ctx.receiver_uri.format(ip=ext_ip)
-
- ctx.clear_calls_stack.append(remove_sensors_stage)
- ctx.sensor_cm = start_monitoring(ctx.receiver_uri, sensors_configs)
-
- ctx.sensors_control_queue = ctx.sensor_cm.__enter__()
-
- mon_q = Queue.Queue()
-
- fd = open(cfg_dict['sensor_storage'], "w")
- th = threading.Thread(None, save_sensors_data, None,
- (ctx.sensors_control_queue, mon_q, fd))
- th.daemon = True
- th.start()
- ctx.sensor_listen_thread = th
-
- nodes_ips_set = set(nodes_ips)
- MAX_WAIT_FOR_SENSORS = 10
- etime = time.time() + MAX_WAIT_FOR_SENSORS
-
- msg = "Waiting at most {0}s till all {1} nodes starts report sensors data"
- logger.debug(msg.format(MAX_WAIT_FOR_SENSORS, len(nodes_ips_set)))
-
- # wait till all nodes start sending data
- while len(nodes_ips_set) != 0:
- tleft = etime - time.time()
- try:
- data = mon_q.get(True, tleft)
- ip, port = data
- except Queue.Empty:
- msg = "Node {0} not sending any sensor data in {1}s"
- msg = msg.format(", ".join(nodes_ips_set), MAX_WAIT_FOR_SENSORS)
- raise RuntimeError(msg)
-
- if ip not in nodes_ips_set:
- logger.warning("Receive sensors from extra node: {0}".format(ip))
-
- nodes_ips_set.remove(ip)
-
-
-def remove_sensors_stage(cfg, ctx):
- if ctx.sensor_cm is not None:
- ctx.sensor_cm.__exit__(None, None, None)
-
- if ctx.sensors_control_queue is not None:
- ctx.sensors_control_queue.put(None)
-
- if ctx.sensor_listen_thread is not None:
- ctx.sensor_listen_thread.join()
-
-
def get_os_credentials(cfg, ctx, creds_type):
creds = None
@@ -328,6 +237,38 @@
return creds
+@contextlib.contextmanager
+def create_vms_ctx(ctx, cfg, config):
+ params = config['vm_params'].copy()
+ os_nodes_ids = []
+
+ os_creds_type = config['creds']
+ os_creds = get_os_credentials(cfg, ctx, os_creds_type)
+
+ start_vms.nova_connect(**os_creds)
+
+ logger.info("Preparing openstack")
+ start_vms.prepare_os_subpr(**os_creds)
+
+ new_nodes = []
+ try:
+ params['group_name'] = cfg_dict['run_uuid']
+ for new_node, node_id in start_vms.launch_vms(params):
+ new_node.roles.append('testnode')
+ ctx.nodes.append(new_node)
+ os_nodes_ids.append(node_id)
+ new_nodes.append(new_node)
+
+ store_nodes_in_log(cfg, os_nodes_ids)
+ ctx.openstack_nodes_ids = os_nodes_ids
+
+ yield new_nodes
+
+ finally:
+ if not cfg['keep_vm']:
+ shut_down_vms_stage(cfg, ctx)
+
+
def run_tests_stage(cfg, ctx):
ctx.results = []
@@ -340,54 +281,21 @@
key, config = group.items()[0]
if 'start_test_nodes' == key:
- params = config['vm_params'].copy()
- os_nodes_ids = []
+ with create_vms_ctx(ctx, cfg, config) as new_nodes:
+ connect_all(new_nodes, True)
- os_creds_type = config['creds']
- os_creds = get_os_credentials(cfg, ctx, os_creds_type)
+ for node in new_nodes:
+ if node.connection is None:
+ msg = "Failed to connect to vm {0}"
+ raise RuntimeError(msg.format(node.get_conn_id()))
- start_vms.nova_connect(**os_creds)
-
- logger.info("Preparing openstack")
- start_vms.prepare_os_subpr(**os_creds)
-
- new_nodes = []
- try:
- params['group_name'] = cfg_dict['run_uuid']
- for new_node, node_id in start_vms.launch_vms(params):
- new_node.roles.append('testnode')
- ctx.nodes.append(new_node)
- os_nodes_ids.append(node_id)
- new_nodes.append(new_node)
-
- store_nodes_in_log(cfg, os_nodes_ids)
- ctx.openstack_nodes_ids = os_nodes_ids
-
- connect_all(new_nodes)
-
- # deploy sensors on new nodes
- # unify this code
- if 'sensors' in cfg:
- sens_cfg = []
- sensors_str = cfg["sensors"]["roles_mapping"]['testnode']
- sensors = [sens.strip() for sens in sensors_str.split(",")]
-
- collect_cfg = dict((sensor, {}) for sensor in sensors)
- for node in new_nodes:
- sens_cfg.append((node.connection, collect_cfg))
-
- uri = cfg["sensors"]["receiver_uri"]
- logger.debug("Installing sensors on vm's")
- deploy_and_start_sensors(uri, None,
- connected_config=sens_cfg)
+ deploy_sensors_stage(cfg_dict,
+ ctx,
+ nodes=new_nodes,
+ undeploy=False)
for test_group in config.get('tests', []):
ctx.results.extend(run_tests(test_group, ctx.nodes))
-
- finally:
- if not ctx.keep_vm:
- shut_down_vms_stage(cfg, ctx)
-
else:
ctx.results.extend(run_tests(group, ctx.nodes))
@@ -426,22 +334,6 @@
node.connection.close()
-def yamable(data):
- if isinstance(data, (tuple, list)):
- return map(yamable, data)
-
- if isinstance(data, unicode):
- return str(data)
-
- if isinstance(data, dict):
- res = {}
- for k, v in data.items():
- res[yamable(k)] = yamable(v)
- return res
-
- return data
-
-
def store_raw_results_stage(cfg, ctx):
raw_results = os.path.join(cfg_dict['var_dir'], 'raw_results.yaml')
@@ -451,7 +343,7 @@
else:
cont = []
- cont.extend(yamable(ctx.results))
+ cont.extend(utils.yamable(ctx.results))
raw_data = pretty_yaml.dumps(cont)
with open(raw_results, "w") as fd:
@@ -564,7 +456,7 @@
ctx.build_meta['build_descrption'] = opts.build_description
ctx.build_meta['build_type'] = opts.build_type
ctx.build_meta['username'] = opts.username
- ctx.keep_vm = opts.keep_vm
+ cfg_dict['keep_vm'] = opts.keep_vm
try:
for stage in stages: