fix and improve sensors installation code
diff --git a/local_ceph.yaml b/local_ceph.yaml
index e945a06..fc54189 100644
--- a/local_ceph.yaml
+++ b/local_ceph.yaml
@@ -10,7 +10,7 @@
var_dir_root: /tmp/perf_tests
sensors:
- receiver_uri: "udp://{ip}:5699"
+ receiver_url: "udp://{ip}:5699"
roles_mapping:
ceph-osd: block-io
cinder: block-io, system-cpu
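Note: a minimal sketch of how the renamed key is consumed on the Python side (cfg contents and the node IP are illustrative assumptions):

    cfg = {"receiver_url": "udp://{ip}:5699",
           "roles_mapping": {"ceph-osd": "block-io"}}

    # the {ip} placeholder is filled per node with an address that is
    # routable from that node back to the test runner
    monitor_url = cfg["receiver_url"].format(ip="192.168.0.108")
    # monitor_url == "udp://192.168.0.108:5699"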
diff --git a/perf1.yaml b/perf1.yaml
index ecfd26b..fed57f5 100644
--- a/perf1.yaml
+++ b/perf1.yaml
@@ -1,6 +1,6 @@
clouds:
fuel:
- url: http://172.6.52.112:8000/
+ url: http://172.16.52.112:8000/
creds: admin:admin@admin
ssh_creds: root:test37
openstack_env: test
@@ -14,8 +14,7 @@
var_dir_root: /tmp/perf_tests
sensors:
- # receiver_uri: udp://172.18.217.10:5699
- receiver_uri: "udp://{ip}:5699"
+ receiver_url: "udp://{ip}:5699"
roles_mapping:
ceph-osd: block-io
cinder: block-io, system-cpu
@@ -41,14 +40,14 @@
flt_ip_pool: nova
private_key_path: disk_io_perf.pem
creds: "ssh://ubuntu@{ip}::{private_key_path}"
- name_templ: disk_io_perf-{0}
- scheduler_group_name: disk_io_group_aa-{0}
+ name_templ: wally-{group}-{id}
+ scheduler_group_name: wally-{group}-{id}
security_group: disk_io_perf
tests:
- io:
- cfg: tests/io_scenario_hdd.cfg
- # cfg: scripts/fio_tests_configs/io_task_test.cfg
+ # cfg: tests/io_scenario_hdd.cfg
+ cfg: scripts/fio_tests_configs/io_task_test.cfg
params:
FILENAME: /opt/xxx.bin
NUM_ROUNDS: 7
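Note: the new templates key VM names to the run, as used by start_vms.launch_vms below (group value illustrative):

    name_templ = "wally-{group}-{id}"
    name_templ.format(group="4a5b6c", id=3)   # -> "wally-4a5b6c-3"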
diff --git a/usb_hdd.yaml b/usb_hdd.yaml
index 59e1259..224c6f6 100644
--- a/usb_hdd.yaml
+++ b/usb_hdd.yaml
@@ -6,9 +6,7 @@
var_dir_root: /tmp/perf_tests
sensors:
- # receiver_uri: udp://172.18.217.10:5699
- receiver_uri: "udp://192.168.0.108:5699"
- # receiver_uri: "udp://{ip}:5699"
+ receiver_url: "udp://{ip}:5699"
roles_mapping:
ceph-osd: block-io
cinder: block-io, system-cpu
diff --git a/vEnv-3-2.yaml b/vEnv-3-2.yaml
index 40ba677..385013a 100644
--- a/vEnv-3-2.yaml
+++ b/vEnv-3-2.yaml
@@ -13,13 +13,12 @@
internal:
var_dir_root: /tmp/perf_tests
-sensors:
- # receiver_uri: udp://172.18.217.10:5699
- receiver_uri: "udp://{ip}:5699"
- roles_mapping:
- ceph-osd: block-io
- cinder: block-io, system-cpu
- testnode: system-cpu, block-io
+# sensors:
+# receiver_url: "udp://{ip}:5699"
+# roles_mapping:
+# ceph-osd: block-io
+# cinder: block-io, system-cpu
+# testnode: system-cpu, block-io
tests:
- start_test_nodes:
diff --git a/wally/config.py b/wally/config.py
index 03b7ac9..7dba134 100644
--- a/wally/config.py
+++ b/wally/config.py
@@ -108,7 +108,7 @@
sh = logging.StreamHandler()
sh.setLevel(def_level)
- log_format = '%(asctime)s - %(levelname)s - %(name)s - %(message)s'
+ log_format = '%(asctime)s - %(levelname)s - %(name)-15s - %(message)s'
colored_formatter = ColoredFormatter(log_format, datefmt="%H:%M:%S")
sh.setFormatter(colored_formatter)
@@ -118,7 +118,7 @@
if log_fname is not None:
fh = logging.FileHandler(log_fname)
- log_format = '%(asctime)s - %(levelname)8s - %(name)s - %(message)s'
+ log_format = '%(asctime)s - %(levelname)8s - %(name)-15s - %(message)s'
formatter = logging.Formatter(log_format, datefmt="%H:%M:%S")
fh.setFormatter(formatter)
fh.setLevel(logging.DEBUG)
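Note: the -15 left-pads the logger name so records line up in columns; a quick sketch of the effect:

    import logging

    log_format = '%(asctime)s - %(levelname)s - %(name)-15s - %(message)s'
    logging.basicConfig(format=log_format, datefmt="%H:%M:%S")
    logging.getLogger("wally").warning("hi")
    logging.getLogger("wally.discover").warning("hi")
    # 12:00:00 - WARNING - wally           - hi
    # 12:00:00 - WARNING - wally.discover  - hi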
diff --git a/wally/discover/discover.py b/wally/discover/discover.py
index 8c78387..3cd5cff 100644
--- a/wally/discover/discover.py
+++ b/wally/discover/discover.py
@@ -1,3 +1,4 @@
+import os.path
import logging
from . import ceph
@@ -9,6 +10,23 @@
logger = logging.getLogger("wally.discover")
+openrc_templ = """#!/bin/sh
+export LC_ALL=C
+export OS_NO_CACHE='true'
+export OS_TENANT_NAME='{tenant}'
+export OS_USERNAME='{name}'
+export OS_PASSWORD='{passwd}'
+export OS_AUTH_URL='{auth_url}'
+export OS_AUTH_STRATEGY='keystone'
+export OS_REGION_NAME='RegionOne'
+export CINDER_ENDPOINT_TYPE='publicURL'
+export GLANCE_ENDPOINT_TYPE='publicURL'
+export KEYSTONE_ENDPOINT_TYPE='publicURL'
+export NOVA_ENDPOINT_TYPE='publicURL'
+export NEUTRON_ENDPOINT_TYPE='publicURL'
+"""
+
+
def discover(ctx, discover, clusters_info, var_dir):
nodes_to_run = []
clean_data = None
@@ -38,7 +56,6 @@
nodes_to_run.extend(os_nodes)
elif cluster == "fuel":
-
res = fuel.discover_fuel_nodes(clusters_info['fuel'], var_dir)
nodes, clean_data, openrc_dict = res
@@ -47,6 +64,17 @@
'tenant': openrc_dict['tenant_name'],
'auth_url': openrc_dict['os_auth_url']}
+ env_name = clusters_info['fuel']['openstack_env']
+ env_f_name = env_name
+ for char in "-+ {}()[]":
+ env_f_name = env_f_name.replace(char, '_')
+
+ fuel_openrc_fname = os.path.join(var_dir,
+ env_f_name + "_openrc")
+ with open(fuel_openrc_fname, "w") as fd:
+ fd.write(openrc_templ.format(**ctx.fuel_openstack_creds))
+ msg = "Openrc for cluster {0} saves into {1}"
+ logger.debug(msg.format(env_name, fuel_openrc_fname))
nodes_to_run.extend(nodes)
elif cluster == "ceph":
diff --git a/wally/discover/fuel.py b/wally/discover/fuel.py
index 6e76188..149ec31 100644
--- a/wally/discover/fuel.py
+++ b/wally/discover/fuel.py
@@ -15,7 +15,7 @@
logger = logging.getLogger("wally.discover")
-BASE_PF_PORT = 33467
+BASE_PF_PORT = 44006
def discover_fuel_nodes(fuel_data, var_dir):
@@ -32,7 +32,7 @@
fuel_nodes = list(cluster.get_nodes())
- logger.debug("Found FUEL {0}".format("".join(map(str, version))))
+ logger.debug("Found FUEL {0}".format(".".join(map(str, version))))
network = 'fuelweb_admin' if version >= [6, 0] else 'admin'
@@ -60,15 +60,17 @@
conn_url = "ssh://root@{0}:{1}:{2}".format(fuel_host,
port,
fuel_key_file)
- nodes.append(Node(conn_url, fuel_node['roles']))
+ node = Node(conn_url, fuel_node['roles'])
+ node.monitor_url = None
+ nodes.append(node)
ips_ports.append((ip, port))
logger.debug("Found %s fuel nodes for env %r" %
(len(nodes), fuel_data['openstack_env']))
- return ([],
- (ssh_conn, fuel_ext_iface, ips_ports),
- cluster.get_openrc())
+ # return ([],
+ # (ssh_conn, fuel_ext_iface, ips_ports),
+ # cluster.get_openrc())
return (nodes,
(ssh_conn, fuel_ext_iface, ips_ports),
diff --git a/wally/discover/node.py b/wally/discover/node.py
index fad4f29..dc1c9b0 100644
--- a/wally/discover/node.py
+++ b/wally/discover/node.py
@@ -7,10 +7,20 @@
self.roles = roles
self.conn_url = conn_url
self.connection = None
+ self.monitor_url = None
def get_ip(self):
return urlparse.urlparse(self.conn_url).hostname
+ def get_conn_id(self):
+ host = urlparse.urlparse(self.conn_url).hostname
+ port = urlparse.urlparse(self.conn_url).port
+
+ if port is None:
+ port = 22
+
+ return host + ":" + str(port)
+
def __str__(self):
templ = "<Node: url={conn_url!r} roles={roles}" + \
" connected={is_connected}>"
diff --git a/wally/run_test.py b/wally/run_test.py
index 7899bae..42e96ff 100755
--- a/wally/run_test.py
+++ b/wally/run_test.py
@@ -2,25 +2,24 @@
import os
import sys
-import time
import Queue
import pprint
import logging
import argparse
+import functools
import threading
+import contextlib
import collections
import yaml
from concurrent.futures import ThreadPoolExecutor
from wally import pretty_yaml
+from wally.tests_sensors import deploy_sensors_stage
from wally.discover import discover, Node, undiscover
from wally import utils, report, ssh_utils, start_vms
from wally.suits.itest import IOPerfTest, PgBenchTest
from wally.config import cfg_dict, load_config, setup_loggers
-from wally.sensors.api import (start_monitoring,
- deploy_and_start_sensors,
- SensorConfig)
logger = logging.getLogger("wally")
@@ -40,54 +39,37 @@
self.nodes = []
self.clear_calls_stack = []
self.openstack_nodes_ids = []
- self.sensor_cm = None
- self.keep_vm = False
- self.sensors_control_queue = None
- self.sensor_listen_thread = None
+ self.sensors_mon_q = None
-def connect_one(node):
+def connect_one(node, vm=False):
try:
ssh_pref = "ssh://"
if node.conn_url.startswith(ssh_pref):
url = node.conn_url[len(ssh_pref):]
- logger.debug("Try connect to " + url)
- node.connection = ssh_utils.connect(url)
+
+ if vm:
+ ret_count = 24
+ log_warns = False
+ else:
+ ret_count = 3
+ log_warns = True
+
+ node.connection = ssh_utils.connect(url,
+ retry_count=ret_count,
+ log_warns=log_warns)
else:
raise ValueError("Unknown url type {0}".format(node.conn_url))
except Exception:
- logger.exception("During connect to {0}".format(node))
- raise
+ logger.exception("During connect to " + node.get_conn_id())
+ node.connection = None
-def connect_all(nodes):
+def connect_all(nodes, vm=False):
logger.info("Connecting to nodes")
with ThreadPoolExecutor(32) as pool:
- list(pool.map(connect_one, nodes))
- logger.info("All nodes connected successfully")
-
-
-def save_sensors_data(q, mon_q, fd):
- logger.info("Start receiving sensors data")
- fd.write("\n")
-
- observed_nodes = set()
-
- try:
- while True:
- val = q.get()
- if val is None:
- break
-
- addr, data = val
- if addr not in observed_nodes:
- mon_q.put(addr)
- observed_nodes.add(addr)
-
- fd.write("{0!s} : {1!r}\n".format(time.time(), repr(val)))
- except Exception:
- logger.exception("Error in sensors thread")
- logger.info("Sensors thread exits")
+ connect_one_f = functools.partial(connect_one, vm=vm)
+ list(pool.map(connect_one_f, nodes))
def test_thread(test, node, barrier, res_q):
@@ -185,11 +167,23 @@
ctx.clear_calls_stack.append(disconnect_stage)
connect_all(ctx.nodes)
+ all_ok = True
-def make_undiscover_stage(clean_data):
- def undiscover_stage(cfg, ctx):
- undiscover(clean_data)
- return undiscover_stage
+ for node in ctx.nodes:
+ if node.connection is None:
+ if 'testnode' in node.roles:
+ msg = "Can't connect to testnode {0}"
+ raise RuntimeError(msg.format(node.get_conn_id()))
+ else:
+ msg = "Node {0} would be excluded - can't connect"
+ logger.warning(msg.format(node.get_conn_id()))
+ all_ok = False
+
+ if all_ok:
+ logger.info("All nodes connected successfully")
+
+ ctx.nodes = [node for node in ctx.nodes
+ if node.connection is not None]
def discover_stage(cfg, ctx):
@@ -198,102 +192,17 @@
nodes, clean_data = discover(ctx, discover_objs,
cfg['clouds'], cfg['var_dir'])
- ctx.clear_calls_stack.append(make_undiscover_stage(clean_data))
+
+ def undiscover_stage(cfg, ctx):
+ undiscover(clean_data)
+
+ ctx.clear_calls_stack.append(undiscover_stage)
ctx.nodes.extend(nodes)
for url, roles in cfg.get('explicit_nodes', {}).items():
ctx.nodes.append(Node(url, roles.split(",")))
-def deploy_sensors_stage(cfg_dict, ctx):
- if 'sensors' not in cfg_dict:
- return
-
- cfg = cfg_dict.get('sensors')
-
- sensors_configs = []
- monitored_nodes = []
-
- for role, sensors_str in cfg["roles_mapping"].items():
- sensors = [sens.strip() for sens in sensors_str.split(",")]
-
- collect_cfg = dict((sensor, {}) for sensor in sensors)
-
- for node in ctx.nodes:
- if role in node.roles:
- monitored_nodes.append(node)
- sens_cfg = SensorConfig(node.connection,
- node.get_ip(),
- collect_cfg)
- sensors_configs.append(sens_cfg)
-
- if len(monitored_nodes) == 0:
- logger.info("Nothing to monitor, no sensors would be installed")
- return
-
- ctx.receiver_uri = cfg["receiver_uri"]
- nodes_ips = [node.get_ip() for node in monitored_nodes]
- if '{ip}' in ctx.receiver_uri:
- ips = set(map(utils.get_ip_for_target, nodes_ips))
-
- if len(ips) > 1:
- raise ValueError("Can't select external ip for sensors server")
-
- if len(ips) == 0:
- raise ValueError("Can't find any external ip for sensors server")
-
- ext_ip = list(ips)[0]
- ctx.receiver_uri = ctx.receiver_uri.format(ip=ext_ip)
-
- ctx.clear_calls_stack.append(remove_sensors_stage)
- ctx.sensor_cm = start_monitoring(ctx.receiver_uri, sensors_configs)
-
- ctx.sensors_control_queue = ctx.sensor_cm.__enter__()
-
- mon_q = Queue.Queue()
-
- fd = open(cfg_dict['sensor_storage'], "w")
- th = threading.Thread(None, save_sensors_data, None,
- (ctx.sensors_control_queue, mon_q, fd))
- th.daemon = True
- th.start()
- ctx.sensor_listen_thread = th
-
- nodes_ips_set = set(nodes_ips)
- MAX_WAIT_FOR_SENSORS = 10
- etime = time.time() + MAX_WAIT_FOR_SENSORS
-
- msg = "Waiting at most {0}s till all {1} nodes starts report sensors data"
- logger.debug(msg.format(MAX_WAIT_FOR_SENSORS, len(nodes_ips_set)))
-
- # wait till all nodes start sending data
- while len(nodes_ips_set) != 0:
- tleft = etime - time.time()
- try:
- data = mon_q.get(True, tleft)
- ip, port = data
- except Queue.Empty:
- msg = "Node {0} not sending any sensor data in {1}s"
- msg = msg.format(", ".join(nodes_ips_set), MAX_WAIT_FOR_SENSORS)
- raise RuntimeError(msg)
-
- if ip not in nodes_ips_set:
- logger.warning("Receive sensors from extra node: {0}".format(ip))
-
- nodes_ips_set.remove(ip)
-
-
-def remove_sensors_stage(cfg, ctx):
- if ctx.sensor_cm is not None:
- ctx.sensor_cm.__exit__(None, None, None)
-
- if ctx.sensors_control_queue is not None:
- ctx.sensors_control_queue.put(None)
-
- if ctx.sensor_listen_thread is not None:
- ctx.sensor_listen_thread.join()
-
-
def get_os_credentials(cfg, ctx, creds_type):
creds = None
@@ -328,6 +237,38 @@
return creds
+@contextlib.contextmanager
+def create_vms_ctx(ctx, cfg, config):
+ params = config['vm_params'].copy()
+ os_nodes_ids = []
+
+ os_creds_type = config['creds']
+ os_creds = get_os_credentials(cfg, ctx, os_creds_type)
+
+ start_vms.nova_connect(**os_creds)
+
+ logger.info("Preparing openstack")
+ start_vms.prepare_os_subpr(**os_creds)
+
+ new_nodes = []
+ try:
+ params['group_name'] = cfg_dict['run_uuid']
+ for new_node, node_id in start_vms.launch_vms(params):
+ new_node.roles.append('testnode')
+ ctx.nodes.append(new_node)
+ os_nodes_ids.append(node_id)
+ new_nodes.append(new_node)
+
+ store_nodes_in_log(cfg, os_nodes_ids)
+ ctx.openstack_nodes_ids = os_nodes_ids
+
+ yield new_nodes
+
+ finally:
+ if not cfg['keep_vm']:
+ shut_down_vms_stage(cfg, ctx)
+
+
def run_tests_stage(cfg, ctx):
ctx.results = []
@@ -340,54 +281,21 @@
key, config = group.items()[0]
if 'start_test_nodes' == key:
- params = config['vm_params'].copy()
- os_nodes_ids = []
+ with create_vms_ctx(ctx, cfg, config) as new_nodes:
+ connect_all(new_nodes, True)
- os_creds_type = config['creds']
- os_creds = get_os_credentials(cfg, ctx, os_creds_type)
+ for node in new_nodes:
+ if node.connection is None:
+ msg = "Failed to connect to vm {0}"
+ raise RuntimeError(msg.format(node.get_conn_id()))
- start_vms.nova_connect(**os_creds)
-
- logger.info("Preparing openstack")
- start_vms.prepare_os_subpr(**os_creds)
-
- new_nodes = []
- try:
- params['group_name'] = cfg_dict['run_uuid']
- for new_node, node_id in start_vms.launch_vms(params):
- new_node.roles.append('testnode')
- ctx.nodes.append(new_node)
- os_nodes_ids.append(node_id)
- new_nodes.append(new_node)
-
- store_nodes_in_log(cfg, os_nodes_ids)
- ctx.openstack_nodes_ids = os_nodes_ids
-
- connect_all(new_nodes)
-
- # deploy sensors on new nodes
- # unify this code
- if 'sensors' in cfg:
- sens_cfg = []
- sensors_str = cfg["sensors"]["roles_mapping"]['testnode']
- sensors = [sens.strip() for sens in sensors_str.split(",")]
-
- collect_cfg = dict((sensor, {}) for sensor in sensors)
- for node in new_nodes:
- sens_cfg.append((node.connection, collect_cfg))
-
- uri = cfg["sensors"]["receiver_uri"]
- logger.debug("Installing sensors on vm's")
- deploy_and_start_sensors(uri, None,
- connected_config=sens_cfg)
+ deploy_sensors_stage(cfg_dict,
+ ctx,
+ nodes=new_nodes,
+ undeploy=False)
for test_group in config.get('tests', []):
ctx.results.extend(run_tests(test_group, ctx.nodes))
-
- finally:
- if not ctx.keep_vm:
- shut_down_vms_stage(cfg, ctx)
-
else:
ctx.results.extend(run_tests(group, ctx.nodes))
@@ -426,22 +334,6 @@
node.connection.close()
-def yamable(data):
- if isinstance(data, (tuple, list)):
- return map(yamable, data)
-
- if isinstance(data, unicode):
- return str(data)
-
- if isinstance(data, dict):
- res = {}
- for k, v in data.items():
- res[yamable(k)] = yamable(v)
- return res
-
- return data
-
-
def store_raw_results_stage(cfg, ctx):
raw_results = os.path.join(cfg_dict['var_dir'], 'raw_results.yaml')
@@ -451,7 +343,7 @@
else:
cont = []
- cont.extend(yamable(ctx.results))
+ cont.extend(utils.yamable(ctx.results))
raw_data = pretty_yaml.dumps(cont)
with open(raw_results, "w") as fd:
@@ -564,7 +456,7 @@
ctx.build_meta['build_descrption'] = opts.build_description
ctx.build_meta['build_type'] = opts.build_type
ctx.build_meta['username'] = opts.username
- ctx.keep_vm = opts.keep_vm
+ cfg_dict['keep_vm'] = opts.keep_vm
try:
for stage in stages:
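Note: a summary sketch of the new connect semantics introduced above:

    connect_all(ctx.nodes)            # static nodes: retry_count=3, log_warns=True
    connect_all(new_nodes, vm=True)   # fresh VMs:    retry_count=24, log_warns=False

    # connect_one no longer raises: a failed node keeps connection=None
    # and is dropped in connect_stage, unless it is a testnode, in which
    # case the whole run aborts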
diff --git a/wally/sensors/api.py b/wally/sensors/api.py
index f66bb36..6eed90a 100644
--- a/wally/sensors/api.py
+++ b/wally/sensors/api.py
@@ -1,24 +1,31 @@
import Queue
+import logging
import threading
-from contextlib import contextmanager
from .deploy_sensors import (deploy_and_start_sensors,
stop_and_remove_sensors)
from .protocol import create_protocol, Timeout
-__all__ = ['Empty', 'recv_main', 'start_monitoring',
- 'deploy_and_start_sensors', 'SensorConfig']
+__all__ = ['Empty', 'recv_main',
+ 'deploy_and_start_sensors',
+ 'SensorConfig',
+ 'stop_and_remove_sensors',
+ 'start_listener_thread',
+ ]
Empty = Queue.Empty
+logger = logging.getLogger("wally.sensors")
class SensorConfig(object):
- def __init__(self, conn, url, sensors):
+ def __init__(self, conn, url, sensors, source_id, monitor_url=None):
self.conn = conn
self.url = url
self.sensors = sensors
+ self.source_id = source_id
+ self.monitor_url = monitor_url
def recv_main(proto, data_q, cmd_q):
@@ -38,21 +45,17 @@
pass
-@contextmanager
-def start_monitoring(uri, configs):
- deploy_and_start_sensors(uri, configs)
- try:
- data_q = Queue.Queue()
- cmd_q = Queue.Queue()
- proto = create_protocol(uri, receiver=True)
- th = threading.Thread(None, recv_main, None, (proto, data_q, cmd_q))
- th.daemon = True
- th.start()
+def start_listener_thread(uri):
+ data_q = Queue.Queue()
+ cmd_q = Queue.Queue()
+ logger.debug("Listening for sensor data on " + uri)
+ proto = create_protocol(uri, receiver=True)
+ th = threading.Thread(None, recv_main, None, (proto, data_q, cmd_q))
+ th.daemon = True
+ th.start()
- try:
- yield data_q
- finally:
- cmd_q.put(None)
- th.join()
- finally:
- stop_and_remove_sensors(configs)
+ def stop_thread():
+ cmd_q.put(None)
+ th.join()
+
+ return data_q, stop_thread
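Note: a hedged usage sketch of the new listener API (packet shape inferred from the queue consumers):

    data_q, stop_listener = start_listener_thread("udp://0.0.0.0:5699")
    try:
        addr, data = data_q.get(True, 10)   # one (addr, packet) pair, 10s timeout
    finally:
        stop_listener()                     # signals recv_main and joins the thread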
diff --git a/wally/sensors/cp_protocol.py b/wally/sensors/cp_protocol.py
index 4e96afe..1b6993c 100644
--- a/wally/sensors/cp_protocol.py
+++ b/wally/sensors/cp_protocol.py
@@ -8,7 +8,7 @@
import binascii
-logger = logging.getLogger("wally")
+logger = logging.getLogger("wally.sensors")
# protocol contains 2 type of packet:
diff --git a/wally/sensors/deploy_sensors.py b/wally/sensors/deploy_sensors.py
index 249adfb..73e7902 100644
--- a/wally/sensors/deploy_sensors.py
+++ b/wally/sensors/deploy_sensors.py
@@ -7,14 +7,14 @@
from wally.ssh_utils import copy_paths, run_over_ssh
-logger = logging.getLogger('wally')
+logger = logging.getLogger('wally.sensors')
def wait_all_ok(futures):
return all(future.result() for future in futures)
-def deploy_and_start_sensors(monitor_uri, sensor_configs,
+def deploy_and_start_sensors(sensor_configs,
remote_path='/tmp/sensors/sensors'):
paths = {os.path.dirname(__file__): remote_path}
@@ -25,29 +25,29 @@
futures.append(executor.submit(deploy_and_start_sensor,
paths,
node_sensor_config,
- monitor_uri,
remote_path))
if not wait_all_ok(futures):
raise RuntimeError("Sensor deployment fails on some nodes")
-def deploy_and_start_sensor(paths, node_sensor_config,
- monitor_uri, remote_path):
+def deploy_and_start_sensor(paths, node_sensor_config, remote_path):
try:
copy_paths(node_sensor_config.conn, paths)
sftp = node_sensor_config.conn.open_sftp()
config_remote_path = os.path.join(remote_path, "conf.json")
+ sensors_config = node_sensor_config.sensors.copy()
+ sensors_config['source_id'] = node_sensor_config.source_id
with sftp.open(config_remote_path, "w") as fd:
- fd.write(json.dumps(node_sensor_config.sensors))
+ fd.write(json.dumps(sensors_config))
cmd_templ = 'env PYTHONPATH="{0}" python -m ' + \
"sensors.main -d start -u {1} {2}"
cmd = cmd_templ.format(os.path.dirname(remote_path),
- monitor_uri,
+ node_sensor_config.monitor_url,
config_remote_path)
run_over_ssh(node_sensor_config.conn, cmd,
@@ -55,7 +55,7 @@
sftp.close()
except:
- msg = "During deploing sensors in {0}".format(node_sensor_config.url)
+ msg = "During deploing sensors on {0}".format(node_sensor_config.url)
logger.exception(msg)
return False
return True
@@ -69,9 +69,8 @@
# some magic
time.sleep(0.3)
- conn.exec_command("rm -rf {0}".format(remote_path))
-
- logger.debug("Sensors stopped and removed")
+ # logger.warning("Sensors don't removed")
+ run_over_ssh(conn, "rm -rf {0}".format(remote_path), node=url)
def stop_and_remove_sensors(configs, remote_path='/tmp/sensors'):
@@ -85,3 +84,4 @@
remote_path))
wait(futures)
+ logger.debug("Sensors stopped and removed")
diff --git a/wally/sensors/main.py b/wally/sensors/main.py
index 3753e7c..e86bbed 100644
--- a/wally/sensors/main.py
+++ b/wally/sensors/main.py
@@ -51,6 +51,11 @@
prev = {}
+ try:
+ source_id = str(required_sensors.pop('source_id'))
+ except KeyError:
+ source_id = None
+
while True:
gtime, data = get_values(required_sensors.items())
curr = {'time': SensorInfo(gtime, True)}
for name, val in data.items():
@@ -60,6 +65,10 @@
prev[name] = val.value
else:
curr[name] = SensorInfo(val.value, False)
+
+ if source_id is not None:
+ curr['source_id'] = source_id
+
sender.send(curr)
time.sleep(opts.timeout)
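Note: popping source_id before the loop keeps it for every iteration (inside the loop it was consumed on the first pass and lost afterwards), so each packet now carries it; one reported sample might look like this (sensor names, values and the SensorInfo signature are inferred):

    curr = {'time': SensorInfo(1430000000, True),
            'io.sectors_written': SensorInfo(5120, True),
            'source_id': '172.16.52.112:22'}
    sender.send(curr)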
diff --git a/wally/sensors/protocol.py b/wally/sensors/protocol.py
index c2ace01..7688f31 100644
--- a/wally/sensors/protocol.py
+++ b/wally/sensors/protocol.py
@@ -25,7 +25,12 @@
class PickleSerializer(ISensortResultsSerializer):
def pack(self, data):
- ndata = {key: val.value for key, val in data.items()}
+ ndata = {}
+ for key, val in data.items():
+ if isinstance(val, basestring):
+ ndata[key] = val
+ else:
+ ndata[key] = val.value
return pickle.dumps(ndata)
def unpack(self, data):
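Note: plain strings (e.g. the injected source_id) now pass through unchanged, while SensorInfo values are still unwrapped (SensorInfo usage inferred from the sensors code):

    data = {'source_id': '172.16.52.112:22',
            'io.sectors_written': SensorInfo(5120, True)}
    PickleSerializer().pack(data)
    # pickles {'source_id': '172.16.52.112:22', 'io.sectors_written': 5120}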
diff --git a/wally/ssh_utils.py b/wally/ssh_utils.py
index 68d4017..0de7816 100644
--- a/wally/ssh_utils.py
+++ b/wally/ssh_utils.py
@@ -13,11 +13,12 @@
logger = logging.getLogger("wally")
-def ssh_connect(creds, retry_count=6, timeout=10):
+def ssh_connect(creds, retry_count=6, timeout=10, log_warns=True):
ssh = paramiko.SSHClient()
ssh.load_host_keys('/dev/null')
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
ssh.known_hosts = None
+
for i in range(retry_count):
try:
if creds.user is None:
@@ -55,10 +56,20 @@
# raise ValueError("Wrong credentials {0}".format(creds.__dict__))
except paramiko.PasswordRequiredException:
raise
- except socket.error as err:
- print err
- if i == retry_count - 1:
+ except socket.error:
+ retry_left = retry_count - i - 1
+
+ if log_warns:
+ msg = "Node {0.host}:{0.port} connection timeout."
+
+ if 0 != retry_left:
+ msg += " {0} retry left.".format(retry_left)
+
+ logger.warning(msg.format(creds))
+
+ if 0 == retry_left:
raise
+
time.sleep(1)
@@ -77,8 +88,12 @@
try:
sftp.mkdir(remotepath, mode=mode)
except IOError:
- ssh_mkdir(sftp, remotepath.rsplit("/", 1)[0], mode=mode,
- intermediate=True)
+ upper_dir = remotepath.rsplit("/", 1)[0]
+
+ if upper_dir == '' or upper_dir == '/':
+ raise
+
+ ssh_mkdir(sftp, upper_dir, mode=mode, intermediate=True)
return sftp.mkdir(remotepath, mode=mode)
else:
sftp.mkdir(remotepath, mode=mode)
@@ -155,12 +170,10 @@
else:
templ = "Can't copy {0!r} - " + \
"it neither a file not a directory"
- msg = templ.format(src)
- raise OSError(msg)
+ raise OSError(templ.format(src))
except Exception as exc:
tmpl = "Scp {0!r} => {1!r} failed - {2!r}"
- msg = tmpl.format(src, dst, exc)
- raise OSError(msg)
+ raise OSError(tmpl.format(src, dst, exc))
finally:
sftp.close()
@@ -234,10 +247,10 @@
raise ValueError("Can't parse {0!r} as ssh uri value".format(uri))
-def connect(uri):
+def connect(uri, **params):
creds = parse_ssh_uri(uri)
creds.port = int(creds.port)
- return ssh_connect(creds)
+ return ssh_connect(creds, **params)
all_sessions_lock = threading.Lock()
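Note: keyword arguments now flow through connect to ssh_connect; the url forms below are illustrative:

    conn = connect("root:test37@172.16.52.112")          # defaults: 6 retries, warnings on
    conn = connect("ubuntu@10.20.0.4::disk_io_perf.pem",
                   retry_count=24, log_warns=False)      # patient and quiet, for fresh VMs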
diff --git a/wally/start_vms.py b/wally/start_vms.py
index 4fd35ae..b3c141a 100644
--- a/wally/start_vms.py
+++ b/wally/start_vms.py
@@ -266,7 +266,7 @@
names.append(name_templ.format(group=group_name, id=i))
futures = []
- logger.debug("Requesting new vms")
+ logger.debug("Requesting new vm's")
for name, flt_ip in zip(names, ips):
params = (nova, name, keypair_name, img, fl,
diff --git a/wally/suits/itest.py b/wally/suits/itest.py
index 91e9dd5..1e93247 100644
--- a/wally/suits/itest.py
+++ b/wally/suits/itest.py
@@ -157,6 +157,8 @@
run_over_ssh(conn, cmd, timeout=msz, node=self.node)
def run(self, conn, barrier):
+ # logger.warning("No tests runned")
+ # return
cmd_templ = "sudo env python2 {0} --type {1} {2} --json -"
# cmd_templ = "env python2 {0} --type {1} {2} --json -"
diff --git a/wally/tests_sensors.py b/wally/tests_sensors.py
new file mode 100644
index 0000000..1e0b1ad
--- /dev/null
+++ b/wally/tests_sensors.py
@@ -0,0 +1,145 @@
+import time
+import Queue
+import logging
+import threading
+
+from wally import utils
+from wally.config import cfg_dict
+from wally.sensors.api import (start_listener_thread,
+ deploy_and_start_sensors,
+ SensorConfig,
+ stop_and_remove_sensors)
+
+
+logger = logging.getLogger("wally")
+
+
+def save_sensors_data(data_q, mon_q, fd):
+ fd.write("\n")
+
+ observed_nodes = set()
+
+ try:
+ while True:
+ val = data_q.get()
+ if val is None:
+ break
+
+ addr, data = val
+ if addr not in observed_nodes:
+ mon_q.put(addr + (data['source_id'],))
+ observed_nodes.add(addr)
+
+ fd.write("{0!s} : {1!r}\n".format(time.time(), repr(val)))
+ except Exception:
+ logger.exception("Error in sensors thread")
+ logger.info("Sensors thread exits")
+
+
+def get_sensors_config_for_nodes(cfg, nodes):
+ monitored_nodes = []
+ sensors_configs = []
+
+ receiver_url = cfg["receiver_url"]
+ assert '{ip}' in receiver_url
+
+ for role, sensors_str in cfg["roles_mapping"].items():
+ sensors = [sens.strip() for sens in sensors_str.split(",")]
+
+ collect_cfg = dict((sensor, {}) for sensor in sensors)
+
+ for node in nodes:
+ if role in node.roles:
+
+ if node.monitor_url is not None:
+ monitor_url = node.monitor_url
+ else:
+ ext_ip = utils.get_ip_for_target(node.get_ip())
+ monitor_url = receiver_url.format(ip=ext_ip)
+
+ monitored_nodes.append(node)
+ sens_cfg = SensorConfig(node.connection,
+ node.get_conn_id(),
+ collect_cfg,
+ source_id=node.get_conn_id(),
+ monitor_url=monitor_url)
+ sensors_configs.append(sens_cfg)
+
+ return monitored_nodes, sensors_configs
+
+
+def start_sensor_process_thread(ctx, cfg, sensors_configs):
+ receiver_url = cfg["receiver_url"]
+ sensors_data_q, stop_sensors_loop = \
+ start_listener_thread(receiver_url.format(ip='0.0.0.0'))
+
+ mon_q = Queue.Queue()
+ fd = open(cfg_dict['sensor_storage'], "w")
+ logger.info("Start sensors data receiving thread")
+ sensor_listen_th = threading.Thread(None, save_sensors_data, None,
+ (sensors_data_q, mon_q, fd))
+ sensor_listen_th.daemon = True
+ sensor_listen_th.start()
+
+ def stop_sensors_receiver(cfg, ctx):
+ stop_sensors_loop()
+ sensors_data_q.put(None)
+ sensor_listen_th.join()
+
+ ctx.clear_calls_stack.append(stop_sensors_receiver)
+ return mon_q
+
+
+def deploy_sensors_stage(cfg, ctx, nodes=None, undeploy=True):
+ if 'sensors' not in cfg:
+ return
+
+ cfg = cfg.get('sensors')
+
+ if nodes is None:
+ nodes = ctx.nodes
+
+ logger.info("Deploing new sensors on {0} node(s)".format(len(nodes)))
+ monitored_nodes, sensors_configs = get_sensors_config_for_nodes(cfg,
+ nodes)
+
+ if len(monitored_nodes) == 0:
+ logger.info("Nothing to monitor, no sensors would be installed")
+ return
+
+ if ctx.sensors_mon_q is None:
+ ctx.sensors_mon_q = start_sensor_process_thread(ctx, cfg,
+ sensors_configs)
+
+ if undeploy:
+ def remove_sensors_stage(cfg, ctx):
+ stop_and_remove_sensors(sensors_configs)
+ ctx.clear_calls_stack.append(remove_sensors_stage)
+
+ deploy_and_start_sensors(sensors_configs)
+ wait_for_new_sensors_data(ctx, monitored_nodes)
+
+
+def wait_for_new_sensors_data(ctx, monitored_nodes):
+ MAX_WAIT_FOR_SENSORS = 10
+ etime = time.time() + MAX_WAIT_FOR_SENSORS
+
+ msg = "Waiting at most {0}s till all {1} nodes starts report sensors data"
+ nodes_ids = set(node.get_conn_id() for node in monitored_nodes)
+ logger.debug(msg.format(MAX_WAIT_FOR_SENSORS, len(nodes_ids)))
+
+ # wait till all nodes start sending data
+ while len(nodes_ids) != 0:
+ tleft = etime - time.time()
+ try:
+ source_id = ctx.sensors_mon_q.get(True, tleft)[2]
+ except Queue.Empty:
+ msg = "Node {0} not sending any sensor data in {1}s"
+ msg = msg.format(", ".join(nodes_ids), MAX_WAIT_FOR_SENSORS)
+ raise RuntimeError(msg)
+
+ if source_id not in nodes_ids:
+ msg = "Receive sensors from extra node: {0}".format(source_id)
+ logger.warning(msg)
+
+ nodes_ids.remove(source_id)
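Note: the data flow this module implements, with tuple shapes taken from the code above:

    # receiver thread:   udp packet            -> data_q as (addr, data)
    # save_sensors_data: first packet per addr -> mon_q  as (ip, port, source_id)
    ip, port, source_id = mon_q.get(True, timeout)
    # source_id (e.g. "172.16.52.112:22") identifies the node even when the
    # UDP source address does not match its ssh address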
diff --git a/wally/utils.py b/wally/utils.py
index 60645d4..71fbe57 100644
--- a/wally/utils.py
+++ b/wally/utils.py
@@ -122,3 +122,19 @@
m = (seconds % 3600) // 60
s = seconds % 60
return "{0}:{1:02d}:{2:02d}".format(h, m, s)
+
+
+def yamable(data):
+ if isinstance(data, (tuple, list)):
+ return map(yamable, data)
+
+ if isinstance(data, unicode):
+ return str(data)
+
+ if isinstance(data, dict):
+ res = {}
+ for k, v in data.items():
+ res[yamable(k)] = yamable(v)
+ return res
+
+ return data
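Note: utils.yamable (moved here from run_test.py) normalizes results before YAML dumping; a quick illustration:

    yamable((u'io', {u'bw': (100, 200)}))
    # -> ['io', {'bw': [100, 200]}]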