Add Ceph report generation, aggregate IO results per test node, move SensorDatastore into wally.timeseries, and fix assorted report/runner issues
diff --git a/wally/report.py b/wally/report.py
index aa7a4b0..260c031 100644
--- a/wally/report.py
+++ b/wally/report.py
@@ -4,7 +4,7 @@
import wally
from wally import charts
-from wally.utils import parse_creds
+from wally.utils import parse_creds, ssize_to_b
from wally.suits.io.results_loader import process_disk_info
from wally.meta_info import total_lab_info, collect_lab_data
@@ -12,10 +12,19 @@
logger = logging.getLogger("wally.report")
-def render_html(dest, info, lab_description):
+def render_hdd_html(dest, info, lab_description):
very_root_dir = os.path.dirname(os.path.dirname(wally.__file__))
templ_dir = os.path.join(very_root_dir, 'report_templates')
- templ_file = os.path.join(templ_dir, "report.html")
+ templ_file = os.path.join(templ_dir, "report_hdd.html")
+ templ = open(templ_file, 'r').read()
+ report = templ.format(lab_info=lab_description, **info.__dict__)
+ open(dest, 'w').write(report)
+
+
+def render_ceph_html(dest, info, lab_description):
+ very_root_dir = os.path.dirname(os.path.dirname(wally.__file__))
+ templ_dir = os.path.join(very_root_dir, 'report_templates')
+ templ_file = os.path.join(templ_dir, "report_ceph.html")
templ = open(templ_file, 'r').read()
report = templ.format(lab_info=lab_description, **info.__dict__)
open(dest, 'w').write(report)
@@ -43,31 +52,58 @@
lines=[
(latv, "msec", "rr", "lat"),
(iops_or_bw_per_vm, None, None,
- "IOPS per thread")
+ legend[0] + " per thread")
])
return str(ch)
-def make_plots(processed_results, path):
- name_filters = [
- ('hdd_test_rrd4k', 'rand_read_4k', 'Random read 4k sync IOPS'),
+def make_hdd_plots(processed_results, path):
+ plots = [
+ ('hdd_test_rrd4k', 'rand_read_4k', 'Random read 4k direct IOPS'),
('hdd_test_rws4k', 'rand_write_4k', 'Random write 4k sync IOPS')
]
+ make_plots(processed_results, path, plots)
- for name_pref, fname, desc in name_filters:
+
+def make_ceph_plots(processed_results, path):
+ plots = [
+ ('ceph_test_rrd4k', 'rand_read_4k', 'Random read 4k direct IOPS'),
+ ('ceph_test_rws4k', 'rand_write_4k', 'Random write 4k sync IOPS'),
+ ('ceph_test_rrd16m', 'rand_read_16m', 'Random read 16m direct MiBps'),
+ ('ceph_test_swd1m', 'seq_write_1m',
+ 'Sequential write 1m direct MiBps'),
+ ]
+ make_plots(processed_results, path, plots)
+
+
+def make_plots(processed_results, path, plots):
+ for name_pref, fname, desc in plots:
chart_data = []
+
for res in processed_results.values():
if res.name.startswith(name_pref):
chart_data.append(res)
+ if len(chart_data) == 0:
+            raise ValueError("Can't find any data for " + name_pref)
+
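+        # blocks larger than 16KiB are charted as bandwidth rather than IOPS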
+ use_bw = ssize_to_b(chart_data[0].raw['blocksize']) > 16 * 1024
+
chart_data.sort(key=lambda x: x.raw['concurence'])
lat = [x.lat for x in chart_data]
concurence = [x.raw['concurence'] for x in chart_data]
- iops = [x.iops for x in chart_data]
- iops_dev = [x.iops * x.dev for x in chart_data]
- io_chart(desc, concurence, lat, iops, iops_dev, 'bw', fname)
+ if use_bw:
+ data = [x.bw for x in chart_data]
+ data_dev = [x.bw * x.dev for x in chart_data]
+ name = "BW"
+ else:
+ data = [x.iops for x in chart_data]
+ data_dev = [x.iops * x.dev for x in chart_data]
+ name = "IOPS"
+
+ io_chart(desc, concurence, lat, data, data_dev, name, fname)
class DiskInfo(object):
@@ -104,6 +140,11 @@
di.bw_write_max = max(di.bw_write_max, res.bw)
elif res.raw['rw'] == 'read':
di.bw_read_max = max(di.bw_read_max, res.bw)
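+        # direct 16m transfers also count toward the peak read/write bandwidth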
+ elif res.raw['sync_mode'] == 'd' and res.raw['blocksize'] == '16m':
+ if res.raw['rw'] == 'write' or res.raw['rw'] == 'randwrite':
+ di.bw_write_max = max(di.bw_write_max, res.bw)
+ elif res.raw['rw'] == 'read' or res.raw['rw'] == 'randread':
+ di.bw_read_max = max(di.bw_read_max, res.bw)
di.bw_write_max /= 1000
di.bw_read_max /= 1000
@@ -161,7 +202,14 @@
def make_hdd_report(processed_results, path, lab_info):
-    make_plots(processed_results, path)
+    make_hdd_plots(processed_results, path)
di = get_disk_info(processed_results)
- render_html(path, di, lab_info)
+ render_hdd_html(path, di, lab_info)
+
+
+@report('Ceph', 'ceph_test')
+def make_ceph_report(processed_results, path, lab_info):
+ make_ceph_plots(processed_results, path)
+ di = get_disk_info(processed_results)
+ render_ceph_html(path, di, lab_info)
def make_io_report(results, path, lab_url=None, creds=None):
@@ -183,20 +231,19 @@
try:
processed_results = process_disk_info(results)
res_fields = sorted(processed_results.keys())
-
for fields, name, func in report_funcs:
for field in fields:
pos = bisect.bisect_left(res_fields, field)
if pos == len(res_fields):
- continue
+ break
if not res_fields[pos + 1].startswith(field):
break
else:
hpath = path.format(name)
+                logger.debug("Generating report " + name + " into " + hpath)
func(processed_results, hpath, lab_info)
- logger.debug(name + " report generated into " + hpath)
break
else:
logger.warning("No report generator found for this load")
diff --git a/wally/run_test.py b/wally/run_test.py
index 9cea103..fbe676f 100755
--- a/wally/run_test.py
+++ b/wally/run_test.py
@@ -17,11 +17,13 @@
from concurrent.futures import ThreadPoolExecutor
from wally import pretty_yaml
+from wally.timeseries import SensorDatastore
from wally.discover import discover, Node, undiscover
from wally import utils, report, ssh_utils, start_vms
from wally.suits.itest import IOPerfTest, PgBenchTest
+from wally.sensors_utils import deploy_sensors_stage
from wally.config import cfg_dict, load_config, setup_loggers
-from wally.sensors_utils import deploy_sensors_stage, SensorDatastore
+
try:
from wally import webui
@@ -197,7 +199,9 @@
# logger.warning("Some test threads still running")
gather_results(res_q, results)
- yield name, test.merge_results(results)
+ result = test.merge_results(results)
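+            # remember how many test nodes contributed, so per-node result
+            # streams can be re-aggregated later (see process_disk_info)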
+ result['__test_meta__'] = {'testnodes_count': len(test_nodes)}
+ yield name, result
def log_nodes_statistic(_, ctx):
@@ -523,6 +527,8 @@
default=False)
parser.add_argument("-r", '--no-html-report', action='store_true',
help="Skip html report", default=False)
+ parser.add_argument("--params", nargs="*", metavar="testname.paramname",
+ help="Test params", default=[])
parser.add_argument("config_file")
return parser.parse_args(argv[1:])
diff --git a/wally/sensors_utils.py b/wally/sensors_utils.py
index e5b4347..1e9e898 100644
--- a/wally/sensors_utils.py
+++ b/wally/sensors_utils.py
@@ -1,5 +1,4 @@
import time
-import array
import Queue
import logging
import threading
@@ -11,92 +10,14 @@
SensorConfig,
stop_and_remove_sensors)
-
logger = logging.getLogger("wally.sensors")
DEFAULT_RECEIVER_URL = "udp://{ip}:5699"
-class SensorDatastore(object):
- def __init__(self, stime=None):
- self.lock = threading.Lock()
- self.stime = stime
-
- self.min_size = 60 * 60
- self.max_size = 60 * 61
-
- self.data = {
- 'testnodes:io': array.array("B"),
- 'testnodes:cpu': array.array("B"),
- }
-
- def get_values(self, name, start, end):
- assert end >= start
- if end == start:
- return []
-
- with self.lock:
- curr_arr = self.data[name]
- if self.stime is None:
- return []
-
- sidx = start - self.stime
- eidx = end - self.stime
-
- if sidx < 0 and eidx < 0:
- return [0] * (end - start)
- elif sidx < 0:
- return [0] * (-sidx) + curr_arr[:eidx]
- return curr_arr[sidx:eidx]
-
- def set_values(self, start_time, vals):
- with self.lock:
- return self.add_values_l(start_time, vals)
-
- def set_values_l(self, start_time, vals):
- max_cut = 0
- for name, values in vals.items():
- curr_arr = self.data.setdefault(name, array.array("H"))
-
- if self.stime is None:
- self.stime = start_time
-
- curr_end_time = len(curr_arr) + self.stime
-
- if curr_end_time < start_time:
- curr_arr.extend([0] * (start_time - curr_end_time))
- curr_arr.extend(values)
- elif curr_end_time > start_time:
- logger.warning("Duplicated sensors data")
- sindex = len(curr_arr) + (start_time - curr_end_time)
-
- if sindex < 0:
- values = values[-sindex:]
- sindex = 0
- logger.warning("Some data with timestamp before"
- " beginning of measurememts. Skip them")
-
- curr_arr[sindex:sindex + len(values)] = values
- else:
- curr_arr.extend(values)
-
- if len(curr_arr) > self.max_size:
- max_cut = max(len(curr_arr) - self.min_size, max_cut)
-
- if max_cut > 0:
- self.start_time += max_cut
- for val in vals.values():
- del val[:max_cut]
-
-
def save_sensors_data(data_q, mon_q, fd, data_store, source2roles_map):
fd.write("\n")
- BUFFER = 3
observed_nodes = set()
- testnodes_data = {
- 'io': {},
- 'cpu': {},
- }
try:
while True:
@@ -114,23 +35,12 @@
source_id = data.pop('source_id')
rep_time = data.pop('time')
+
if 'testnode' in source2roles_map.get(source_id, []):
- vl = testnodes_data['io'].get(rep_time, 0)
- sum_io_q = vl
- testnodes_data['io'][rep_time] = sum_io_q
-
- etime = time.time() - BUFFER
- for name, vals in testnodes_data.items():
- new_vals = {}
- for rtime, value in vals.items():
- if rtime < etime:
- data_store.set_values("testnodes:io", rtime, [value])
- else:
- new_vals[rtime] = value
-
- vals.clear()
- vals.update(new_vals)
-
+ sum_io_q = 0
+                data_store.update_values(rep_time,
+                                         {"testnodes:io": sum_io_q},
+                                         add=True)
except Exception:
logger.exception("Error in sensors thread")
logger.info("Sensors thread exits")
diff --git a/wally/suits/io/ceph.cfg b/wally/suits/io/ceph.cfg
index 6abaae5..085153f 100644
--- a/wally/suits/io/ceph.cfg
+++ b/wally/suits/io/ceph.cfg
@@ -25,7 +25,7 @@
numjobs={NUMJOBS}
# ---------------------------------------------------------------------
-# check different thread count, sync mode. (latency, iops) = func(th_count)
+# direct write
# ---------------------------------------------------------------------
[ceph_test_{TEST_SUMM} * {NUM_ROUNDS}]
blocksize=4k
diff --git a/wally/suits/io/formatter.py b/wally/suits/io/formatter.py
index a1da1c3..63c9408 100644
--- a/wally/suits/io/formatter.py
+++ b/wally/suits/io/formatter.py
@@ -24,9 +24,11 @@
tab.set_cols_align(["l", "r", "r", "r", "r"])
prev_k = None
+
items = sorted(test_set['res'].items(), key=key_func)
for test_name, data in items:
+
curr_k = key_func((test_name, data))[:3]
if prev_k is not None:
diff --git a/wally/suits/io/results_loader.py b/wally/suits/io/results_loader.py
index c67dbe8..3b50bf7 100644
--- a/wally/suits/io/results_loader.py
+++ b/wally/suits/io/results_loader.py
@@ -3,7 +3,7 @@
import collections
-from wally.utils import ssize_to_b
+# from wally.utils import ssize_to_b
from wally.statistic import med_dev
PerfInfo = collections.namedtuple('PerfInfo',
@@ -12,16 +12,28 @@
'lat', 'lat_dev', 'raw'))
+def split_and_add(data, block_count):
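+    # 'data' interleaves equally sized per-node sample series; collapse them
+    # into a single series by summing the samples element-wise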
+ assert len(data) % block_count == 0
+ res = [0] * (len(data) // block_count)
+
+ for i in range(block_count):
+ for idx, val in enumerate(data[i::block_count]):
+ res[idx] += val
+
+ return res
+
+
def process_disk_info(test_output):
data = {}
-
for tp, pre_result in test_output:
if tp != 'io' or pre_result is None:
-            pass
+            continue
+ vm_count = pre_result['__test_meta__']['testnodes_count']
+
for name, results in pre_result['res'].items():
- bw, bw_dev = med_dev(results['bw'])
- iops, iops_dev = med_dev(results['iops'])
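+            # sum per-node samples into cluster-wide totals before computing
+            # the median and deviation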
+ bw, bw_dev = med_dev(split_and_add(results['bw'], vm_count))
+ iops, iops_dev = med_dev(split_and_add(results['iops'], vm_count))
lat, lat_dev = med_dev(results['lat'])
dev = bw_dev / float(bw)
data[name] = PerfInfo(name, bw, iops, dev, lat, lat_dev, results)
@@ -82,19 +94,19 @@
return closure
-def load_data(raw_data):
- data = list(parse_output(raw_data))[0]
+# def load_data(raw_data):
+# data = list(parse_output(raw_data))[0]
- for key, val in data['res'].items():
- val['blocksize_b'] = ssize_to_b(val['blocksize'])
+# for key, val in data['res'].items():
+# val['blocksize_b'] = ssize_to_b(val['blocksize'])
- val['iops_mediana'], val['iops_stddev'] = med_dev(val['iops'])
- val['bw_mediana'], val['bw_stddev'] = med_dev(val['bw'])
- val['lat_mediana'], val['lat_stddev'] = med_dev(val['lat'])
- yield val
+# val['iops_mediana'], val['iops_stddev'] = med_dev(val['iops'])
+# val['bw_mediana'], val['bw_stddev'] = med_dev(val['bw'])
+# val['lat_mediana'], val['lat_stddev'] = med_dev(val['lat'])
+# yield val
-def load_files(*fnames):
- for fname in fnames:
- for i in load_data(open(fname).read()):
- yield i
+# def load_files(*fnames):
+# for fname in fnames:
+# for i in load_data(open(fname).read()):
+# yield i
diff --git a/wally/suits/itest.py b/wally/suits/itest.py
index 4b9db19..605df2c 100644
--- a/wally/suits/itest.py
+++ b/wally/suits/itest.py
@@ -301,13 +301,16 @@
try:
pid = read_from_remote(sftp, self.pid_file)
is_running = True
- except (NameError, IOError) as exc:
+ except (NameError, IOError, OSError) as exc:
pid = None
is_running = False
if is_running:
if not self.check_process_is_running(sftp, pid):
- sftp.remove(self.pid_file)
+ try:
+ sftp.remove(self.pid_file)
+ except (IOError, NameError, OSError):
+ pass
is_running = False
is_connected = True
@@ -370,6 +373,10 @@
for test in self.fio_configs:
exec_time += io_agent.calculate_execution_time(test)
+        # +5% is a rough estimate for additional operations
+ # like sftp, etc
+ exec_time = int(exec_time * 1.05)
+
exec_time_s = sec_to_str(exec_time)
now_dt = datetime.datetime.now()
end_dt = now_dt + datetime.timedelta(0, exec_time)
@@ -444,6 +451,7 @@
exec_time_str = sec_to_str(exec_time)
timeout = int(exec_time + max(300, exec_time))
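+        # soft timeout: expected runtime; 'timeout' above is the hard limit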
+ soft_tout = exec_time
barrier.wait()
self.run_over_ssh(cmd, nolog=nolog)
@@ -467,7 +475,7 @@
if self.node.connection is not Local:
self.node.connection.close()
- self.wait_till_finished(timeout)
+ self.wait_till_finished(soft_tout, timeout)
if not nolog:
logger.debug("Test on node {0} is finished".format(conn_id))
diff --git a/wally/timeseries.py b/wally/timeseries.py
new file mode 100644
index 0000000..d322fff
--- /dev/null
+++ b/wally/timeseries.py
@@ -0,0 +1,61 @@
+import array
+import threading
+
+
+class SensorDatastore(object):
+ def __init__(self, stime=None):
+ self.lock = threading.Lock()
+ self.stime = stime
+
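+        # desired history length in one-second samples (not yet enforced by
+        # update_values; trimming was not ported from the old datastore)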
+ self.min_size = 60 * 60
+ self.max_size = 60 * 61
+
+ self.data = {
+ 'testnodes:io': array.array("B"),
+ 'testnodes:cpu': array.array("B"),
+ }
+
+ def get_values(self, name, start, end):
+ assert end >= start
+
+ if end == start:
+ return []
+
+ with self.lock:
+ curr_arr = self.data[name]
+ if self.stime is None:
+ return []
+
+ sidx = start - self.stime
+ eidx = end - self.stime
+
+ if sidx < 0 and eidx < 0:
+ return [0] * (end - start)
+ elif sidx < 0:
+                return [0] * (-sidx) + list(curr_arr[:eidx])
+ return curr_arr[sidx:eidx]
+
+ def update_values(self, data_time, vals, add=False):
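+        # record 'vals' at second offset data_time - stime, zero-filling any
+        # gap; with add=True an existing sample is accumulated instead of
+        # overwritten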
+ with self.lock:
+ if self.stime is None:
+ self.stime = data_time
+
+ for name, value in vals.items():
+ curr_arr = self.data.setdefault(name, array.array("H"))
+ curr_end_time = len(curr_arr) + self.stime
+
+ dtime = data_time - curr_end_time
+
+ if dtime > 0:
+ curr_arr.extend([0] * dtime)
+ curr_arr.append(value)
+ elif dtime == 0:
+ curr_arr.append(value)
+ else:
+ # dtime < 0
+ sindex = len(curr_arr) + dtime
+                    if sindex >= 0:
+ if add:
+ curr_arr[sindex] += value
+ else:
+                        curr_arr[sindex] = value
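+                    else:
+                        # the sample precedes the recorded window; drop it
+                        pass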