fix optional modules and Python 2.6 compatibility

Make faulthandler and psutil optional imports so wally still runs when
they are missing, drop the GChartWrapper-based charts.py, kill sensor
processes left over from previous runs, and run fio through a shell
wrapper so its exit code and stderr survive SSH reconnects.
diff --git a/wally/charts.py b/wally/charts.py
deleted file mode 100644
index b4472a4..0000000
--- a/wally/charts.py
+++ /dev/null
@@ -1,152 +0,0 @@
-import os
-import sys
-import hashlib
-
-from GChartWrapper import Line
-from GChartWrapper import constants
-from GChartWrapper import VerticalBarGroup
-
-from config import cfg_dict
-
-
-# Patch MARKER constant
-constants.MARKERS += 'E'
-sys.modules['GChartWrapper.GChart'].MARKERS += 'E'
-
-
-COLORS = ["1569C7", "81D8D0", "B0BD2E", "5CB3FF", "0040FF", "81DAF5"]
-constants.MARKERS += 'E' # append E marker to available markers
-
-
-def get_top_top_dir(path):
- top_top_dir = os.path.dirname(os.path.dirname(path))
- return path[len(top_top_dir) + 1:]
-
-
-def render_vertical_bar(title, legend, bars_data, bars_dev_top,
- bars_dev_bottom, file_name,
- width=700, height=400,
- scale_x=None, scale_y=None, label_x=None,
- label_y=None, lines=()):
- """
- Renders vertical bar group chart
-
- :param legend - list of legend values.
- Example: ['bar1', 'bar2', 'bar3']
- :param dataset - list of values for each type (value, deviation)
- Example:
- [
- [(10,1), (11, 2), (10,1)], # bar1 values
- [(30,(29,33)),(35,(33,36)), (30,(29,33))], # bar2 values
- [(20,(19,21)),(20,(13, 24)), (20,(19,21))] # bar 3 values
- ]
- :param width - width of chart
- :param height - height of chart
- :param scale_x - x ace scale
- :param scale_y - y ace scale
-
- :returns url to chart
-
- dataset example:
- {
- 'relese_1': {
- 'randr': (1, 0.1),
- 'randwr': (2, 0.2)
- }
- 'release_2': {
- 'randr': (3, 0.3),
- 'randwr': (4, 0.4)
- }
- }
- """
-
- bar = VerticalBarGroup([], encoding='text')
- bar.title(title)
-
- dataset = bars_data + bars_dev_top + bars_dev_bottom + \
- [l[0] for l in lines]
-
- bar.dataset(dataset, series=len(bars_data))
-
- axes_type = ""
-
- if scale_x:
- bar.axes.label(0, *scale_x)
- axes_type += "x"
- if label_x:
- bar.axes.style(len(axes_type), '000000', '13')
- bar.axes.label(len(axes_type), label_x)
- axes_type += "x"
-
- max_value = (max([max(l) for l in dataset[:2]]))
- bar.axes.range(len(axes_type), 0, max_value)
- bar.axes.style(len(axes_type), 'N*s*')
- axes_type += "y"
-
- if label_y:
- bar.axes.style(len(axes_type), '000000', '13')
- bar.axes.label(len(axes_type), None, label_y)
- axes_type += "y"
-
- bar.scale(*[0, max_value] * 3)
-
- bar.bar('r', '.1', '1')
- for i in range(1):
- bar.marker('E', '000000', '%s:%s' % ((len(bars_data) + i*2), i),
- '', '1:10')
- bar.color(*COLORS)
- bar.size(width, height)
-
- scale = [0, max_value] * len(bars_dev_top + bars_dev_bottom + bars_data)
- if lines:
- line_n = 0
- for data, label, axe, leg in lines:
- bar.marker('D', COLORS[len(bars_data) + line_n],
- (len(bars_data + bars_dev_top + bars_dev_bottom))
- + line_n, 0, 3)
- # max_val_l = max(data)
- if axe:
- max_val_l = max(data)
- bar.axes.type(axes_type + axe)
- bar.axes.range(len(axes_type), 0, max_val_l)
- bar.axes.style(len(axes_type), 'N*s*')
- bar.axes.label(len(axes_type) + 1, None, label)
- bar.axes.style(len(axes_type) + 1, '000000', '13')
- axes_type += axe
- line_n += 1
- scale += [0, max_val_l]
- else:
- scale += [0, max_value]
- legend.append(leg)
- # scale += [0, max_val_l]
-
- bar.legend(*legend)
- bar.scale(*scale)
- img_path = file_name + ".png"
-
- if not os.path.exists(img_path):
- bar.save(img_path)
-
- return get_top_top_dir(img_path)
-
-
-def render_lines(title, legend, dataset, scale_x, width=700, height=400):
- line = Line([], encoding="text")
- line.title(title)
- line.dataset(dataset)
-
- line.axes('xy')
- max_value = (max([max(l) for l in dataset]))
- line.axes.range(1, 0, max_value)
- line.scale(0, max_value)
- line.axes.label(0, *scale_x)
- line.legend(*legend)
- line.color(*COLORS[:len(legend)])
- line.size(width, height)
-
- img_name = hashlib.md5(str(line)).hexdigest() + ".png"
- img_path = os.path.join(cfg_dict['charts_img_path'], img_name)
- if not os.path.exists(img_path):
- line.save(img_path)
-
- return get_top_top_dir(img_path)
diff --git a/wally/report.py b/wally/report.py
index 650666b..1b1dbf9 100644
--- a/wally/report.py
+++ b/wally/report.py
@@ -447,9 +447,10 @@
tlat = tlatv_ms * 1000
pos = bisect.bisect_left(latv, tlat)
if 0 == pos:
- iops3 = 0
+ setattr(di, 'rws4k_{}ms'.format(tlatv_ms), 0)
elif pos == len(latv):
- iops3 = latv[-1]
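+            # target latency is above every measured point: report the
+            # best measured IOPS as a lower bound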
+ iops3, _, _ = rws4k_iops_lat_th[-1]
+ setattr(di, 'rws4k_{}ms'.format(tlatv_ms), ">=" + str(iops3))
else:
lat1 = latv[pos - 1]
lat2 = latv[pos]
@@ -462,7 +463,7 @@
th_iops_coef = (iops2 - iops1) / (th2 - th1)
iops3 = th_iops_coef * (th3 - th1) + iops1
-        setattr(di, 'rws4k_{}ms'.format(tlatv_ms), int(iops3))
+            setattr(di, 'rws4k_{}ms'.format(tlatv_ms), int(iops3))
hdi = DiskInfo()
diff --git a/wally/run_test.py b/wally/run_test.py
index 5e6b4f9..9a3d04c 100755
--- a/wally/run_test.py
+++ b/wally/run_test.py
@@ -15,7 +15,12 @@
import yaml
import texttable
-import faulthandler
+
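+# faulthandler is unavailable on Python 2.6; treat it as optional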
+try:
+ import faulthandler
+except ImportError:
+ faulthandler = None
+
from concurrent.futures import ThreadPoolExecutor
from wally import pretty_yaml
@@ -722,6 +727,17 @@
return func.__name__ + " stage"
+def get_test_names(block):
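+    # yield every test name in a config block, recursing into start_test_nodes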
+ assert len(block.items()) == 1
+ name, data = block.items()[0]
+ if name == 'start_test_nodes':
+ for in_blk in data['tests']:
+ for i in get_test_names(in_blk):
+ yield i
+ else:
+ yield name
+
+
def list_results(path):
results = []
@@ -738,15 +754,8 @@
test_names = []
for block in cfg['tests']:
- assert len(block.items()) == 1
- name, data = block.items()[0]
- if name == 'start_test_nodes':
- for in_blk in data['tests']:
- assert len(in_blk.items()) == 1
- in_name, _ = in_blk.items()[0]
- test_names.append(in_name)
- else:
- test_names.append(name)
+ test_names.extend(get_test_names(block))
+
results.append((dname, test_names, res_mtime))
tab = texttable.Texttable(max_width=120)
@@ -768,7 +777,9 @@
list_results(argv[-1])
exit(0)
- faulthandler.register(signal.SIGUSR1, all_threads=True)
+ if faulthandler is not None:
+ faulthandler.register(signal.SIGUSR1, all_threads=True)
+
opts = parse_args(argv)
load_config(opts.config_file, opts.post_process_only)
diff --git a/wally/sensors/api.py b/wally/sensors/api.py
index 52d33ed..68b2e7d 100644
--- a/wally/sensors/api.py
+++ b/wally/sensors/api.py
@@ -30,6 +30,7 @@
config_remote_path = os.path.join(remote_path, "conf.json")
def deploy_sensors(node_sensor_config):
+        # TODO: check whether the remote path already exists before copying
copy_paths(node_sensor_config.conn, paths)
with node_sensor_config.conn.open_sftp() as sftp:
sensors_config = node_sensor_config.sensors.copy()
diff --git a/wally/sensors_utils.py b/wally/sensors_utils.py
index 61a5c08..e78bb5f 100644
--- a/wally/sensors_utils.py
+++ b/wally/sensors_utils.py
@@ -2,6 +2,9 @@
import logging
import contextlib
+from concurrent.futures import ThreadPoolExecutor
+
+from wally import ssh_utils
from wally.sensors.api import (with_sensors, sensors_info, SensorConfig)
@@ -34,6 +37,25 @@
return monitored_nodes, sensors_configs, source2roles_map
+PID_FILE = "/tmp/sensors.pid"
+
+
+def clear_old_sensors(sensors_configs):
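+    # kill sensors left running by a previous run and remove their pid files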
+ def stop_sensors(sens_cfg):
+ with sens_cfg.conn.open_sftp() as sftp:
+ try:
+ pid = ssh_utils.read_from_remote(sftp, PID_FILE)
+ pid = int(pid.strip())
+ ssh_utils.run_over_ssh(sens_cfg.conn,
+ "kill -9 " + str(pid))
+ sftp.remove(PID_FILE)
+            except Exception:
+                # no pid file or the process already died; nothing to clean
+                pass
+
+ with ThreadPoolExecutor(32) as pool:
+ list(pool.map(stop_sensors, sensors_configs))
+
+
@contextlib.contextmanager
def with_sensors_util(cfg, nodes):
if 'sensors' not in cfg:
@@ -58,5 +80,6 @@
get_sensors_config_for_nodes(cfg['sensors'], nodes,
cfg['sensors_remote_path'])
+ clear_old_sensors(sensors_configs)
with sensors_info(sensors_configs, cfg['sensors_remote_path']) as res:
yield res
diff --git a/wally/ssh_utils.py b/wally/ssh_utils.py
index 7085be4..7af985d 100644
--- a/wally/ssh_utils.py
+++ b/wally/ssh_utils.py
@@ -88,17 +88,20 @@
sio.close()
-def ssh_connect(creds, conn_timeout=60):
+def ssh_connect(creds, conn_timeout=60, reuse_conn=None):
if creds == 'local':
return Local()
tcp_timeout = 15
banner_timeout = 30
- ssh = paramiko.SSHClient()
- ssh.load_host_keys('/dev/null')
- ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
- ssh.known_hosts = None
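+    # when reconnecting, reuse the caller's existing client instead of
+    # constructing a fresh one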
+ if reuse_conn is None:
+ ssh = paramiko.SSHClient()
+ ssh.load_host_keys('/dev/null')
+ ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
+ ssh.known_hosts = None
+ else:
+ ssh = reuse_conn
etime = time.time() + conn_timeout
@@ -344,6 +347,15 @@
raise ValueError("Can't parse {0!r} as ssh uri value".format(uri))
+def reconnect(conn, uri, **params):
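+    # re-establish a dropped connection in place, reusing the paramiko client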
+ if uri == 'local':
+ return conn
+
+ creds = parse_ssh_uri(uri)
+ creds.port = int(creds.port)
+ return ssh_connect(creds, reuse_conn=conn, **params)
+
+
def connect(uri, **params):
if uri == 'local':
return Local()
diff --git a/wally/suits/io/__init__.py b/wally/suits/io/__init__.py
index afb6ac1..978fa46 100644
--- a/wally/suits/io/__init__.py
+++ b/wally/suits/io/__init__.py
@@ -4,10 +4,12 @@
import logging
import datetime
-from wally.utils import (ssize2b, open_for_append_or_create,
- sec_to_str, StopTestError)
+import paramiko
-from wally.ssh_utils import save_to_remote, read_from_remote, BGSSHTask
+from wally.utils import (ssize2b, sec_to_str, StopTestError)
+
+from wally.ssh_utils import (save_to_remote, read_from_remote, BGSSHTask,
+ reconnect)
from ..itest import IPerfTest, TestResults
from .formatter import format_results_for_console
@@ -30,7 +32,8 @@
'results': self.results,
'raw_result': self.raw_result,
'run_interval': self.run_interval,
- 'vm_count': self.vm_count
+ 'vm_count': self.vm_count,
+ 'test_name': self.test_name
}
@classmethod
@@ -41,7 +44,7 @@
return cls(sec, data['params'], data['results'],
data['raw_result'], data['run_interval'],
- data['vm_count'])
+ data['vm_count'], data['test_name'])
def get_slice_parts_offset(test_slice, real_inteval):
@@ -77,6 +80,9 @@
self.results_file = self.join_remote("results.json")
self.pid_file = self.join_remote("pid")
self.task_file = self.join_remote("task.cfg")
+ self.sh_file = self.join_remote("cmd.sh")
+ self.err_out_file = self.join_remote("fio_err_out")
+ self.exit_code_file = self.join_remote("exit_code")
self.use_sudo = self.options.get("use_sudo", True)
self.test_logging = self.options.get("test_logging", False)
self.raw_cfg = open(self.config_fname).read()
@@ -252,6 +258,9 @@
self.config_params, res,
full_raw_res, interval,
vm_count=self.total_nodes_count)
+ tres.test_name = os.path.basename(self.config_fname)
+ if tres.test_name.endswith('.cfg'):
+ tres.test_name = tres.test_name[:-4]
self.on_result_cb(tres)
except (OSError, StopTestError):
raise
@@ -263,31 +272,38 @@
barrier.exit()
def do_run(self, barrier, cfg_slice, pos, nolog=False):
- # return open("/tmp/lit-sunshine/io/results.json").read(), (1, 2)
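+        # run fio through a shell wrapper so its exit code and stderr are
+        # kept on the node even if the SSH session drops mid-run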
+ bash_file = "#!/bin/bash\n" + \
+ "fio --output-format=json --output={out_file} " + \
+ "--alloc-size=262144 {job_file} " + \
+ " >{err_out_file} 2>&1 \n" + \
+ "echo $? >{res_code_file}\n"
+
conn_id = self.node.get_conn_id()
fconn_id = conn_id.replace(":", "_")
- cmd_templ = "fio --output-format=json --output={1} " + \
- "--alloc-size=262144 {0}"
- if self.options.get("use_sudo", True):
- cmd_templ = "sudo " + cmd_templ
+ bash_file = bash_file.format(out_file=self.results_file,
+ job_file=self.task_file,
+ err_out_file=self.err_out_file,
+ res_code_file=self.exit_code_file)
task_fc = "\n\n".join(map(str, cfg_slice))
with self.node.connection.open_sftp() as sftp:
save_to_remote(sftp, self.task_file, task_fc)
+ save_to_remote(sftp, self.sh_file, bash_file)
fname = "{0}_{1}.fio".format(pos, fconn_id)
with open(os.path.join(self.log_directory, fname), "w") as fd:
fd.write(task_fc)
- cmd = cmd_templ.format(self.task_file, self.results_file)
-
exec_time = sum(map(execution_time, cfg_slice))
exec_time_str = sec_to_str(exec_time)
timeout = int(exec_time + max(300, exec_time))
soft_tout = exec_time
+
barrier.wait()
if self.is_primary:
@@ -305,8 +321,28 @@
self.run_over_ssh("cd " + os.path.dirname(self.task_file), nolog=True)
task = BGSSHTask(self.node, self.options.get("use_sudo", True))
begin = time.time()
- task.start(cmd)
- task.wait(soft_tout, timeout)
+
+ if self.options.get("use_sudo", True):
+ sudo = "sudo "
+ else:
+ sudo = ""
+
+ task.start(sudo + "bash " + self.sh_file)
+
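+        # long runs may outlive the SSH session: retry wait() and reconnect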
+ while True:
+ try:
+ task.wait(soft_tout, timeout)
+ break
+ except paramiko.SSHException:
+ pass
+
+ try:
+ self.node.connection.close()
+            except Exception:
+                # the old connection may already be dead; ignore close errors
+                pass
+
+ reconnect(self.node.connection, self.node.conn_url)
+
end = time.time()
if not nolog:
@@ -326,7 +362,18 @@
with self.node.connection.open_sftp() as sftp:
result = read_from_remote(sftp, self.results_file)
+ exit_code = read_from_remote(sftp, self.exit_code_file)
+ err_out = read_from_remote(sftp, self.err_out_file)
+ exit_code = exit_code.strip()
+
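+            # any non-zero fio exit code aborts the whole test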
+ if exit_code != '0':
+ msg = "fio exit with code {0}: {1}".format(exit_code, err_out)
+ logger.critical(msg.strip())
+ raise StopTestError("fio failed")
+
sftp.remove(self.results_file)
+ sftp.remove(self.err_out_file)
+ sftp.remove(self.exit_code_file)
fname = "{0}_{1}.json".format(pos, fconn_id)
with open(os.path.join(self.log_directory, fname), "w") as fd:
diff --git a/wally/suits/io/check_distribution.cfg b/wally/suits/io/check_distribution.cfg
index 4746f37..9475cde 100644
--- a/wally/suits/io/check_distribution.cfg
+++ b/wally/suits/io/check_distribution.cfg
@@ -10,4 +10,4 @@
ramp_time=5
runtime=30
-size=10G
+size=200G
diff --git a/wally/suits/io/hdd.cfg b/wally/suits/io/hdd.cfg
index 4b242a8..0eb85a6 100644
--- a/wally/suits/io/hdd.cfg
+++ b/wally/suits/io/hdd.cfg
@@ -1,17 +1,15 @@
[global]
include defaults.cfg
-NUM_ROUNDS=3
-
# NUMJOBS={% 1, 5, 10, 15, 20, 30, 40, 80 %}
-NUMJOBS={% 1, 5, 10 %}
+NUMJOBS={% 1, 3, 5, 10, 20, 40 %}
write_lat_log=fio_log
write_iops_log=fio_log
log_avg_msec=500
-size=10G
+size={TEST_FILE_SIZE}
ramp_time=5
runtime=30
diff --git a/wally/suits/itest.py b/wally/suits/itest.py
index 09e93f0..4be7eed 100644
--- a/wally/suits/itest.py
+++ b/wally/suits/itest.py
@@ -7,13 +7,14 @@
class TestResults(object):
def __init__(self, config, params, results,
- raw_result, run_interval, vm_count):
+ raw_result, run_interval, vm_count, test_name=None):
self.config = config
self.params = params
self.results = results
self.raw_result = raw_result
self.run_interval = run_interval
self.vm_count = vm_count
+ self.test_name = test_name
def __str__(self):
res = "{0}({1}):\n results:\n".format(
diff --git a/wally/utils.py b/wally/utils.py
index d94d333..f5ab8b6 100644
--- a/wally/utils.py
+++ b/wally/utils.py
@@ -1,14 +1,17 @@
import re
import os
import sys
-import time
-import psutil
import socket
import logging
import threading
import contextlib
import subprocess
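+# psutil is optional; without it, fall back to killing only the main process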
+try:
+ import psutil
+except ImportError:
+ psutil = None
+
logger = logging.getLogger("wally")
@@ -191,11 +194,14 @@
thread.join(timeout)
if thread.is_alive():
-        parent = psutil.Process(proc.pid)
-        for child in parent.children(recursive=True):
-            child.kill()
-        parent.kill()
+        if psutil is not None:
+            parent = psutil.Process(proc.pid)
+            for child in parent.children(recursive=True):
+                child.kill()
+            parent.kill()
+        else:
+            proc.kill()
thread.join()
raise RuntimeError("Local process timeout: " + str(cmd))