start adding unit tests, rework config compiler
diff --git a/TODO b/TODO
index 7c51227..4bf20cc 100644
--- a/TODO
+++ b/TODO
@@ -8,6 +8,7 @@
Make python module
putget/ssbench tests (костя)
тестирование (костя)
+отдельный тенант на все
Intellectual granular sensors
diff --git a/configs/usb_hdd.yaml b/configs/usb_hdd.yaml
index 224c6f6..c0d1086 100644
--- a/configs/usb_hdd.yaml
+++ b/configs/usb_hdd.yaml
@@ -1,6 +1,5 @@
explicit_nodes:
- "ssh://koder@192.168.0.108::/home/koder/.ssh/id_rsa": testnode
- # local: testnode
+ local: testnode
internal:
var_dir_root: /tmp/perf_tests
@@ -13,10 +12,12 @@
testnode: system-cpu, block-io
tests:
- - io:
- cfg: wally/suits/io/io_scenario_hdd.cfg
- params:
- FILENAME: /media/koder/a5230078-4c27-4c3b-99aa-26148e78b2e7/xxx.bin
- NUM_ROUNDS: 7
+ - io:
+ cfg: wally/suits/io/io_scenario_hdd.cfg
+ prefill_files: false
+ params:
+ FILENAME: /media/koder/a5230078-4c27-4c3b-99aa-26148e78b2e7/xxx.bin
+ NUM_ROUNDS: 3
+
logging:
extra_logs: 1
\ No newline at end of file
diff --git a/requirements.txt b/requirements.txt
index 825a5bc..6c0f07e 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,3 +1,4 @@
+oktest
priest
GChartWrapper==0.8
decorator==3.4.0
diff --git a/tests/__init__.py b/tests/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/tests/__init__.py
diff --git a/tests/test_agent.py b/tests/test_agent.py
new file mode 100644
index 0000000..f9fa389
--- /dev/null
+++ b/tests/test_agent.py
@@ -0,0 +1,424 @@
+import os.path
+import unittest
+
+
+from oktest import ok, main, test
+
+
+from wally.suits.io import agent
+
+code_test_defaults = """
+[defaults]
+wait_for_previous
+buffered=0
+iodepth=2
+RUNTIME=20
+
+[sec1]
+group_reporting
+time_based
+softrandommap=1
+filename=/tmp/xxx
+size=5G
+ramp_time=20
+runtime={RUNTIME}
+blocksize=1m
+rw=read
+direct=1
+numjobs=1
+some_extra=1.2314
+
+[sec2]
+group_reporting
+time_based
+iodepth=1
+softrandommap=1
+filename=/tmp/xxx
+size=5G
+ramp_time=20
+runtime={RUNTIME}
+blocksize=1m
+rw=read
+direct=1
+numjobs=1
+some_extra=1.2314
+"""
+
+defaults = """
+[defaults]
+wait_for_previous
+group_reporting
+time_based
+buffered=0
+iodepth=1
+softrandommap=1
+filename=/tmp/xxx
+size=5G
+ramp_time=20
+runtime=20
+blocksize=1m
+rw=read
+direct=1
+numjobs=1
+"""
+
+code_test_auto_params_1 = defaults + """
+[defaults]
+RUNTIME=30
+
+[sec1_{TEST_SUMM}]
+ramp_time={% 20, 40 %}
+runtime={RUNTIME}
+blocksize={% 4k, 4m %}
+"""
+
+
+code_test_uniq = defaults + """
+[defaults]
+REPCOUNT=2
+RUNTIME=30
+
+[sec1_{TEST_SUMM}_{UNIQ} * 3]
+
+[sec2_{TEST_SUMM}_{UNIQ} * {REPCOUNT}]
+"""
+
+code_test_cycles_default = defaults + """
+[defaults]
+REPCOUNT=2
+RUNTIME={% 30, 60 %}
+
+[sec1_{TEST_SUMM}_{UNIQ} * 3]
+runtime={RUNTIME}
+blocksize={% 4k, 4m %}
+"""
+
+
+P = agent.parse_all_in_1
+
+
+class AgentTest(unittest.TestCase):
+ @test("test_parse_value")
+ def test_parse_value(self):
+ x = "asdfasd adsd d"
+ ok(agent.parse_value(x)) == x
+ ok(agent.parse_value("10 2")) == "10 2"
+ ok(agent.parse_value(None)).is_(None)
+ ok(agent.parse_value("10")) == 10
+ ok(agent.parse_value("20")) == 20
+ ok(agent.parse_value("10.1") - 10.1) < 1E-7
+ ok(agent.parse_value("{% 10, 20 %}")) == [10, 20]
+ ok(agent.parse_value("{% 10,20 %}")) == [10, 20]
+
+ code_test_compile_simplest = defaults + """
+[sec1]
+some_extra=1.2314
+"""
+
+ @test("test_compile_simplest")
+ def test_compile_simplest(self):
+ sections = P(self.code_test_compile_simplest, {})
+ sections = list(sections)
+
+ ok(len(sections)) == 1
+ sec1 = sections[0]
+ ok(sec1.name) == "sec1"
+ vals = sec1.vals
+ ok(vals['wait_for_previous']).is_(None)
+ ok(vals['iodepth']) == 1
+ ok(vals['some_extra'] - 1.2314) < 1E-7
+
+ code_test_params_in_defaults = defaults + """
+[defaults]
+RUNTIME=20
+
+[sec1]
+runtime={RUNTIME}
+"""
+
+ @test("test_compile_defaults")
+ def test_compile_defaults(self):
+ sections = P(self.code_test_params_in_defaults, {})
+ sections = list(sections)
+
+ ok(len(sections)) == 1
+ sec1 = sections[0]
+ ok(sec1.name) == "sec1"
+ vals = sec1.vals
+ ok(vals['wait_for_previous']).is_(None)
+ ok(vals['iodepth']) == 1
+ ok(vals['runtime']) == 20
+
+ @test("test_defaults")
+ def test_defaults(self):
+ sections = P(code_test_defaults, {})
+ sections = list(sections)
+
+ ok(len(sections)) == 2
+ sec1, sec2 = sections
+
+ ok(sec1.name) == "sec1"
+ ok(sec2.name) == "sec2"
+
+ ok(sec1.vals['wait_for_previous']).is_(None)
+ ok(sec2.vals['wait_for_previous']).is_(None)
+
+ ok(sec1.vals['iodepth']) == 2
+ ok(sec2.vals['iodepth']) == 1
+
+ ok(sec1.vals['buffered']) == 0
+ ok(sec2.vals['buffered']) == 0
+
+ code_test_ext_params = defaults + """
+[sec1]
+runtime={RUNTIME}
+"""
+
+ @test("test_external_params")
+ def test_external_params(self):
+ with self.assertRaises(KeyError):
+ sections = P(self.code_test_ext_params, {})
+ list(sections)
+
+ sections = P(self.code_test_ext_params,
+ {'RUNTIME': 20})
+ sections = list(sections)
+
+ code_test_cycle = defaults + """
+[sec1]
+runtime={RUNTIME}
+ramp_time={% 20, 40 %}
+"""
+
+ @test("test_cycle")
+ def test_cycle(self):
+ sections = P(self.code_test_cycle,
+ {'RUNTIME': 20})
+ sections = list(sections)
+ ok(len(sections)) == 2
+ ok(sections[0].vals['ramp_time']) == 20
+ ok(sections[1].vals['ramp_time']) == 40
+
+ code_test_cycles = defaults + """
+[sec1]
+ramp_time={% 20, 40 %}
+runtime={RUNTIME}
+blocksize={% 4k, 4m %}
+"""
+
+ @test("test_cycles")
+ def test_cycles(self):
+ sections = P(self.code_test_cycles,
+ {'RUNTIME': 20})
+ sections = list(sections)
+ ok(len(sections)) == 4
+
+ combinations = [
+ (section.vals['ramp_time'], section.vals['blocksize'])
+ for section in sections
+ ]
+
+ combinations.sort()
+
+ ok(combinations) == [(20, '4k'), (20, '4m'), (40, '4k'), (40, '4m')]
+
+ @test("test_time_estimate")
+ def test_time_estimate(self):
+ sections = P(self.code_test_cycles,
+ {'RUNTIME': 20})
+ sections = list(sections)
+ etime = agent.calculate_execution_time(sections)
+
+ ok(etime) == 20 * 4 + 20 * 2 + 40 * 2
+ ok(agent.sec_to_str(etime)) == "0:03:20"
+
+ code_test_cycles2 = defaults + """
+[sec1 * 7]
+ramp_time={% 20, 40 %}
+runtime={RUNTIME}
+blocksize={% 4k, 4m %}
+"""
+
+ @test("test_time_estimate")
+ def test_time_estimate_large(self):
+ sections = P(self.code_test_cycles2,
+ {'RUNTIME': 30})
+ sections = list(sections)
+
+ ok(sections[0].name) == 'sec1'
+ ok(len(sections)) == 7 * 4
+
+ etime = agent.calculate_execution_time(sections)
+ # ramptime optimization
+ expected_time = (20 + 30 + 30 * 6) * 2
+ expected_time += (40 + 30 + 30 * 6) * 2
+ ok(etime) == expected_time
+
+ code_test_cycles3 = defaults + """
+[sec1 * 7]
+ramp_time={% 20, 40 %}
+runtime={RUNTIME}
+blocksize={% 4k, 4m %}
+
+[sec2 * 7]
+ramp_time={% 20, 40 %}
+runtime={RUNTIME}
+blocksize={% 4k, 4m %}
+"""
+
+ @test("test_time_estimate2")
+ def test_time_estimate_large2(self):
+ sections = P(self.code_test_cycles3, {'RUNTIME': 30})
+ sections = list(sections)
+
+ ok(sections[0].name) == 'sec1'
+ ok(sections[1].name) == 'sec1'
+ ok(len(sections)) == 7 * 4 * 2
+
+ etime = agent.calculate_execution_time(sections)
+ # ramptime optimization
+ expected_time = (20 + 30 + 30 * 6) * 2
+ expected_time += (40 + 30 + 30 * 6) * 2
+ ok(etime) == expected_time * 2
+
+ code_test_repeats = defaults + """
+[defaults]
+REPCOUNT=2
+[sec1 * 3]
+[sec2 * {REPCOUNT}]
+"""
+
+ @test("test_repeat")
+ def test_repeat(self):
+ sections = P(self.code_test_repeats, {})
+ sections = list(sections)
+ ok(len(sections)) == 2 + 3
+ ok(sections[0].name) == 'sec1'
+ ok(sections[1].name) == 'sec1'
+ ok(sections[2].name) == 'sec1'
+ ok(sections[3].name) == 'sec2'
+ ok(sections[4].name) == 'sec2'
+
+ @test("test_real_tasks")
+ def test_real_tasks(self):
+ tasks_dir = os.path.dirname(agent.__file__)
+ fname = os.path.join(tasks_dir, 'io_scenario_ceph.cfg')
+ fc = open(fname).read()
+
+ sections = P(fc, {'FILENAME': '/dev/null'})
+ sections = list(sections)
+
+ ok(len(sections)) == 7 * 9 * 4 + 7
+
+ etime = agent.calculate_execution_time(sections)
+ # ramptime optimization
+ expected_time = (60 * 7 + 30) * 9 * 4 + (60 * 7 + 30)
+ ok(etime) == expected_time
+
+if __name__ == '__main__':
+ main()
+
+# def do_run_fio_fake(bconf):
+# def estimate_iops(sz, bw, lat):
+# return 1 / (lat + float(sz) / bw)
+# global count
+# count += 1
+# parsed_out = []
+
+# BW = 120.0 * (1024 ** 2)
+# LAT = 0.003
+
+# for name, cfg in bconf:
+# sz = to_bytes(cfg['blocksize'])
+# curr_lat = LAT * ((random.random() - 0.5) * 0.1 + 1)
+# curr_ulat = curr_lat * 1000000
+# curr_bw = BW * ((random.random() - 0.5) * 0.1 + 1)
+# iops = estimate_iops(sz, curr_bw, curr_lat)
+# bw = iops * sz
+
+# res = {'ctx': 10683,
+# 'error': 0,
+# 'groupid': 0,
+# 'jobname': name,
+# 'majf': 0,
+# 'minf': 30,
+# 'read': {'bw': 0,
+# 'bw_agg': 0.0,
+# 'bw_dev': 0.0,
+# 'bw_max': 0,
+# 'bw_mean': 0.0,
+# 'bw_min': 0,
+# 'clat': {'max': 0,
+# 'mean': 0.0,
+# 'min': 0,
+# 'stddev': 0.0},
+# 'io_bytes': 0,
+# 'iops': 0,
+# 'lat': {'max': 0, 'mean': 0.0,
+# 'min': 0, 'stddev': 0.0},
+# 'runtime': 0,
+# 'slat': {'max': 0, 'mean': 0.0,
+# 'min': 0, 'stddev': 0.0}
+# },
+# 'sys_cpu': 0.64,
+# 'trim': {'bw': 0,
+# 'bw_agg': 0.0,
+# 'bw_dev': 0.0,
+# 'bw_max': 0,
+# 'bw_mean': 0.0,
+# 'bw_min': 0,
+# 'clat': {'max': 0,
+# 'mean': 0.0,
+# 'min': 0,
+# 'stddev': 0.0},
+# 'io_bytes': 0,
+# 'iops': 0,
+# 'lat': {'max': 0, 'mean': 0.0,
+# 'min': 0, 'stddev': 0.0},
+# 'runtime': 0,
+# 'slat': {'max': 0, 'mean': 0.0,
+# 'min': 0, 'stddev': 0.0}
+# },
+# 'usr_cpu': 0.23,
+# 'write': {'bw': 0,
+# 'bw_agg': 0,
+# 'bw_dev': 0,
+# 'bw_max': 0,
+# 'bw_mean': 0,
+# 'bw_min': 0,
+# 'clat': {'max': 0, 'mean': 0,
+# 'min': 0, 'stddev': 0},
+# 'io_bytes': 0,
+# 'iops': 0,
+# 'lat': {'max': 0, 'mean': 0,
+# 'min': 0, 'stddev': 0},
+# 'runtime': 0,
+# 'slat': {'max': 0, 'mean': 0.0,
+# 'min': 0, 'stddev': 0.0}
+# }
+# }
+
+# if cfg['rw'] in ('read', 'randread'):
+# key = 'read'
+# elif cfg['rw'] in ('write', 'randwrite'):
+# key = 'write'
+# else:
+# raise ValueError("Uknown op type {0}".format(key))
+
+# res[key]['bw'] = bw
+# res[key]['iops'] = iops
+# res[key]['runtime'] = 30
+# res[key]['io_bytes'] = res[key]['runtime'] * bw
+# res[key]['bw_agg'] = bw
+# res[key]['bw_dev'] = bw / 30
+# res[key]['bw_max'] = bw * 1.5
+# res[key]['bw_min'] = bw / 1.5
+# res[key]['bw_mean'] = bw
+# res[key]['clat'] = {'max': curr_ulat * 10, 'mean': curr_ulat,
+# 'min': curr_ulat / 2, 'stddev': curr_ulat}
+# res[key]['lat'] = res[key]['clat'].copy()
+# res[key]['slat'] = res[key]['clat'].copy()
+
+# parsed_out.append(res)
+
+# return zip(parsed_out, bconf)
diff --git a/wally/discover/node.py b/wally/discover/node.py
index dc1c9b0..8659fe6 100644
--- a/wally/discover/node.py
+++ b/wally/discover/node.py
@@ -10,9 +10,14 @@
self.monitor_url = None
def get_ip(self):
+ if self.conn_url == 'local':
+ return '127.0.0.1'
return urlparse.urlparse(self.conn_url).hostname
def get_conn_id(self):
+ if self.conn_url == 'local':
+ return '127.0.0.1'
+
host = urlparse.urlparse(self.conn_url).hostname
port = urlparse.urlparse(self.conn_url).port
diff --git a/wally/run_test.py b/wally/run_test.py
index b22ce38..44b65c0 100755
--- a/wally/run_test.py
+++ b/wally/run_test.py
@@ -44,6 +44,10 @@
def connect_one(node, vm=False):
+ if node.conn_url == 'local':
+ node.connection = ssh_utils.connect(node.conn_url)
+ return
+
try:
ssh_pref = "ssh://"
if node.conn_url.startswith(ssh_pref):
diff --git a/wally/sensors_utils.py b/wally/sensors_utils.py
index 3cd6bc8..e44704d 100644
--- a/wally/sensors_utils.py
+++ b/wally/sensors_utils.py
@@ -54,7 +54,11 @@
if node.monitor_url is not None:
monitor_url = node.monitor_url
else:
- ext_ip = utils.get_ip_for_target(node.get_ip())
+ ip = node.get_ip()
+ if ip == '127.0.0.1':
+ ext_ip = '127.0.0.1'
+ else:
+ ext_ip = utils.get_ip_for_target(ip)
monitor_url = receiver_url.format(ip=ext_ip)
monitored_nodes.append(node)
diff --git a/wally/ssh_utils.py b/wally/ssh_utils.py
index f1818ad..d928a82 100644
--- a/wally/ssh_utils.py
+++ b/wally/ssh_utils.py
@@ -1,11 +1,12 @@
import re
import time
import socket
+import shutil
import logging
import os.path
import getpass
import threading
-
+import subprocess
import paramiko
@@ -13,7 +14,47 @@
logger = logging.getLogger("wally")
+class Local(object):
+ "placeholder for local node"
+ @classmethod
+ def open_sftp(cls):
+ return cls
+
+ @classmethod
+ def mkdir(cls, remotepath, mode=None):
+ os.mkdir(remotepath)
+ if mode is not None:
+ os.chmod(remotepath, mode)
+
+ @classmethod
+ def put(cls, localfile, remfile):
+ shutil.copyfile(localfile, remfile)
+
+ @classmethod
+ def chmod(cls, path, mode):
+ os.chmod(path, mode)
+
+ @classmethod
+ def copytree(cls, src, dst):
+ shutil.copytree(src, dst)
+
+ @classmethod
+ def remove(cls, path):
+ os.unlink(path)
+
+ @classmethod
+ def close(cls):
+ pass
+
+ @classmethod
+ def open(cls, *args, **kwarhgs):
+ return open(*args, **kwarhgs)
+
+
def ssh_connect(creds, retry_count=6, timeout=10, log_warns=True):
+ if creds == 'local':
+ return Local
+
ssh = paramiko.SSHClient()
ssh.load_host_keys('/dev/null')
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
@@ -248,6 +289,9 @@
def connect(uri, **params):
+ if uri == 'local':
+ return Local
+
creds = parse_ssh_uri(uri)
creds.port = int(creds.port)
return ssh_connect(creds, **params)
@@ -260,6 +304,23 @@
def run_over_ssh(conn, cmd, stdin_data=None, timeout=60,
nolog=False, node=None):
"should be replaces by normal implementation, with select"
+
+ if conn is Local:
+ if not nolog:
+ logger.debug("SSH:local Exec {0!r}".format(cmd))
+ proc = subprocess.Popen(cmd, shell=True,
+ stdin=subprocess.PIPE,
+ stdout=subprocess.PIPE,
+ stderr=subprocess.STDOUT)
+
+ stdoutdata, _ = proc.communicate(input=stdin_data)
+
+ if proc.returncode != 0:
+ templ = "SSH:{0} Cmd {1!r} failed with code {2}. Output: {3}"
+ raise OSError(templ.format(node, cmd, proc.returncode, stdoutdata))
+
+ return stdoutdata
+
transport = conn.get_transport()
session = transport.open_session()
diff --git a/wally/suits/io/agent.py b/wally/suits/io/agent.py
index d15d18e..1982e35 100644
--- a/wally/suits/io/agent.py
+++ b/wally/suits/io/agent.py
@@ -1,7 +1,7 @@
import sys
import time
import json
-import random
+import copy
import select
import pprint
import argparse
@@ -15,6 +15,234 @@
SETTING = 1
+class FioJobSection(object):
+ def __init__(self, name):
+ self.name = name
+ self.vals = OrderedDict()
+ self.format_params = {}
+
+ def copy(self):
+ return copy.deepcopy(self)
+
+
+def to_bytes(sz):
+ sz = sz.lower()
+ try:
+ return int(sz)
+ except ValueError:
+ if sz[-1] == 'm':
+ return (1024 ** 2) * int(sz[:-1])
+ if sz[-1] == 'k':
+ return 1024 * int(sz[:-1])
+ if sz[-1] == 'g':
+ return (1024 ** 3) * int(sz[:-1])
+ raise
+
+
+def fio_config_lexer(fio_cfg):
+ for lineno, line in enumerate(fio_cfg.split("\n")):
+ try:
+ line = line.strip()
+
+ if line.startswith("#") or line.startswith(";"):
+ continue
+
+ if line == "":
+ continue
+
+ if line.startswith('['):
+ assert line.endswith(']'), "name should ends with ]"
+ yield lineno, SECTION, line[1:-1], None
+ elif '=' in line:
+ opt_name, opt_val = line.split('=', 1)
+ yield lineno, SETTING, opt_name.strip(), opt_val.strip()
+ else:
+ yield lineno, SETTING, line, None
+ except Exception as exc:
+ pref = "During parsing line number {0}\n".format(lineno)
+ raise ValueError(pref + exc.message)
+
+
+def fio_config_parse(lexer_iter, format_params):
+ orig_format_params_keys = set(format_params)
+ format_params = format_params.copy()
+ in_defaults = False
+ curr_section = None
+ defaults = OrderedDict()
+
+ for lineno, tp, name, val in lexer_iter:
+ if tp == SECTION:
+ if curr_section is not None:
+ yield curr_section
+
+ if name == 'defaults':
+ in_defaults = True
+ curr_section = None
+ else:
+ in_defaults = False
+ curr_section = FioJobSection(name)
+ curr_section.format_params = format_params.copy()
+ curr_section.vals = defaults.copy()
+ else:
+ assert tp == SETTING
+ if name == name.upper():
+ msg = "Param not in default section in line " + str(lineno)
+ assert in_defaults, msg
+ if name not in orig_format_params_keys:
+ # don't make parse_value for PARAMS
+ # they would be parsed later
+ # or this would breakes arrays
+ format_params[name] = val
+ elif in_defaults:
+ defaults[name] = parse_value(val)
+ else:
+ msg = "data outside section, line " + str(lineno)
+ assert curr_section is not None, msg
+ curr_section.vals[name] = parse_value(val)
+
+ if curr_section is not None:
+ yield curr_section
+
+
+def parse_value(val):
+ if val is None:
+ return None
+
+ try:
+ return int(val)
+ except ValueError:
+ pass
+
+ try:
+ return float(val)
+ except ValueError:
+ pass
+
+ if val.startswith('{%'):
+ assert val.endswith("%}")
+ content = val[2:-2]
+ vals = list(i.strip() for i in content.split(','))
+ return map(parse_value, vals)
+ return val
+
+
+def process_repeats(sec_iter):
+
+ for sec in sec_iter:
+ if '*' in sec.name:
+ msg = "Only one '*' allowed in section name"
+ assert sec.name.count('*') == 1, msg
+
+ name, count = sec.name.split("*")
+ sec.name = name.strip()
+ count = count.strip()
+
+ try:
+ count = int(count.strip().format(**sec.format_params))
+ except KeyError:
+ raise ValueError("No parameter {0} given".format(count[1:-1]))
+ except ValueError:
+ msg = "Parameter {0} nas non-int value {1!r}"
+ raise ValueError(msg.format(count[1:-1],
+ count.format(**sec.format_params)))
+
+ yield sec
+
+ if 'ramp_time' in sec.vals:
+ sec = sec.copy()
+ sec.vals['_ramp_time'] = sec.vals.pop('ramp_time')
+
+ for _ in range(count - 1):
+ yield sec.copy()
+ else:
+ yield sec
+
+
+def process_cycles(sec_iter):
+ # insert parametrized cycles
+ sec_iter = try_format_params_into_section(sec_iter)
+
+ for sec in sec_iter:
+
+ cycles_var_names = []
+ cycles_var_values = []
+
+ for name, val in sec.vals.items():
+ if isinstance(val, list):
+ cycles_var_names.append(name)
+ cycles_var_values.append(val)
+
+ if len(cycles_var_names) == 0:
+ yield sec
+ else:
+ for combination in itertools.product(*cycles_var_values):
+ new_sec = sec.copy()
+ new_sec.vals.update(zip(cycles_var_names, combination))
+ yield new_sec
+
+
+def try_format_params_into_section(sec_iter):
+ for sec in sec_iter:
+ params = sec.format_params
+ for name, val in sec.vals.items():
+ if isinstance(val, basestring):
+ try:
+ sec.vals[name] = parse_value(val.format(**params))
+ except:
+ pass
+
+ yield sec
+
+
+def format_params_into_section_finall(sec_iter, counter=[0]):
+ group_report_err_msg = "Group reporting should be set if numjobs != 1"
+
+ for sec in sec_iter:
+
+ num_jobs = int(sec.vals.get('numjobs', '1'))
+ if num_jobs != 1:
+ assert 'group_reporting' in sec.vals, group_report_err_msg
+
+ params = sec.format_params
+
+ fsize = to_bytes(sec.vals['size'])
+ params['PER_TH_OFFSET'] = fsize // num_jobs
+
+ for name, val in sec.vals.items():
+ if isinstance(val, basestring):
+ sec.vals[name] = parse_value(val.format(**params))
+ else:
+ assert isinstance(val, (int, float)) or val is None
+
+ params['UNIQ'] = 'UN{0}'.format(counter[0])
+ counter[0] += 1
+ params['TEST_SUMM'] = get_test_summary(sec.vals)
+ sec.name = sec.name.format(**params)
+
+ yield sec
+
+
+def fio_config_to_str(sec_iter):
+ res = ""
+
+ for pos, sec in enumerate(sec_iter):
+ if pos != 0:
+ res += "\n"
+
+ res += "[{0}]\n".format(sec.name)
+
+ for name, val in sec.vals.items():
+ if name.startswith('_'):
+ continue
+
+ if val is None:
+ res += name + "\n"
+ else:
+ res += "{0}={1}\n".format(name, val)
+
+ return res
+
+
def get_test_sync_mode(config):
try:
return config['sync_mode']
@@ -52,343 +280,78 @@
th_count)
-counter = [0]
-
-
-def extract_iterables(vals):
- iterable_names = []
- iterable_values = []
- rest = {}
-
- for val_name, val in vals.items():
- if val is None or not val.startswith('{%'):
- rest[val_name] = val
- else:
- assert val.endswith("%}")
- content = val[2:-2]
- iterable_names.append(val_name)
- iterable_values.append(list(i.strip() for i in content.split(',')))
-
- return iterable_names, iterable_values, rest
-
-
-def format_params_into_section(sec, params, final=True):
- processed_vals = {}
-
- for val_name, val in sec.items():
- if val is None:
- processed_vals[val_name] = val
- else:
- try:
- processed_vals[val_name] = val.format(**params)
- except KeyError:
- if final:
- raise
- processed_vals[val_name] = val
-
- return processed_vals
-
-
-def process_section(name, vals, defaults, format_params):
- vals = vals.copy()
- params = format_params.copy()
-
- if '*' in name:
- name, repeat = name.split('*')
- name = name.strip()
- repeat = int(repeat.format(**params))
- else:
- repeat = 1
-
- # this code can be optimized
- iterable_names, iterable_values, processed_vals = extract_iterables(vals)
-
- group_report_err_msg = "Group reporting should be set if numjobs != 1"
-
- if iterable_values == []:
- processed_vals = format_params_into_section(processed_vals, params,
- final=False)
- params['UNIQ'] = 'UN{0}'.format(counter[0])
- counter[0] += 1
- params['TEST_SUMM'] = get_test_summary(processed_vals)
-
- num_jobs = int(processed_vals.get('numjobs', '1'))
- fsize = to_bytes(processed_vals['size'])
- params['PER_TH_OFFSET'] = fsize // num_jobs
-
- processed_vals = format_params_into_section(processed_vals, params,
- final=True)
-
- if num_jobs != 1:
- assert 'group_reporting' in processed_vals, group_report_err_msg
-
- ramp_time = processed_vals.get('ramp_time')
- for i in range(repeat):
- yield name.format(**params), processed_vals.copy()
-
- if 'ramp_time' in processed_vals:
- del processed_vals['ramp_time']
-
- if ramp_time is not None:
- processed_vals['ramp_time'] = ramp_time
- else:
- for it_vals in itertools.product(*iterable_values):
- processed_vals = format_params_into_section(processed_vals, params,
- final=False)
-
- processed_vals.update(dict(zip(iterable_names, it_vals)))
- params['UNIQ'] = 'UN{0}'.format(counter[0])
- counter[0] += 1
- params['TEST_SUMM'] = get_test_summary(processed_vals)
-
- num_jobs = int(processed_vals.get('numjobs', '1'))
- fsize = to_bytes(processed_vals['size'])
- params['PER_TH_OFFSET'] = fsize // num_jobs
-
- processed_vals = format_params_into_section(processed_vals, params,
- final=True)
-
- if processed_vals.get('numjobs', '1') != '1':
- assert 'group_reporting' in processed_vals,\
- group_report_err_msg
-
- ramp_time = processed_vals.get('ramp_time')
-
- for i in range(repeat):
- yield name.format(**params), processed_vals.copy()
- if 'ramp_time' in processed_vals:
- processed_vals['_ramp_time'] = ramp_time
- processed_vals.pop('ramp_time')
-
- if ramp_time is not None:
- processed_vals['ramp_time'] = ramp_time
- processed_vals.pop('_ramp_time')
-
-
-def calculate_execution_time(combinations):
+def calculate_execution_time(sec_iter):
time = 0
- for _, params in combinations:
- time += int(params.get('ramp_time', 0))
- time += int(params.get('_ramp_time', 0))
- time += int(params.get('runtime', 0))
+ for sec in sec_iter:
+ time += sec.vals.get('ramp_time', 0)
+ time += sec.vals.get('runtime', 0)
return time
-def parse_fio_config_full(fio_cfg, params=None):
- defaults = {}
- format_params = {}
+def slice_config(sec_iter, runcycle=None, max_jobs=1000):
+ jcount = 0
+ runtime = 0
+ curr_slice = []
- if params is None:
- ext_params = {}
- else:
- ext_params = params.copy()
+ for pos, sec in enumerate(sec_iter):
- curr_section = None
- curr_section_name = None
+ jc = sec.vals.get('numjobs', 1)
+ msg = "numjobs should be integer, not {0!r}".format(jc)
+ assert isinstance(jc, int), msg
- for tp, name, val in parse_fio_config_iter(fio_cfg):
- if tp == SECTION:
- non_def = curr_section_name != 'defaults'
- if curr_section_name is not None and non_def:
- format_params.update(ext_params)
- for sec in process_section(curr_section_name,
- curr_section,
- defaults,
- format_params):
- yield sec
+ curr_task_time = calculate_execution_time([sec])
- if name == 'defaults':
- curr_section = defaults
- else:
- curr_section = OrderedDict()
- curr_section.update(defaults)
- curr_section_name = name
+ if jc > max_jobs:
+ err_templ = "Can't process job {0!r} - too large numjobs"
+ raise ValueError(err_templ.format(sec.name))
+ if runcycle is not None and len(curr_slice) != 0:
+ rc_ok = curr_task_time + runtime <= runcycle
else:
- assert tp == SETTING
- assert curr_section_name is not None, "no section name"
- if name == name.upper():
- assert curr_section_name == 'defaults'
- format_params[name] = val
- else:
- curr_section[name] = val
+ rc_ok = True
- if curr_section_name is not None and curr_section_name != 'defaults':
- format_params.update(ext_params)
- for sec in process_section(curr_section_name,
- curr_section,
- defaults,
- format_params):
- yield sec
-
-
-def parse_fio_config_iter(fio_cfg):
- for lineno, line in enumerate(fio_cfg.split("\n")):
- try:
- line = line.strip()
-
- if line.startswith("#") or line.startswith(";"):
- continue
-
- if line == "":
- continue
-
- if line.startswith('['):
- assert line.endswith(']'), "name should ends with ]"
- yield SECTION, line[1:-1], None
- elif '=' in line:
- opt_name, opt_val = line.split('=', 1)
- yield SETTING, opt_name.strip(), opt_val.strip()
- else:
- yield SETTING, line, None
- except Exception as exc:
- pref = "During parsing line number {0}\n".format(lineno)
- raise ValueError(pref + exc.message)
-
-
-def format_fio_config(fio_cfg):
- res = ""
- for pos, (name, section) in enumerate(fio_cfg):
- if name.startswith('_'):
+ if jc + jcount <= max_jobs and rc_ok:
+ runtime += curr_task_time
+ jcount += jc
+ curr_slice.append(sec)
continue
- if pos != 0:
- res += "\n"
+ assert len(curr_slice) != 0
+ yield curr_slice
- res += "[{0}]\n".format(name)
- for opt_name, opt_val in section.items():
- if opt_val is None:
- res += opt_name + "\n"
- else:
- res += "{0}={1}\n".format(opt_name, opt_val)
- return res
+ if '_ramp_time' in sec.vals:
+ sec.vals['ramp_time'] = sec.vals.pop('_ramp_time')
+ curr_task_time = calculate_execution_time([sec])
+
+ runtime = curr_task_time
+ jcount = jc
+ curr_slice = [sec]
+
+ if curr_slice != []:
+ yield curr_slice
-count = 0
+def parse_all_in_1(source, test_params):
+ lexer_it = fio_config_lexer(source)
+ sec_it = fio_config_parse(lexer_it, test_params)
+ sec_it = process_cycles(sec_it)
+ sec_it = process_repeats(sec_it)
+ return format_params_into_section_finall(sec_it)
-def to_bytes(sz):
- sz = sz.lower()
- try:
- return int(sz)
- except ValueError:
- if sz[-1] == 'm':
- return (1024 ** 2) * int(sz[:-1])
- if sz[-1] == 'k':
- return 1024 * int(sz[:-1])
- if sz[-1] == 'g':
- return (1024 ** 3) * int(sz[:-1])
- raise
+def parse_and_slice_all_in_1(source, test_params, **slice_params):
+ sec_it = parse_all_in_1(source, test_params)
+ return slice_config(sec_it, **slice_params)
-def do_run_fio_fake(bconf):
- def estimate_iops(sz, bw, lat):
- return 1 / (lat + float(sz) / bw)
- global count
- count += 1
- parsed_out = []
-
- BW = 120.0 * (1024 ** 2)
- LAT = 0.003
-
- for name, cfg in bconf:
- sz = to_bytes(cfg['blocksize'])
- curr_lat = LAT * ((random.random() - 0.5) * 0.1 + 1)
- curr_ulat = curr_lat * 1000000
- curr_bw = BW * ((random.random() - 0.5) * 0.1 + 1)
- iops = estimate_iops(sz, curr_bw, curr_lat)
- bw = iops * sz
-
- res = {'ctx': 10683,
- 'error': 0,
- 'groupid': 0,
- 'jobname': name,
- 'majf': 0,
- 'minf': 30,
- 'read': {'bw': 0,
- 'bw_agg': 0.0,
- 'bw_dev': 0.0,
- 'bw_max': 0,
- 'bw_mean': 0.0,
- 'bw_min': 0,
- 'clat': {'max': 0,
- 'mean': 0.0,
- 'min': 0,
- 'stddev': 0.0},
- 'io_bytes': 0,
- 'iops': 0,
- 'lat': {'max': 0, 'mean': 0.0,
- 'min': 0, 'stddev': 0.0},
- 'runtime': 0,
- 'slat': {'max': 0, 'mean': 0.0,
- 'min': 0, 'stddev': 0.0}
- },
- 'sys_cpu': 0.64,
- 'trim': {'bw': 0,
- 'bw_agg': 0.0,
- 'bw_dev': 0.0,
- 'bw_max': 0,
- 'bw_mean': 0.0,
- 'bw_min': 0,
- 'clat': {'max': 0,
- 'mean': 0.0,
- 'min': 0,
- 'stddev': 0.0},
- 'io_bytes': 0,
- 'iops': 0,
- 'lat': {'max': 0, 'mean': 0.0,
- 'min': 0, 'stddev': 0.0},
- 'runtime': 0,
- 'slat': {'max': 0, 'mean': 0.0,
- 'min': 0, 'stddev': 0.0}
- },
- 'usr_cpu': 0.23,
- 'write': {'bw': 0,
- 'bw_agg': 0,
- 'bw_dev': 0,
- 'bw_max': 0,
- 'bw_mean': 0,
- 'bw_min': 0,
- 'clat': {'max': 0, 'mean': 0,
- 'min': 0, 'stddev': 0},
- 'io_bytes': 0,
- 'iops': 0,
- 'lat': {'max': 0, 'mean': 0,
- 'min': 0, 'stddev': 0},
- 'runtime': 0,
- 'slat': {'max': 0, 'mean': 0.0,
- 'min': 0, 'stddev': 0.0}
- }
- }
-
- if cfg['rw'] in ('read', 'randread'):
- key = 'read'
- elif cfg['rw'] in ('write', 'randwrite'):
- key = 'write'
- else:
- raise ValueError("Uknown op type {0}".format(key))
-
- res[key]['bw'] = bw
- res[key]['iops'] = iops
- res[key]['runtime'] = 30
- res[key]['io_bytes'] = res[key]['runtime'] * bw
- res[key]['bw_agg'] = bw
- res[key]['bw_dev'] = bw / 30
- res[key]['bw_max'] = bw * 1.5
- res[key]['bw_min'] = bw / 1.5
- res[key]['bw_mean'] = bw
- res[key]['clat'] = {'max': curr_ulat * 10, 'mean': curr_ulat,
- 'min': curr_ulat / 2, 'stddev': curr_ulat}
- res[key]['lat'] = res[key]['clat'].copy()
- res[key]['slat'] = res[key]['clat'].copy()
-
- parsed_out.append(res)
-
- return zip(parsed_out, bconf)
+def compile_all_in_1(source, test_params, **slice_params):
+ slices_it = parse_and_slice_all_in_1(source, test_params, **slice_params)
+ for slices in slices_it:
+ yield fio_config_to_str(slices)
-def do_run_fio(bconf):
- benchmark_config = format_fio_config(bconf)
+def do_run_fio(config_slice):
+ benchmark_config = fio_config_to_str(config_slice)
cmd = ["fio", "--output-format=json", "--alloc-size=262144", "-"]
p = subprocess.Popen(cmd,
stdin=subprocess.PIPE,
@@ -414,91 +377,37 @@
raw_out = raw_out[:100]
raise ValueError(msg.format(raw_out, exc.message))
- return zip(parsed_out, bconf)
-
-# limited by fio
-MAX_JOBS = 1000
+ return zip(parsed_out, config_slice)
-def next_test_portion(whole_conf, runcycle, cluster=False):
- if cluster:
- for name, sec in whole_conf:
- if '_ramp_time' in sec:
- sec['ramp_time'] = sec.pop('_ramp_time')
- yield [(name, sec)]
- return
-
- jcount = 0
- runtime = 0
- bconf = []
-
- for pos, (name, sec) in enumerate(whole_conf):
- jc = int(sec.get('numjobs', '1'))
-
- if runcycle is not None:
- curr_task_time = calculate_execution_time([(name, sec)])
- else:
- curr_task_time = 0
-
- if jc > MAX_JOBS:
- err_templ = "Can't process job {0!r} - too large numjobs"
- raise ValueError(err_templ.format(name))
-
- if runcycle is not None and len(bconf) != 0:
- rc_ok = curr_task_time + runtime <= runcycle
- else:
- rc_ok = True
-
- if jc + jcount <= MAX_JOBS and rc_ok:
- runtime += curr_task_time
- jcount += jc
- bconf.append((name, sec))
- if '_ramp_time' in sec:
- del sec['_ramp_time']
- continue
-
- assert len(bconf) != 0
- yield bconf
-
- if '_ramp_time' in sec:
- sec['ramp_time'] = sec.pop('_ramp_time')
- curr_task_time = calculate_execution_time([(name, sec)])
-
- runtime = curr_task_time
- jcount = jc
- bconf = [(name, sec)]
-
- if bconf != []:
- yield bconf
-
-
-def add_job_results(jname, job_output, jconfig, res):
+def add_job_results(section, job_output, res):
if job_output['write']['iops'] != 0:
raw_result = job_output['write']
else:
raw_result = job_output['read']
- if jname not in res:
+ vals = section.vals
+ if section.name not in res:
j_res = {}
- j_res["rw"] = jconfig["rw"]
- j_res["sync_mode"] = get_test_sync_mode(jconfig)
- j_res["concurence"] = int(jconfig.get("numjobs", 1))
- j_res["blocksize"] = jconfig["blocksize"]
+ j_res["rw"] = vals["rw"]
+ j_res["sync_mode"] = get_test_sync_mode(vals)
+ j_res["concurence"] = int(vals.get("numjobs", 1))
+ j_res["blocksize"] = vals["blocksize"]
j_res["jobname"] = job_output["jobname"]
- j_res["timings"] = [int(jconfig.get("runtime", 0)),
- int(jconfig.get("ramp_time", 0))]
+ j_res["timings"] = [int(vals.get("runtime", 0)),
+ int(vals.get("ramp_time", 0))]
else:
- j_res = res[jname]
- assert j_res["rw"] == jconfig["rw"]
- assert j_res["rw"] == jconfig["rw"]
- assert j_res["sync_mode"] == get_test_sync_mode(jconfig)
- assert j_res["concurence"] == int(jconfig.get("numjobs", 1))
- assert j_res["blocksize"] == jconfig["blocksize"]
+ j_res = res[section.name]
+ assert j_res["rw"] == vals["rw"]
+ assert j_res["rw"] == vals["rw"]
+ assert j_res["sync_mode"] == get_test_sync_mode(vals)
+ assert j_res["concurence"] == int(vals.get("numjobs", 1))
+ assert j_res["blocksize"] == vals["blocksize"]
assert j_res["jobname"] == job_output["jobname"]
# ramp part is skipped for all tests, except first
- # assert j_res["timings"] == (jconfig.get("runtime"),
- # jconfig.get("ramp_time"))
+ # assert j_res["timings"] == (vals.get("runtime"),
+ # vals.get("ramp_time"))
def j_app(name, x):
j_res.setdefault(name, []).append(x)
@@ -509,65 +418,46 @@
j_app("clat", raw_result["clat"]["mean"])
j_app("slat", raw_result["slat"]["mean"])
- res[jname] = j_res
+ res[section.name] = j_res
-def compile(benchmark_config, params, skip=0, runcycle=None, cluster=False):
- whole_conf = list(parse_fio_config_full(benchmark_config, params))
- whole_conf = whole_conf[skip:]
- res = ""
-
- for bconf in next_test_portion(whole_conf, runcycle, cluster=cluster):
- res += format_fio_config(bconf)
- res += "\n#" + "-" * 50 + "\n\n"
-
- return res
-
-
-def run_fio(benchmark_config,
- params,
- runcycle=None,
- raw_results_func=None,
- skip_tests=0,
- fake_fio=False,
- cluster=False):
-
- whole_conf = list(parse_fio_config_full(benchmark_config, params))
- whole_conf = whole_conf[skip_tests:]
- res = {}
- curr_test_num = skip_tests
- executed_tests = 0
+def run_fio(sliced_it, raw_results_func=None):
+ sliced_list = list(sliced_it)
ok = True
+
try:
- for bconf in next_test_portion(whole_conf, runcycle, cluster=cluster):
+ curr_test_num = 0
+ executed_tests = 0
+ result = {}
- if fake_fio:
- res_cfg_it = do_run_fio_fake(bconf)
- else:
- res_cfg_it = do_run_fio(bconf)
-
+ for i, test_slice in enumerate(sliced_list):
+ res_cfg_it = do_run_fio(test_slice)
res_cfg_it = enumerate(res_cfg_it, curr_test_num)
- for curr_test_num, (job_output, (jname, jconfig)) in res_cfg_it:
+ for curr_test_num, (job_output, section) in res_cfg_it:
executed_tests += 1
+
if raw_results_func is not None:
raw_results_func(executed_tests,
- [job_output, jname, jconfig])
+ [job_output, section])
- assert jname == job_output["jobname"], \
- "{0} != {1}".format(jname, job_output["jobname"])
+ msg = "{0} != {1}".format(section.name, job_output["jobname"])
+ assert section.name == job_output["jobname"], msg
- if jname.startswith('_'):
+ if section.name.startswith('_'):
continue
- add_job_results(jname, job_output, jconfig, res)
+ add_job_results(section, job_output, result)
+
curr_test_num += 1
msg_template = "Done {0} tests from {1}. ETA: {2}"
- exec_time = estimate_cfg(benchmark_config, params, curr_test_num)
- print msg_template.format(curr_test_num - skip_tests,
- len(whole_conf),
- sec_to_str(exec_time))
+ rest = sliced_list[i:]
+ time_eta = sum(map(calculate_execution_time, rest))
+ test_left = sum(map(len, rest))
+ print msg_template.format(curr_test_num,
+ test_left,
+ sec_to_str(time_eta))
except (SystemExit, KeyboardInterrupt):
raise
@@ -578,7 +468,7 @@
print "======== END OF ERROR ========="
ok = False
- return res, executed_tests, ok
+ return result, executed_tests, ok
def run_benchmark(binary_tp, *argv, **kwargs):
@@ -606,11 +496,6 @@
job_cfg += char
-def estimate_cfg(job_cfg, params, skip_tests=0):
- bconf = list(parse_fio_config_full(job_cfg, params))[skip_tests:]
- return calculate_execution_time(bconf)
-
-
def sec_to_str(seconds):
h = seconds // 3600
m = (seconds % 3600) // 60
@@ -641,12 +526,6 @@
help="Max cycle length in seconds")
parser.add_argument("--show-raw-results", action='store_true',
default=False, help="Output raw input and results")
- parser.add_argument("--skip-tests", type=int, default=0, metavar="NUM",
- help="Skip NUM tests")
- parser.add_argument("--faked-fio", action='store_true',
- default=False, help="Emulate fio with 0 test time")
- parser.add_argument("--cluster", action='store_true',
- default=False, help="Apply cluster-test settings")
parser.add_argument("--params", nargs="*", metavar="PARAM=VAL",
default=[],
help="Provide set of pairs PARAM=VAL to" +
@@ -674,22 +553,25 @@
name, val = param_val.split("=", 1)
params[name] = val
+ slice_params = {
+ 'runcycle': argv_obj.runcycle,
+ }
+
+ sliced_it = parse_and_slice_all_in_1(job_cfg, params, **slice_params)
+
if argv_obj.estimate:
- print sec_to_str(estimate_cfg(job_cfg, params))
+ it = map(calculate_execution_time, sliced_it)
+ print sec_to_str(sum(it))
return 0
if argv_obj.num_tests or argv_obj.compile:
if argv_obj.compile:
- data = compile(job_cfg, params, argv_obj.skip_tests,
- cluster=argv_obj.cluster)
- out_fd.write(data)
- out_fd.write("\n")
+ for test_slice in sliced_it:
+ out_fd.write(fio_config_to_str(test_slice))
+ out_fd.write("\n#" + "-" * 70 + "\n\n")
if argv_obj.num_tests:
- bconf = list(parse_fio_config_full(job_cfg, params,
- argv_obj.cluster))
- bconf = bconf[argv_obj.skip_tests:]
- print len(bconf)
+ print len(list(sliced_it))
return 0
@@ -708,14 +590,7 @@
rrfunc = raw_res_func if argv_obj.show_raw_results else None
stime = time.time()
- job_res, num_tests, ok = run_benchmark(argv_obj.type,
- job_cfg,
- params,
- argv_obj.runcycle,
- rrfunc,
- argv_obj.skip_tests,
- argv_obj.faked_fio,
- cluster=argv_obj.cluster)
+ job_res, num_tests, ok = run_benchmark(argv_obj.type, sliced_it, rrfunc)
etime = time.time()
res = {'__meta__': {'raw_cfg': job_cfg, 'params': params}, 'res': job_res}
diff --git a/wally/suits/io/io_scenario_ceph.cfg b/wally/suits/io/io_scenario_ceph.cfg
index 5e793f2..cc8e411 100644
--- a/wally/suits/io/io_scenario_ceph.cfg
+++ b/wally/suits/io/io_scenario_ceph.cfg
@@ -7,56 +7,55 @@
softrandommap=1
filename={FILENAME}
NUM_ROUNDS=7
+NUMJOBS={% 1, 5, 10, 15, 20, 30, 40, 80, 120 %}
size=5G
-ramp_time=20
-runtime=20
+ramp_time=30
+runtime=60
# ---------------------------------------------------------------------
-# check different thread count, direct read/write mode. (bw, iops) = func(th_count)
-# also check BW for seq read/write.
+# check different thread count, sync mode. (latency, iops) = func(th_count)
+# ---------------------------------------------------------------------
+[ceph_test_{TEST_SUMM} * {NUM_ROUNDS}]
+blocksize=4k
+rw=randwrite
+sync=1
+numjobs={NUMJOBS}
+
+# ---------------------------------------------------------------------
+# check different thread count, sync mode. (latency, iops) = func(th_count)
+# ---------------------------------------------------------------------
+[ceph_test_{TEST_SUMM} * {NUM_ROUNDS}]
+blocksize=4k
+rw=randwrite
+direct=1
+numjobs=1
+
+# ---------------------------------------------------------------------
+# check different thread count, direct read mode. (latency, iops) = func(th_count)
+# also check iops for randread
+# ---------------------------------------------------------------------
+[ceph_test_{TEST_SUMM} * {NUM_ROUNDS}]
+blocksize=4k
+rw=randread
+direct=1
+numjobs={NUMJOBS}
+
+# ---------------------------------------------------------------------
+# this is essentially sequential read openration
+# we can't use seq read with numjobs > 1 on clouds due to caching
+# ---------------------------------------------------------------------
+[ceph_test_{TEST_SUMM} * {NUM_ROUNDS}]
+blocksize=16m
+rw=randread
+direct=1
+numjobs={NUMJOBS}
+
+# ---------------------------------------------------------------------
+# sequential write
# ---------------------------------------------------------------------
[ceph_test_{TEST_SUMM} * {NUM_ROUNDS}]
blocksize=1m
-rw=read
+rw=write
direct=1
-offset_increment={PER_TH_OFFSET}
-numjobs={% 20, 120 %}
-
-# # ---------------------------------------------------------------------
-# # check different thread count, sync mode. (latency, iops) = func(th_count)
-# # ---------------------------------------------------------------------
-# [hdd_test_{TEST_SUMM} * {NUM_ROUNDS}]
-# blocksize=4k
-# rw=randwrite
-# sync=1
-# numjobs={% 1, 5, 10, 15, 20, 30, 40, 80, 120 %}
-#
-# # ---------------------------------------------------------------------
-# # check different thread count, direct read mode. (latency, iops) = func(th_count)
-# # also check iops for randread
-# # ---------------------------------------------------------------------
-# [hdd_test_{TEST_SUMM} * {NUM_ROUNDS}]
-# blocksize=4k
-# rw=randread
-# direct=1
-# numjobs={% 1, 5, 10, 15, 20, 30, 40, 80, 120 %}
-#
-# # ---------------------------------------------------------------------
-# # check different thread count, direct read/write mode. (bw, iops) = func(th_count)
-# # also check BW for seq read/write.
-# # ---------------------------------------------------------------------
-# [hdd_test_{TEST_SUMM} * {NUM_ROUNDS}]
-# blocksize=1m
-# rw={% read, write %}
-# direct=1
-# offset_increment=1073741824 # 1G
-# numjobs={% 1, 5, 10, 15, 20, 30, 40, 80, 120 %}
-#
-# # ---------------------------------------------------------------------
-# # check IOPS randwrite.
-# # ---------------------------------------------------------------------
-# [hdd_test_{TEST_SUMM} * {NUM_ROUNDS}]
-# blocksize=4k
-# rw=randwrite
-# direct=1
+numjobs={NUMJOBS}
diff --git a/wally/suits/io/io_scenario_hdd.cfg b/wally/suits/io/io_scenario_hdd.cfg
index 46191f2..519fb0f 100644
--- a/wally/suits/io/io_scenario_hdd.cfg
+++ b/wally/suits/io/io_scenario_hdd.cfg
@@ -7,8 +7,9 @@
softrandommap=1
filename={FILENAME}
NUM_ROUNDS=7
+NUMJOBS={% 1, 5, 10, 15, 20, 30, 40, 80, 120 %}
-size=10Gb
+size=10G
ramp_time=5
runtime=30
@@ -19,7 +20,7 @@
blocksize=4k
rw=randwrite
sync=1
-numjobs={% 1, 5, 10, 15, 20, 30, 40, 80, 120 %}
+numjobs={NUMJOBS}
# ---------------------------------------------------------------------
# check different thread count, direct read mode. (latency, iops) = func(th_count)
@@ -29,18 +30,16 @@
blocksize=4k
rw=randread
direct=1
-numjobs={% 1, 5, 10, 15, 20, 30, 40, 80, 120 %}
+numjobs={NUMJOBS}
# ---------------------------------------------------------------------
-# check different thread count, direct read/write mode. (bw, iops) = func(th_count)
-# also check BW for seq read/write.
+# No reason for th count > 1 in case of sequantial operations
+# They became random
# ---------------------------------------------------------------------
[hdd_test_{TEST_SUMM} * {NUM_ROUNDS}]
blocksize=1m
rw={% read, write %}
direct=1
-offset_increment=1073741824 # 1G
-numjobs={% 1, 5, 10, 15, 20, 30, 40, 80, 120 %}
# ---------------------------------------------------------------------
# check IOPS randwrite.
diff --git a/wally/suits/itest.py b/wally/suits/itest.py
index c5615bb..0062569 100644
--- a/wally/suits/itest.py
+++ b/wally/suits/itest.py
@@ -101,16 +101,17 @@
self.config_params = test_options.get('params', {})
self.tool = test_options.get('tool', 'fio')
self.raw_cfg = open(self.config_fname).read()
- self.configs = list(io_agent.parse_fio_config_full(self.raw_cfg,
- self.config_params))
+ self.configs = list(io_agent.parse_all_in_1(self.raw_cfg,
+ self.config_params))
cmd_log = os.path.join(self.log_directory, "task_compiled.cfg")
raw_res = os.path.join(self.log_directory, "raw_results.txt")
fio_command_file = open_for_append_or_create(cmd_log)
- fio_command_file.write(io_agent.compile(self.raw_cfg,
- self.config_params,
- None))
+
+ cfg_s_it = io_agent.compile_all_in_1(self.raw_cfg, self.config_params)
+ splitter = "\n\n" + "-" * 60 + "\n\n"
+ fio_command_file.write(splitter.join(cfg_s_it))
self.fio_raw_results_file = open_for_append_or_create(raw_res)
def cleanup(self, conn):
@@ -140,22 +141,23 @@
if self.options.get('prefill_files', True):
files = {}
- for secname, params in self.configs:
- sz = ssize_to_b(params['size'])
+ for section in self.configs:
+ sz = ssize_to_b(section.vals['size'])
msz = sz / (1024 ** 2)
+
if sz % (1024 ** 2) != 0:
msz += 1
- fname = params['filename']
+ fname = section.vals['filename']
# if already has other test with the same file name
# take largest size
files[fname] = max(files.get(fname, 0), msz)
# logger.warning("dd run DISABLED")
- # cmd_templ = "dd if=/dev/zero of={0} bs={1} count={2}"
+ cmd_templ = "dd if=/dev/zero of={0} bs={1} count={2}"
- cmd_templ = "sudo dd if=/dev/zero of={0} bs={1} count={2}"
+ # cmd_templ = "sudo dd if=/dev/zero of={0} bs={1} count={2}"
ssize = 0
stime = time.time()
@@ -175,8 +177,8 @@
def run(self, conn, barrier):
# logger.warning("No tests runned")
# return
- cmd_templ = "sudo env python2 {0} {3} --type {1} {2} --json -"
- # cmd_templ = "env python2 {0} --type {1} {2} --json -"
+ # cmd_templ = "sudo env python2 {0} --type {1} {2} --json -"
+ cmd_templ = "env python2 {0} --type {1} {2} --json -"
params = " ".join("{0}={1}".format(k, v)
for k, v in self.config_params.items())
@@ -184,18 +186,10 @@
if "" != params:
params = "--params " + params
- if self.options.get('cluster', False):
- logger.info("Cluster mode is used")
- cluster_opt = "--cluster"
- else:
- logger.info("Non-cluster mode is used")
- cluster_opt = ""
-
- cmd = cmd_templ.format(self.io_py_remote, self.tool, params,
- cluster_opt)
+ cmd = cmd_templ.format(self.io_py_remote, self.tool, params)
logger.debug("Waiting on barrier")
- exec_time = io_agent.estimate_cfg(self.raw_cfg, self.config_params)
+ exec_time = io_agent.calculate_execution_time(self.configs)
exec_time_str = sec_to_str(exec_time)
try: