Merge pull request #4 from Mirantis/sender
improvement of protocol
diff --git a/TODO b/TODO
index 7c51227..4bf20cc 100644
--- a/TODO
+++ b/TODO
@@ -8,6 +8,7 @@
Make python module
putget/ssbench tests (костя)
тестирование (костя)
+отдельный тенант на все
Intellectual granular sensors
diff --git a/configs/usb_hdd.yaml b/configs/usb_hdd.yaml
index 224c6f6..a6e52ed 100644
--- a/configs/usb_hdd.yaml
+++ b/configs/usb_hdd.yaml
@@ -1,6 +1,5 @@
explicit_nodes:
- "ssh://koder@192.168.0.108::/home/koder/.ssh/id_rsa": testnode
- # local: testnode
+ local: testnode
internal:
var_dir_root: /tmp/perf_tests
@@ -13,10 +12,12 @@
testnode: system-cpu, block-io
tests:
- - io:
- cfg: wally/suits/io/io_scenario_hdd.cfg
- params:
- FILENAME: /media/koder/a5230078-4c27-4c3b-99aa-26148e78b2e7/xxx.bin
- NUM_ROUNDS: 7
+ - io:
+ cfg: wally/suits/io/io_scenario_hdd.cfg
+ prefill_files: false
+ params:
+ FILENAME: /media/koder/a5230078-4c27-4c3b-99aa-26148e78b2e7/xxx.bin
+ NUM_ROUNDS: 5
+
logging:
extra_logs: 1
\ No newline at end of file
diff --git a/report.html b/report.html
deleted file mode 100644
index b82cbbd..0000000
--- a/report.html
+++ /dev/null
@@ -1,11 +0,0 @@
-<!DOCTYPE html>
-<html>
-<head>
- <title>Report</title>
-</head>
-
-<body>
-%(body)s
-</body>
-
-</html>
\ No newline at end of file
diff --git a/report_templates/report.html b/report_templates/report.html
new file mode 100644
index 0000000..5eba9c7
--- /dev/null
+++ b/report_templates/report.html
@@ -0,0 +1,73 @@
+<!DOCTYPE html>
+<html>
+<head>
+ <title>Report</title>
+ <link rel="stylesheet"
+ href="https://maxcdn.bootstrapcdn.com/bootstrap/3.3.4/css/bootstrap.min.css">
+</head>
+
+<body>
+<div class="page-header text-center">
+ <h2>Performance Report</h2>
+</div>
+<div class="container">
+ <div class="row">
+ <table style="width: auto;" class="table table-bordered table-striped">
+ <tr>
+ <td>Test</td>
+ <td>Iops/BW</td>
+ <td>Test</td>
+ <td>Iops/BW</td>
+ </tr>
+ <tr>
+ <td>Rand read 4k direct max IOPS</td>
+ <td><div align="right">{direct_iops_r_max} IOPS</div></td>
+ <td>Rand write 4k sync IOPS 10ms lat</td>
+ <td><div align="right">{rws4k_10ms} IOPS</div></td>
+ </tr>
+ <tr>
+ <td>Rand write 4k direct max IOPS</td>
+ <td><div align="right">{direct_iops_w_max} IOPS</div></td>
+ <td>Rand write 4k sync IOPS 30ms lat</td>
+ <td><div align="right">{rws4k_30ms} IOPS</div></td>
+ </tr>
+ <tr>
+ <td>Direct sequential read</td>
+ <td><div align="right">{bw_read_max} MiBps</div></td>
+ <td>Rand write 4k sync IOPS 100ms lat</td>
+ <td><div align="right">{rws4k_100ms} IOPS</div></td>
+ </tr>
+ <tr>
+ <td>Direct sequential write</td>
+ <td><div align="right">{bw_write_max} MiBps</div></td>
+ <td></td><td></td>
+ </tr>
+ </table>
+ </div>
+ <img src="charts/rand_read_4k.png" />
+ <img src="charts/rand_write_4k.png" />
+
+ <!--div class="row">
+ <table style="width: auto;" class="table table-bordered table-striped">
+ <tr>
+ <td>Disk total</td>
+ <td>{lab_info[total_disk]}</td>
+ </tr>
+ <tr>
+ <td>Memory total</td>
+ <td>{lab_info[total_memory]}</td>
+ </tr>
+ <tr>
+ <td>Nodes count</td>
+ <td>{lab_info[nodes_count]}</td>
+ </tr>
+ <tr>
+ <td>CPU count</td>
+ <td>{lab_info[processor_count]}</td>
+ </tr>
+ </table>
+ </div-->
+</div>
+</body>
+
+</html>
\ No newline at end of file
diff --git a/requirements.txt b/requirements.txt
index 825a5bc..e619460 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,16 +1,19 @@
-priest
+decorator
+futures
GChartWrapper==0.8
-decorator==3.4.0
-futures==2.2.0
iso8601==0.1.10
-netaddr==0.7.13
+netaddr
+oktest
paramiko==1.15.2
-petname==1.7
+petname
+prest
prettytable==0.7.2
pyOpenSSL==0.14
python-cinderclient
python-glanceclient
python-keystoneclient
python-novaclient
-requests==2.2.1
-simplejson==3.6.5
+PyYAML
+requests
+simplejson
+texttable
diff --git a/tests/__init__.py b/tests/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/tests/__init__.py
diff --git a/tests/test_agent.py b/tests/test_agent.py
new file mode 100644
index 0000000..f9fa389
--- /dev/null
+++ b/tests/test_agent.py
@@ -0,0 +1,424 @@
+import os.path
+import unittest
+
+
+from oktest import ok, main, test
+
+
+from wally.suits.io import agent
+
+code_test_defaults = """
+[defaults]
+wait_for_previous
+buffered=0
+iodepth=2
+RUNTIME=20
+
+[sec1]
+group_reporting
+time_based
+softrandommap=1
+filename=/tmp/xxx
+size=5G
+ramp_time=20
+runtime={RUNTIME}
+blocksize=1m
+rw=read
+direct=1
+numjobs=1
+some_extra=1.2314
+
+[sec2]
+group_reporting
+time_based
+iodepth=1
+softrandommap=1
+filename=/tmp/xxx
+size=5G
+ramp_time=20
+runtime={RUNTIME}
+blocksize=1m
+rw=read
+direct=1
+numjobs=1
+some_extra=1.2314
+"""
+
+defaults = """
+[defaults]
+wait_for_previous
+group_reporting
+time_based
+buffered=0
+iodepth=1
+softrandommap=1
+filename=/tmp/xxx
+size=5G
+ramp_time=20
+runtime=20
+blocksize=1m
+rw=read
+direct=1
+numjobs=1
+"""
+
+code_test_auto_params_1 = defaults + """
+[defaults]
+RUNTIME=30
+
+[sec1_{TEST_SUMM}]
+ramp_time={% 20, 40 %}
+runtime={RUNTIME}
+blocksize={% 4k, 4m %}
+"""
+
+
+code_test_uniq = defaults + """
+[defaults]
+REPCOUNT=2
+RUNTIME=30
+
+[sec1_{TEST_SUMM}_{UNIQ} * 3]
+
+[sec2_{TEST_SUMM}_{UNIQ} * {REPCOUNT}]
+"""
+
+code_test_cycles_default = defaults + """
+[defaults]
+REPCOUNT=2
+RUNTIME={% 30, 60 %}
+
+[sec1_{TEST_SUMM}_{UNIQ} * 3]
+runtime={RUNTIME}
+blocksize={% 4k, 4m %}
+"""
+
+
+P = agent.parse_all_in_1
+
+
+class AgentTest(unittest.TestCase):
+    """Tests for the config parser in wally.suits.io.agent.
+
+    Config fixtures are stored as class attributes right before the test
+    which uses them; most of them extend the module-level ``defaults``
+    text. ``P`` is a module-level alias for ``agent.parse_all_in_1``.
+    """
+
+    # parse_value keeps free-form strings, converts numbers and turns
+    # "{% a, b %}" into a list of values
+    @test("test_parse_value")
+    def test_parse_value(self):
+        x = "asdfasd adsd d"
+        ok(agent.parse_value(x)) == x
+        ok(agent.parse_value("10 2")) == "10 2"
+        ok(agent.parse_value(None)).is_(None)
+        ok(agent.parse_value("10")) == 10
+        ok(agent.parse_value("20")) == 20
+        ok(agent.parse_value("10.1") - 10.1) < 1E-7
+        ok(agent.parse_value("{% 10, 20 %}")) == [10, 20]
+        ok(agent.parse_value("{% 10,20 %}")) == [10, 20]
+
+    code_test_compile_simplest = defaults + """
+[sec1]
+some_extra=1.2314
+"""
+
+    # a single section gets the values of [defaults] plus its own ones
+    @test("test_compile_simplest")
+    def test_compile_simplest(self):
+        sections = P(self.code_test_compile_simplest, {})
+        sections = list(sections)
+
+        ok(len(sections)) == 1
+        sec1 = sections[0]
+        ok(sec1.name) == "sec1"
+        vals = sec1.vals
+        ok(vals['wait_for_previous']).is_(None)
+        ok(vals['iodepth']) == 1
+        ok(vals['some_extra'] - 1.2314) < 1E-7
+
+    code_test_params_in_defaults = defaults + """
+[defaults]
+RUNTIME=20
+
+[sec1]
+runtime={RUNTIME}
+"""
+
+    # {RUNTIME} is resolved from a value set in a [defaults] section
+    @test("test_compile_defaults")
+    def test_compile_defaults(self):
+        sections = P(self.code_test_params_in_defaults, {})
+        sections = list(sections)
+
+        ok(len(sections)) == 1
+        sec1 = sections[0]
+        ok(sec1.name) == "sec1"
+        vals = sec1.vals
+        ok(vals['wait_for_previous']).is_(None)
+        ok(vals['iodepth']) == 1
+        ok(vals['runtime']) == 20
+
+    # a value set in a section (iodepth in sec2) overrides [defaults]
+    @test("test_defaults")
+    def test_defaults(self):
+        sections = P(code_test_defaults, {})
+        sections = list(sections)
+
+        ok(len(sections)) == 2
+        sec1, sec2 = sections
+
+        ok(sec1.name) == "sec1"
+        ok(sec2.name) == "sec2"
+
+        ok(sec1.vals['wait_for_previous']).is_(None)
+        ok(sec2.vals['wait_for_previous']).is_(None)
+
+        ok(sec1.vals['iodepth']) == 2
+        ok(sec2.vals['iodepth']) == 1
+
+        ok(sec1.vals['buffered']) == 0
+        ok(sec2.vals['buffered']) == 0
+
+    code_test_ext_params = defaults + """
+[sec1]
+runtime={RUNTIME}
+"""
+
+    # {RUNTIME} has to come from the externally passed params dict;
+    # KeyError is expected when it is missing
+    @test("test_external_params")
+    def test_external_params(self):
+        with self.assertRaises(KeyError):
+            sections = P(self.code_test_ext_params, {})
+            list(sections)
+
+        sections = P(self.code_test_ext_params, {'RUNTIME': 20})
+        sections = list(sections)
+
+    code_test_cycle = defaults + """
+[sec1]
+runtime={RUNTIME}
+ramp_time={% 20, 40 %}
+"""
+
+    # one "{% ... %}" value produces one section per listed value
+    @test("test_cycle")
+    def test_cycle(self):
+        sections = P(self.code_test_cycle, {'RUNTIME': 20})
+        sections = list(sections)
+        ok(len(sections)) == 2
+        ok(sections[0].vals['ramp_time']) == 20
+        ok(sections[1].vals['ramp_time']) == 40
+
+    code_test_cycles = defaults + """
+[sec1]
+ramp_time={% 20, 40 %}
+runtime={RUNTIME}
+blocksize={% 4k, 4m %}
+"""
+
+    # two "{% ... %}" values produce every combination of them
+    @test("test_cycles")
+    def test_cycles(self):
+        sections = P(self.code_test_cycles, {'RUNTIME': 20})
+        sections = list(sections)
+        ok(len(sections)) == 4
+
+        combinations = [
+            (section.vals['ramp_time'], section.vals['blocksize'])
+            for section in sections
+        ]
+
+        combinations.sort()
+
+        ok(combinations) == [(20, '4k'), (20, '4m'), (40, '4k'), (40, '4m')]
+
+    # 4 sections with runtime 20 plus ramp times 20, 20, 40 and 40
+    @test("test_time_estimate")
+    def test_time_estimate(self):
+        sections = P(self.code_test_cycles, {'RUNTIME': 20})
+        sections = list(sections)
+        etime = agent.calculate_execution_time(sections)
+
+        ok(etime) == 20 * 4 + 20 * 2 + 40 * 2
+        ok(agent.sec_to_str(etime)) == "0:03:20"
+
+    code_test_cycles2 = defaults + """
+[sec1 * 7]
+ramp_time={% 20, 40 %}
+runtime={RUNTIME}
+blocksize={% 4k, 4m %}
+"""
+
+    # "[sec1 * 7]" repeats every combination 7 times. The label used to be
+    # "test_time_estimate", same as the test above; it now matches the
+    # method name so reports can tell the two tests apart.
+    @test("test_time_estimate_large")
+    def test_time_estimate_large(self):
+        sections = P(self.code_test_cycles2, {'RUNTIME': 30})
+        sections = list(sections)
+
+        ok(sections[0].name) == 'sec1'
+        ok(len(sections)) == 7 * 4
+
+        etime = agent.calculate_execution_time(sections)
+        # ramptime optimization
+        # NOTE(review): the expected value counts ramp time once per group
+        # of 7 repeats - confirm against calculate_execution_time
+        expected_time = (20 + 30 + 30 * 6) * 2
+        expected_time += (40 + 30 + 30 * 6) * 2
+        ok(etime) == expected_time
+
+    code_test_cycles3 = defaults + """
+[sec1 * 7]
+ramp_time={% 20, 40 %}
+runtime={RUNTIME}
+blocksize={% 4k, 4m %}
+
+[sec2 * 7]
+ramp_time={% 20, 40 %}
+runtime={RUNTIME}
+blocksize={% 4k, 4m %}
+"""
+
+    # same as above with two repeated sections: the estimate doubles
+    @test("test_time_estimate2")
+    def test_time_estimate_large2(self):
+        sections = P(self.code_test_cycles3, {'RUNTIME': 30})
+        sections = list(sections)
+
+        ok(sections[0].name) == 'sec1'
+        ok(sections[1].name) == 'sec1'
+        ok(len(sections)) == 7 * 4 * 2
+
+        etime = agent.calculate_execution_time(sections)
+        # ramptime optimization
+        expected_time = (20 + 30 + 30 * 6) * 2
+        expected_time += (40 + 30 + 30 * 6) * 2
+        ok(etime) == expected_time * 2
+
+    code_test_repeats = defaults + """
+[defaults]
+REPCOUNT=2
+[sec1 * 3]
+[sec2 * {REPCOUNT}]
+"""
+
+    # the repeat count may be a literal or a {PARAM} reference
+    @test("test_repeat")
+    def test_repeat(self):
+        sections = P(self.code_test_repeats, {})
+        sections = list(sections)
+        ok(len(sections)) == 2 + 3
+        ok(sections[0].name) == 'sec1'
+        ok(sections[1].name) == 'sec1'
+        ok(sections[2].name) == 'sec1'
+        ok(sections[3].name) == 'sec2'
+        ok(sections[4].name) == 'sec2'
+
+    # parses io_scenario_ceph.cfg, which is looked up in the directory of
+    # the agent module
+    @test("test_real_tasks")
+    def test_real_tasks(self):
+        tasks_dir = os.path.dirname(agent.__file__)
+        fname = os.path.join(tasks_dir, 'io_scenario_ceph.cfg')
+
+        # close the config file explicitly instead of leaking the handle
+        with open(fname) as fd:
+            fc = fd.read()
+
+        sections = P(fc, {'FILENAME': '/dev/null'})
+        sections = list(sections)
+
+        ok(len(sections)) == 7 * 9 * 4 + 7
+
+        etime = agent.calculate_execution_time(sections)
+        # ramptime optimization
+        expected_time = (60 * 7 + 30) * 9 * 4 + (60 * 7 + 30)
+        ok(etime) == expected_time
+
+if __name__ == '__main__':
+ main()
+
+# def do_run_fio_fake(bconf):
+# def estimate_iops(sz, bw, lat):
+# return 1 / (lat + float(sz) / bw)
+# global count
+# count += 1
+# parsed_out = []
+
+# BW = 120.0 * (1024 ** 2)
+# LAT = 0.003
+
+# for name, cfg in bconf:
+# sz = to_bytes(cfg['blocksize'])
+# curr_lat = LAT * ((random.random() - 0.5) * 0.1 + 1)
+# curr_ulat = curr_lat * 1000000
+# curr_bw = BW * ((random.random() - 0.5) * 0.1 + 1)
+# iops = estimate_iops(sz, curr_bw, curr_lat)
+# bw = iops * sz
+
+# res = {'ctx': 10683,
+# 'error': 0,
+# 'groupid': 0,
+# 'jobname': name,
+# 'majf': 0,
+# 'minf': 30,
+# 'read': {'bw': 0,
+# 'bw_agg': 0.0,
+# 'bw_dev': 0.0,
+# 'bw_max': 0,
+# 'bw_mean': 0.0,
+# 'bw_min': 0,
+# 'clat': {'max': 0,
+# 'mean': 0.0,
+# 'min': 0,
+# 'stddev': 0.0},
+# 'io_bytes': 0,
+# 'iops': 0,
+# 'lat': {'max': 0, 'mean': 0.0,
+# 'min': 0, 'stddev': 0.0},
+# 'runtime': 0,
+# 'slat': {'max': 0, 'mean': 0.0,
+# 'min': 0, 'stddev': 0.0}
+# },
+# 'sys_cpu': 0.64,
+# 'trim': {'bw': 0,
+# 'bw_agg': 0.0,
+# 'bw_dev': 0.0,
+# 'bw_max': 0,
+# 'bw_mean': 0.0,
+# 'bw_min': 0,
+# 'clat': {'max': 0,
+# 'mean': 0.0,
+# 'min': 0,
+# 'stddev': 0.0},
+# 'io_bytes': 0,
+# 'iops': 0,
+# 'lat': {'max': 0, 'mean': 0.0,
+# 'min': 0, 'stddev': 0.0},
+# 'runtime': 0,
+# 'slat': {'max': 0, 'mean': 0.0,
+# 'min': 0, 'stddev': 0.0}
+# },
+# 'usr_cpu': 0.23,
+# 'write': {'bw': 0,
+# 'bw_agg': 0,
+# 'bw_dev': 0,
+# 'bw_max': 0,
+# 'bw_mean': 0,
+# 'bw_min': 0,
+# 'clat': {'max': 0, 'mean': 0,
+# 'min': 0, 'stddev': 0},
+# 'io_bytes': 0,
+# 'iops': 0,
+# 'lat': {'max': 0, 'mean': 0,
+# 'min': 0, 'stddev': 0},
+# 'runtime': 0,
+# 'slat': {'max': 0, 'mean': 0.0,
+# 'min': 0, 'stddev': 0.0}
+# }
+# }
+
+# if cfg['rw'] in ('read', 'randread'):
+# key = 'read'
+# elif cfg['rw'] in ('write', 'randwrite'):
+# key = 'write'
+# else:
+# raise ValueError("Unknown op type {0}".format(key))
+
+# res[key]['bw'] = bw
+# res[key]['iops'] = iops
+# res[key]['runtime'] = 30
+# res[key]['io_bytes'] = res[key]['runtime'] * bw
+# res[key]['bw_agg'] = bw
+# res[key]['bw_dev'] = bw / 30
+# res[key]['bw_max'] = bw * 1.5
+# res[key]['bw_min'] = bw / 1.5
+# res[key]['bw_mean'] = bw
+# res[key]['clat'] = {'max': curr_ulat * 10, 'mean': curr_ulat,
+# 'min': curr_ulat / 2, 'stddev': curr_ulat}
+# res[key]['lat'] = res[key]['clat'].copy()
+# res[key]['slat'] = res[key]['clat'].copy()
+
+# parsed_out.append(res)
+
+# return zip(parsed_out, bconf)
diff --git a/wally/charts.py b/wally/charts.py
index a168f9d..12d735f 100644
--- a/wally/charts.py
+++ b/wally/charts.py
@@ -14,7 +14,7 @@
sys.modules['GChartWrapper.GChart'].MARKERS += 'E'
-COLORS = ["1569C7", "81D8D0", "307D7E", "5CB3FF", "0040FF", "81DAF5"]
+COLORS = ["1569C7", "81D8D0", "B0BD2E", "5CB3FF", "0040FF", "81DAF5"]
constants.MARKERS += 'E' # append E marker to available markers
diff --git a/wally/config.py b/wally/config.py
index 7dba134..d8b7085 100644
--- a/wally/config.py
+++ b/wally/config.py
@@ -37,11 +37,11 @@
else:
run_uuid = str(uuid.uuid4())
results_dir = os.path.join(var_dir, run_uuid)
+ cfg_dict['run_uuid'] = run_uuid.replace('_', '-')
else:
results_dir = explicit_folder
cfg_dict['var_dir'] = results_dir
- cfg_dict['run_uuid'] = run_uuid.replace('_', '-')
mkdirs_if_unxists(cfg_dict['var_dir'])
in_var_dir = functools.partial(os.path.join, cfg_dict['var_dir'])
@@ -95,7 +95,10 @@
else:
record.levelname = prn_name
- res = super(ColoredFormatter, self).format(record)
+ # logging.Formatter is an old-style class on python 2.6, so super() can't be used here
+ res = logging.Formatter.format(self, record)
+
+ # res = super(ColoredFormatter, self).format(record)
# restore record, as it will be used by other formatters
record.__dict__ = orig
diff --git a/wally/discover/node.py b/wally/discover/node.py
index dc1c9b0..c2ae5aa 100644
--- a/wally/discover/node.py
+++ b/wally/discover/node.py
@@ -1,4 +1,4 @@
-import urlparse
+from wally.ssh_utils import parse_ssh_uri
class Node(object):
@@ -10,16 +10,19 @@
self.monitor_url = None
def get_ip(self):
- return urlparse.urlparse(self.conn_url).hostname
+ if self.conn_url == 'local':
+ return '127.0.0.1'
+
+ assert self.conn_url.startswith("ssh://")
+ return parse_ssh_uri(self.conn_url[6:]).host
def get_conn_id(self):
- host = urlparse.urlparse(self.conn_url).hostname
- port = urlparse.urlparse(self.conn_url).port
+ if self.conn_url == 'local':
+ return '127.0.0.1'
- if port is None:
- port = 22
-
- return host + ":" + str(port)
+ assert self.conn_url.startswith("ssh://")
+ creds = parse_ssh_uri(self.conn_url[6:])
+ return "{0.host}:{0.port}".format(creds)
def __str__(self):
templ = "<Node: url={conn_url!r} roles={roles}" + \
diff --git a/wally/fuel_rest_api.py b/wally/fuel_rest_api.py
index 499510d..737bf2e 100644
--- a/wally/fuel_rest_api.py
+++ b/wally/fuel_rest_api.py
@@ -47,7 +47,7 @@
else:
data_json = json.dumps(params)
- logger.debug("HTTP: {} {}".format(method.upper(), url))
+ logger.debug("HTTP: {0} {1}".format(method.upper(), url))
request = urllib2.Request(url,
data=data_json,
@@ -58,7 +58,7 @@
request.get_method = lambda: method.upper()
response = urllib2.urlopen(request)
- logger.debug("HTTP Responce: {}".format(response.code))
+ logger.debug("HTTP Responce: {0}".format(response.code))
if response.code < 200 or response.code > 209:
raise IndexError(url)
@@ -124,12 +124,12 @@
self.__connection__ = conn
def __str__(self):
- res = ["{}({}):".format(self.__class__.__name__, self.name)]
+ res = ["{0}({1}):".format(self.__class__.__name__, self.name)]
for k, v in sorted(self.__dict__.items()):
if k.startswith('__') or k.endswith('__'):
continue
if k != 'name':
- res.append(" {}={!r}".format(k, v))
+ res.append(" {0}={1!r}".format(k, v))
return "\n".join(res)
def __getitem__(self, item):
diff --git a/wally/meta_info.py b/wally/meta_info.py
index 127612b..12cb061 100644
--- a/wally/meta_info.py
+++ b/wally/meta_info.py
@@ -19,8 +19,8 @@
def to_gb(x):
return x / (1024 ** 3)
- lab_data['total_memory'] = format(to_gb(lab_data['total_memory']), ',d')
- lab_data['total_disk'] = format(to_gb(lab_data['total_disk']), ',d')
+ lab_data['total_memory'] = to_gb(lab_data['total_memory'])
+ lab_data['total_disk'] = to_gb(lab_data['total_disk'])
return lab_data
@@ -62,7 +62,6 @@
nodes.append(d)
result['nodes'] = nodes
- # result['name'] = 'Perf-1 Env'
result['fuel_version'] = fuel_version['release']
return result
diff --git a/wally/pretty_yaml.py b/wally/pretty_yaml.py
index 4fc4a49..f078ff2 100644
--- a/wally/pretty_yaml.py
+++ b/wally/pretty_yaml.py
@@ -41,9 +41,9 @@
if isinstance(data, (list, tuple)):
if all(map(is_simple, data)):
if all_nums(data):
- one_line = "[{}]".format(", ".join(map(dumps_simple, data)))
+ one_line = "[{0}]".format(", ".join(map(dumps_simple, data)))
else:
- one_line = "[{}]".format(",".join(map(dumps_simple, data)))
+ one_line = "[{0}]".format(",".join(map(dumps_simple, data)))
else:
one_line = None
diff --git a/wally/report.py b/wally/report.py
index d2f2d96..85ac388 100644
--- a/wally/report.py
+++ b/wally/report.py
@@ -1,152 +1,24 @@
import os
-import sys
+import bisect
+import logging
+import wally
from wally import charts
-from wally.statistic import med_dev
from wally.utils import parse_creds
-from wally.suits.io.results_loader import filter_data
+from wally.suits.io.results_loader import process_disk_info
from wally.meta_info import total_lab_info, collect_lab_data
-# from collections import OrderedDict
-# from wally.suits.io import formatter
-# def pgbench_chart_data(results):
-# """
-# Format pgbench results for chart
-# """
-# data = {}
-# charts_url = []
-
-# formatted_res = formatters.format_pgbench_stat(results)
-# for key, value in formatted_res.items():
-# num_cl, num_tr = key.split(' ')
-# data.setdefault(num_cl, {}).setdefault(build, {})
-# data[keys[z]][build][
-# ' '.join(keys)] = value
-
-# for name, value in data.items():
-# title = name
-# legend = []
-# dataset = []
-
-# scale_x = []
-
-# for build_id, build_results in value.items():
-# vals = []
-# OD = OrderedDict
-# ordered_build_results = OD(sorted(build_results.items(),
-# key=lambda t: t[0]))
-# scale_x = ordered_build_results.keys()
-# for key in scale_x:
-# res = build_results.get(key)
-# if res:
-# vals.append(res)
-# if vals:
-# dataset.append(vals)
-# legend.append(build_id)
-
-# if dataset:
-# charts_url.append(str(charts.render_vertical_bar
-# (title, legend, dataset, scale_x=scale_x)))
-# return charts_url
-
-# def build_lines_chart(results, z=0):
-# data = {}
-# charts_url = []
-
-# for build, res in results:
-# formatted_res = formatters.get_formatter(build)(res)
-# for key, value in formatted_res.items():
-# keys = key.split(' ')
-# data.setdefault(key[z], {})
-# data[key[z]].setdefault(build, {})[keys[1]] = value
-
-# for name, value in data.items():
-# title = name
-# legend = []
-# dataset = []
-# scale_x = []
-# for build_id, build_results in value.items():
-# legend.append(build_id)
-
-# OD = OrderedDict
-# ordered_build_results = OD(sorted(build_results.items(),
-# key=lambda t: ssize_to_b(t[0])))
-
-# if not scale_x:
-# scale_x = ordered_build_results.keys()
-# dataset.append(zip(*ordered_build_results.values())[0])
-
-# chart = charts.render_lines(title, legend, dataset, scale_x)
-# charts_url.append(str(chart))
-
-# return charts_url
-
-# def build_vertical_bar(results, z=0):
-# data = {}
-# charts_url = []
-# for build, res in results:
-# formatted_res = formatter.get_formatter(build)(res)
-# for key, value in formatted_res.items():
-# keys = key.split(' ')
-# data.setdefault(keys[z], {}).setdefault(build, {})
-# data[keys[z]][build][
-# ' '.join(keys)] = value
-
-# for name, value in data.items():
-# title = name
-# legend = []
-# dataset = []
-
-# scale_x = []
-
-# for build_id, build_results in value.items():
-# vals = []
-# OD = OrderedDict
-# ordered_build_results = OD(sorted(build_results.items(),
-# key=lambda t: t[0]))
-# scale_x = ordered_build_results.keys()
-# for key in scale_x:
-# res = build_results.get(key)
-# if res:
-# vals.append(res)
-# if vals:
-# dataset.append(vals)
-# legend.append(build_id)
-
-# if dataset:
-# charts_url.append(str(charts.render_vertical_bar
-# (title, legend, dataset, scale_x=scale_x)))
-# return charts_url
+logger = logging.getLogger("wally.report")
-def render_html(charts_urls, dest, lab_description, info):
- templ = open("report.html", 'r').read()
- body = "<a href='#lab_desc'>Lab description</a>" \
- "<ol>{0}</ol>" \
- "<div>{1}</div>" \
- '<a name="lab_desc"></a>' \
- "<div><ul>{2}</ul></div>"
- table = "<table><tr><td>{0}</td><td>{1}</td></tr>" \
- "<tr><td>{2}</td><td>{3}</td></tr></table>"
- ul = []
- ol = []
- li = '<li>{0} : {1}</li>'
-
- for elem in info:
- ol.append(li.format(elem.keys(), elem.values()))
-
- for key in lab_description:
- value = lab_description[key]
- ul.append("<li>{0} : {1}</li>".format(key, value))
-
- charts_urls = ['<img src="{0}">'.format(url) for url in charts_urls]
-
- body = body.format('\n'.join(ol),
- table.format(*charts_urls),
- '\n'.join(ul))
-
- open(dest, 'w').write(templ % {'body': body})
+def render_html(dest, info, lab_description):
+    """Fill the html report template and write the result to ``dest``.
+
+    dest - path of the html file to create
+    info - object whose attributes (``info.__dict__``) are passed to the
+           template as format fields
+    lab_description - passed to the template as ``lab_info``
+    """
+    # the template is stored in report_templates/, which is located next
+    # to the wally package directory
+    very_root_dir = os.path.dirname(os.path.dirname(wally.__file__))
+    templ_dir = os.path.join(very_root_dir, 'report_templates')
+    templ_file = os.path.join(templ_dir, "report.html")
+
+    # context managers close both files even if format() or write() fails
+    with open(templ_file, 'r') as fd:
+        templ = fd.read()
+
+    report = templ.format(lab_info=lab_description, **info.__dict__)
+
+    with open(dest, 'w') as fd:
+        fd.write(report)
def io_chart(title, concurence, latv, iops_or_bw, iops_or_bw_dev,
@@ -171,11 +43,110 @@
lines=[
(latv, "msec", "rr", "lat"),
(iops_or_bw_per_vm, None, None,
- "bw_per_vm")
+ "IOPS per vm")
])
return str(ch)
+def make_plots(processed_results, path):
+    """Build the random read/write 4k charts from processed results.
+
+    processed_results - dict; its values are selected by ``name`` prefix
+                        and ordered by ``raw['concurence']``
+    path - NOTE(review): not used by this function, charts are identified
+           by the file name passed to io_chart only - confirm it is intended
+    """
+    # (result name prefix, chart file name, chart title)
+    charts_to_build = (
+        ('hdd_test_rrd4k', 'rand_read_4k', 'Random read 4k sync IOPS'),
+        ('hdd_test_rws4k', 'rand_write_4k', 'Random write 4k sync IOPS'),
+    )
+
+    for prefix, chart_fname, title in charts_to_build:
+        selected = sorted(
+            (item for item in processed_results.values()
+             if item.name.startswith(prefix)),
+            key=lambda item: item.raw['concurence'])
+
+        latencies = [item.lat for item in selected]
+        threads = [item.raw['concurence'] for item in selected]
+        iops_vals = [item.iops for item in selected]
+        # dev is multiplied by iops, so it is treated as a relative value
+        iops_devs = [item.iops * item.dev for item in selected]
+
+        io_chart(title, threads, latencies, iops_vals, iops_devs,
+                 'bw', chart_fname)
+
+
+
+
+class DiskInfo(object):
+    """Container for the summary numbers of the html report.
+
+    Every field is created with value 0; get_disk_info fills them in.
+    """
+
+    def __init__(self):
+        for field in ('direct_iops_r_max',
+                      'direct_iops_w_max',
+                      'rws4k_10ms',
+                      'rws4k_30ms',
+                      'rws4k_100ms',
+                      'bw_write_max',
+                      'bw_read_max'):
+            setattr(self, field, 0)
+
+
+def get_disk_info(processed_results):
+    """Reduce processed io results to the numbers shown in the html report.
+
+    processed_results - dict; every value has ``iops``, ``lat`` and ``bw``
+                        attributes and a ``raw`` dict with 'sync_mode',
+                        'blocksize', 'rw' and 'concurence' keys
+
+    Returns a DiskInfo. The rws4k_*ms fields (4k sync random write IOPS at
+    10/30/100 ms latency) are set to '-' when the estimation is 0.
+    """
+    di = DiskInfo()
+
+    # (iops, lat, concurence) of every 4k sync random write result
+    rws4k_iops_lat_th = []
+
+    for res in processed_results.values():
+        if res.raw['sync_mode'] == 'd' and res.raw['blocksize'] == '4k':
+            if res.raw['rw'] == 'randwrite':
+                di.direct_iops_w_max = max(di.direct_iops_w_max, res.iops)
+            elif res.raw['rw'] == 'randread':
+                di.direct_iops_r_max = max(di.direct_iops_r_max, res.iops)
+        elif res.raw['sync_mode'] == 's' and res.raw['blocksize'] == '4k':
+            if res.raw['rw'] != 'randwrite':
+                continue
+
+            rws4k_iops_lat_th.append((res.iops, res.lat,
+                                      res.raw['concurence']))
+
+        elif res.raw['sync_mode'] == 'd' and res.raw['blocksize'] == '1m':
+
+            if res.raw['rw'] == 'write':
+                di.bw_write_max = max(di.bw_write_max, res.bw)
+            elif res.raw['rw'] == 'read':
+                di.bw_read_max = max(di.bw_read_max, res.bw)
+
+    # NOTE(review): truncates on python 2 when bw is an int, and the report
+    # template labels these values MiBps - confirm the units of res.bw
+    di.bw_write_max /= 1000
+    di.bw_read_max /= 1000
+
+    # order by concurence; item[2] instead of a tuple-unpacking lambda
+    # parameter, which is python 2 only syntax
+    rws4k_iops_lat_th.sort(key=lambda item: item[2])
+
+    latv = [lat for _, lat, _ in rws4k_iops_lat_th]
+
+    for tlatv_ms in [10, 30, 100]:
+        # NOTE(review): assumes res.lat is in microseconds - confirm
+        tlat = tlatv_ms * 1000
+
+        # NOTE(review): bisect expects latv to be sorted, i.e. latency to
+        # grow together with concurence - confirm
+        pos = bisect.bisect_left(latv, tlat)
+        if 0 == pos:
+            # no results, or even the first point is over the threshold
+            iops3 = 0
+        elif pos == len(latv):
+            # every point is under the threshold: take IOPS of the last
+            # point (the original code stored its latency here)
+            iops3 = rws4k_iops_lat_th[-1][0]
+        else:
+            # bisect_left guarantees lat1 < tlat <= lat2, so lat2 - lat1
+            # can't be zero
+            lat1 = float(latv[pos - 1])
+            lat2 = latv[pos]
+
+            iops1 = rws4k_iops_lat_th[pos - 1][0]
+            iops2 = rws4k_iops_lat_th[pos][0]
+
+            # Linear interpolation of IOPS over latency. It is the same
+            # formula as interpolating the thread count first and IOPS
+            # over thread count after it, but the thread counts cancel
+            # out, so equal concurence in two results can't cause a
+            # division by zero, and float math avoids int truncation.
+            iops3 = iops1 + (iops2 - iops1) * (tlat - lat1) / (lat2 - lat1)
+
+        # explicit field index: '{}' is not supported by python 2.6
+        setattr(di, 'rws4k_{0}ms'.format(tlatv_ms), int(iops3))
+
+    # copy for the report: zero estimations are shown as '-'
+    hdi = DiskInfo()
+    hdi.direct_iops_r_max = di.direct_iops_r_max
+    hdi.direct_iops_w_max = di.direct_iops_w_max
+    hdi.rws4k_10ms = di.rws4k_10ms if 0 != di.rws4k_10ms else '-'
+    hdi.rws4k_30ms = di.rws4k_30ms if 0 != di.rws4k_30ms else '-'
+    hdi.rws4k_100ms = di.rws4k_100ms if 0 != di.rws4k_100ms else '-'
+    hdi.bw_write_max = di.bw_write_max
+    hdi.bw_read_max = di.bw_read_max
+    return hdi
+
+
def make_io_report(results, path, lab_url=None, creds=None):
if lab_url is not None:
username, password, tenant_name = parse_creds(creds)
@@ -185,52 +156,19 @@
data = collect_lab_data(lab_url, creds)
lab_info = total_lab_info(data)
else:
- lab_info = ""
+ lab_info = {
+ "total_disk": "None",
+ "total_memory": "None",
+ "nodes_count": "None",
+ "processor_count": "None"
+ }
- for suite_type, test_suite_data in results:
- if suite_type != 'io' or test_suite_data is None:
- continue
-
- io_test_suite_res = test_suite_data['res']
-
- charts_url = []
- info = []
-
- name_filters = [
- ('hdd_test_rrd4k', ('concurence', 'lat', 'iops'), 'rand_read_4k'),
- ('hdd_test_swd1m', ('concurence', 'lat', 'bw'), 'seq_write_1m'),
- ('hdd_test_srd1m', ('concurence', 'lat', 'bw'), 'seq_read_1m'),
- ('hdd_test_rws4k', ('concurence', 'lat', 'bw'), 'rand_write_1m')
- ]
-
- for name_filter, fields, fname in name_filters:
- th_filter = filter_data(name_filter, fields)
-
- data = sorted(th_filter(io_test_suite_res.values()))
- if len(data) == 0:
- continue
-
- concurence, latv, iops_or_bw_v = zip(*data)
- iops_or_bw_v, iops_or_bw_dev_v = zip(*map(med_dev, iops_or_bw_v))
- latv, _ = zip(*map(med_dev, latv))
-
- url = io_chart(name_filter, concurence, latv, iops_or_bw_v,
- iops_or_bw_dev_v,
- fields[2], fname)
- info.append(dict(zip(fields, (concurence, latv, iops_or_bw_v))))
- charts_url.append(url)
-
- if len(charts_url) != 0:
- render_html(charts_url, path, lab_info, info)
-
-
-def main(args):
- make_io_report(results=[('a', 'b')],
- path=os.path.dirname(args[0]),
- lab_url='http://172.16.52.112:8000',
- creds='admin:admin@admin')
- return 0
-
-
-if __name__ == '__main__':
- exit(main(sys.argv))
+    try:
+        processed_results = process_disk_info(results)
+        make_plots(processed_results, path)
+        di = get_disk_info(processed_results)
+        render_html(path, di, lab_info)
+    except Exception:
+        # The html report is best-effort: log the failure and go on.
+        # logger.exception keeps the traceback; exc.message is deprecated
+        # since python 2.6 and concatenating it fails when it isn't a str.
+        logger.exception("Failed to generate html report")
+    else:
+        logger.info("Html report were stored in " + path)
diff --git a/wally/run_test.py b/wally/run_test.py
index b22ce38..d578349 100755
--- a/wally/run_test.py
+++ b/wally/run_test.py
@@ -44,21 +44,22 @@
def connect_one(node, vm=False):
+ if node.conn_url == 'local':
+ node.connection = ssh_utils.connect(node.conn_url)
+ return
+
try:
ssh_pref = "ssh://"
if node.conn_url.startswith(ssh_pref):
url = node.conn_url[len(ssh_pref):]
if vm:
- ret_count = 24
- log_warns = False
+ conn_timeout = 240
else:
- ret_count = 3
- log_warns = True
+ conn_timeout = 30
node.connection = ssh_utils.connect(url,
- retry_count=ret_count,
- log_warns=log_warns)
+ conn_timeout=conn_timeout)
else:
raise ValueError("Unknown url type {0}".format(node.conn_url))
except Exception as exc:
@@ -78,9 +79,9 @@
def test_thread(test, node, barrier, res_q):
try:
- logger.debug("Run preparation for {0}".format(node.conn_url))
+ logger.debug("Run preparation for {0}".format(node.get_conn_id()))
test.pre_run(node.connection)
- logger.debug("Run test for {0}".format(node.conn_url))
+ logger.debug("Run test for {0}".format(node.get_conn_id()))
test.run(node.connection, barrier)
except Exception as exc:
logger.exception("In test {0} for node {1}".format(test, node))
@@ -124,7 +125,7 @@
os.makedirs(dr)
test = tool_type_mapper[name](params, res_q.put, dr,
- node=node.get_ip())
+ node=node.get_conn_id())
th = threading.Thread(None, test_thread, None,
(test, node, barrier, res_q))
threads.append(th)
@@ -391,8 +392,6 @@
report.make_io_report(ctx.results, html_rep_fname, fuel_url, creds=creds)
- logger.info("Html report were stored in " + html_rep_fname)
-
text_rep_fname = cfg_dict['text_report_file']
with open(text_rep_fname, "w") as fd:
for tp, data in ctx.results:
@@ -438,6 +437,8 @@
parser.add_argument("-d", '--dont-discover-nodes', action='store_true',
help="Don't connect/discover fuel nodes",
default=False)
+ parser.add_argument("-r", '--no-html-report', action='store_true',
+ help="Skip html report", default=False)
parser.add_argument("config_file")
return parser.parse_args(argv[1:])
@@ -462,9 +463,11 @@
report_stages = [
console_report_stage,
- html_report_stage
]
+ if not opts.no_html_report:
+ report_stages.append(html_report_stage)
+
load_config(opts.config_file, opts.post_process_only)
if cfg_dict.get('logging', {}).get("extra_logs", False) or opts.extra_logs:
diff --git a/wally/sensors/protocol.py b/wally/sensors/protocol.py
index d157d98..9fd1a84 100644
--- a/wally/sensors/protocol.py
+++ b/wally/sensors/protocol.py
@@ -163,8 +163,8 @@
vals = [data[header].value - self.prev.get(header, 0)
for header in self.headers]
- self.prev.update({header: data[header].value
- for header in self.headers})
+ self.prev.update(dict((header, data[header].value)
+ for header in self.headers))
else:
vals = [data[header].value for header in self.headers]
diff --git a/wally/sensors_utils.py b/wally/sensors_utils.py
index 18e8d3e..7c8cb98 100644
--- a/wally/sensors_utils.py
+++ b/wally/sensors_utils.py
@@ -56,7 +56,11 @@
if node.monitor_url is not None:
monitor_url = node.monitor_url
else:
- ext_ip = utils.get_ip_for_target(node.get_ip())
+ ip = node.get_ip()
+ if ip == '127.0.0.1':
+ ext_ip = '127.0.0.1'
+ else:
+ ext_ip = utils.get_ip_for_target(ip)
monitor_url = receiver_url.format(ip=ext_ip)
monitored_nodes.append(node)
diff --git a/wally/ssh_utils.py b/wally/ssh_utils.py
index f1818ad..f4de3b6 100644
--- a/wally/ssh_utils.py
+++ b/wally/ssh_utils.py
@@ -1,11 +1,12 @@
import re
import time
import socket
+import shutil
import logging
import os.path
import getpass
import threading
-
+import subprocess
import paramiko
@@ -13,13 +14,56 @@
logger = logging.getLogger("wally")
-def ssh_connect(creds, retry_count=6, timeout=10, log_warns=True):
+class Local(object):
+    """Stand-in for an ssh connection when the node is the local host.
+
+    The class itself is used as the connection object (ssh_connect and
+    connect return it for 'local'), so all methods are classmethods. They
+    work on the local file system via os/shutil.
+    """
+
+    @classmethod
+    def open_sftp(cls):
+        # the class plays the role of the sftp session as well
+        return cls
+
+    @classmethod
+    def mkdir(cls, remotepath, mode=None):
+        os.mkdir(remotepath)
+        if mode is None:
+            return
+        os.chmod(remotepath, mode)
+
+    @classmethod
+    def put(cls, localfile, remfile):
+        # "upload" is a plain file copy
+        shutil.copyfile(localfile, remfile)
+
+    @classmethod
+    def chmod(cls, path, mode):
+        os.chmod(path, mode)
+
+    @classmethod
+    def copytree(cls, src, dst):
+        shutil.copytree(src, dst)
+
+    @classmethod
+    def remove(cls, path):
+        os.unlink(path)
+
+    @classmethod
+    def close(cls):
+        # nothing to release for the local node
+        return None
+
+    @classmethod
+    def open(cls, *args, **kwargs):
+        # builtin open() - a method named open doesn't shadow it here
+        return open(*args, **kwargs)
+
+
+
+def ssh_connect(creds, conn_timeout=60):
+ if creds == 'local':
+ return Local
+
+ tcp_timeout = 30
ssh = paramiko.SSHClient()
ssh.load_host_keys('/dev/null')
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
ssh.known_hosts = None
- for i in range(retry_count):
+ etime = time.time() + conn_timeout
+
+ while True:
try:
if creds.user is None:
user = getpass.getuser()
@@ -28,7 +72,7 @@
if creds.passwd is not None:
ssh.connect(creds.host,
- timeout=timeout, # tcp connect timeout
+ timeout=tcp_timeout,
username=user,
password=creds.passwd,
port=creds.port,
@@ -39,7 +83,7 @@
if creds.key_file is not None:
ssh.connect(creds.host,
username=user,
- timeout=timeout, # tcp connect timeout
+ timeout=tcp_timeout,
key_filename=creds.key_file,
look_for_keys=False,
port=creds.port)
@@ -48,28 +92,16 @@
key_file = os.path.expanduser('~/.ssh/id_rsa')
ssh.connect(creds.host,
username=user,
- timeout=timeout, # tcp connect timeout
+ timeout=tcp_timeout,
key_filename=key_file,
look_for_keys=False,
port=creds.port)
return ssh
- # raise ValueError("Wrong credentials {0}".format(creds.__dict__))
except paramiko.PasswordRequiredException:
raise
except socket.error:
- retry_left = retry_count - i - 1
-
- if retry_left > 0:
- if log_warns:
- msg = "Node {0.host}:{0.port} connection timeout."
-
- if 0 != retry_left:
- msg += " {0} retry left.".format(retry_left)
-
- logger.warning(msg.format(creds))
- else:
+ if time.time() > etime:
raise
-
time.sleep(1)
@@ -248,6 +280,9 @@
def connect(uri, **params):
+ if uri == 'local':
+ return Local
+
creds = parse_ssh_uri(uri)
creds.port = int(creds.port)
return ssh_connect(creds, **params)
@@ -260,6 +295,23 @@
def run_over_ssh(conn, cmd, stdin_data=None, timeout=60,
nolog=False, node=None):
"should be replaces by normal implementation, with select"
+
+ if conn is Local:
+ if not nolog:
+ logger.debug("SSH:local Exec {0!r}".format(cmd))
+ proc = subprocess.Popen(cmd, shell=True,
+ stdin=subprocess.PIPE,
+ stdout=subprocess.PIPE,
+ stderr=subprocess.STDOUT)
+
+ stdoutdata, _ = proc.communicate(input=stdin_data)
+
+ if proc.returncode != 0:
+ templ = "SSH:{0} Cmd {1!r} failed with code {2}. Output: {3}"
+ raise OSError(templ.format(node, cmd, proc.returncode, stdoutdata))
+
+ return stdoutdata
+
transport = conn.get_transport()
session = transport.open_session()
diff --git a/wally/start_vms.py b/wally/start_vms.py
index de1f312..e3b9245 100644
--- a/wally/start_vms.py
+++ b/wally/start_vms.py
@@ -80,7 +80,7 @@
'OS_AUTH_URL': auth_url
}
- params_s = " ".join("{}={}".format(k, v) for k, v in params.items())
+ params_s = " ".join("{0}={1}".format(k, v) for k, v in params.items())
spath = os.path.dirname(wally.__file__)
spath = os.path.dirname(spath)
@@ -389,9 +389,9 @@
# wait till vm actually deleted
- logger.warning("Volume deletion commented out")
- # for vol in volumes_to_delete:
- # logger.debug("Deleting volume " + vol.display_name)
- # cinder.volumes.delete(vol)
+ # logger.warning("Volume deletion commented out")
+ for vol in volumes_to_delete:
+ logger.debug("Deleting volume " + vol.display_name)
+ cinder.volumes.delete(vol)
logger.debug("Clearing done (yet some volumes may still deleting)")
diff --git a/wally/suits/io/agent.py b/wally/suits/io/agent.py
index d15d18e..b11f20e 100644
--- a/wally/suits/io/agent.py
+++ b/wally/suits/io/agent.py
@@ -1,7 +1,7 @@
import sys
import time
import json
-import random
+import copy
import select
import pprint
import argparse
@@ -15,14 +15,242 @@
SETTING = 1
+class FioJobSection(object):
+ def __init__(self, name):
+ self.name = name
+ self.vals = OrderedDict()
+ self.format_params = {}
+
+ def copy(self):
+ return copy.deepcopy(self)
+
+
+def to_bytes(sz):
+ sz = sz.lower()
+ try:
+ return int(sz)
+ except ValueError:
+ if sz[-1] == 'm':
+ return (1024 ** 2) * int(sz[:-1])
+ if sz[-1] == 'k':
+ return 1024 * int(sz[:-1])
+ if sz[-1] == 'g':
+ return (1024 ** 3) * int(sz[:-1])
+ raise
+
+
+def fio_config_lexer(fio_cfg):
+ for lineno, line in enumerate(fio_cfg.split("\n")):
+ try:
+ line = line.strip()
+
+ if line.startswith("#") or line.startswith(";"):
+ continue
+
+ if line == "":
+ continue
+
+ if line.startswith('['):
+ assert line.endswith(']'), "name should ends with ]"
+ yield lineno, SECTION, line[1:-1], None
+ elif '=' in line:
+ opt_name, opt_val = line.split('=', 1)
+ yield lineno, SETTING, opt_name.strip(), opt_val.strip()
+ else:
+ yield lineno, SETTING, line, None
+ except Exception as exc:
+ pref = "During parsing line number {0}\n".format(lineno)
+ raise ValueError(pref + exc.message)
+
+
+def fio_config_parse(lexer_iter, format_params):
+ orig_format_params_keys = set(format_params)
+ format_params = format_params.copy()
+ in_defaults = False
+ curr_section = None
+ defaults = OrderedDict()
+
+ for lineno, tp, name, val in lexer_iter:
+ if tp == SECTION:
+ if curr_section is not None:
+ yield curr_section
+
+ if name == 'defaults':
+ in_defaults = True
+ curr_section = None
+ else:
+ in_defaults = False
+ curr_section = FioJobSection(name)
+ curr_section.format_params = format_params.copy()
+ curr_section.vals = defaults.copy()
+ else:
+ assert tp == SETTING
+ if name == name.upper():
+ msg = "Param not in default section in line " + str(lineno)
+ assert in_defaults, msg
+ if name not in orig_format_params_keys:
+ # don't call parse_value on PARAMS here:
+ # they are parsed later, and parsing them
+ # now would break arrays
+ format_params[name] = val
+ elif in_defaults:
+ defaults[name] = parse_value(val)
+ else:
+ msg = "data outside section, line " + str(lineno)
+ assert curr_section is not None, msg
+ curr_section.vals[name] = parse_value(val)
+
+ if curr_section is not None:
+ yield curr_section
+
+
+def parse_value(val):
+ if val is None:
+ return None
+
+ try:
+ return int(val)
+ except ValueError:
+ pass
+
+ try:
+ return float(val)
+ except ValueError:
+ pass
+
+ if val.startswith('{%'):
+ assert val.endswith("%}")
+ content = val[2:-2]
+ vals = list(i.strip() for i in content.split(','))
+ return map(parse_value, vals)
+ return val
+
+
+def process_repeats(sec_iter):
+
+ for sec in sec_iter:
+ if '*' in sec.name:
+ msg = "Only one '*' allowed in section name"
+ assert sec.name.count('*') == 1, msg
+
+ name, count = sec.name.split("*")
+ sec.name = name.strip()
+ count = count.strip()
+
+ try:
+ count = int(count.strip().format(**sec.format_params))
+ except KeyError:
+ raise ValueError("No parameter {0} given".format(count[1:-1]))
+ except ValueError:
+ msg = "Parameter {0} nas non-int value {1!r}"
+ raise ValueError(msg.format(count[1:-1],
+ count.format(**sec.format_params)))
+
+ yield sec
+
+ if 'ramp_time' in sec.vals:
+ sec = sec.copy()
+ sec.vals['_ramp_time'] = sec.vals.pop('ramp_time')
+
+ for _ in range(count - 1):
+ yield sec.copy()
+ else:
+ yield sec
+
+
+def process_cycles(sec_iter):
+ # insert parametrized cycles
+ sec_iter = try_format_params_into_section(sec_iter)
+
+ for sec in sec_iter:
+
+ cycles_var_names = []
+ cycles_var_values = []
+
+ for name, val in sec.vals.items():
+ if isinstance(val, list):
+ cycles_var_names.append(name)
+ cycles_var_values.append(val)
+
+ if len(cycles_var_names) == 0:
+ yield sec
+ else:
+ for combination in itertools.product(*cycles_var_values):
+ new_sec = sec.copy()
+ new_sec.vals.update(zip(cycles_var_names, combination))
+ yield new_sec
+
+
+def try_format_params_into_section(sec_iter):
+ for sec in sec_iter:
+ params = sec.format_params
+ for name, val in sec.vals.items():
+ if isinstance(val, basestring):
+ try:
+ sec.vals[name] = parse_value(val.format(**params))
+ except:
+ pass
+
+ yield sec
+
+
+def format_params_into_section_finall(sec_iter, counter=[0]):
+ group_report_err_msg = "Group reporting should be set if numjobs != 1"
+
+ for sec in sec_iter:
+
+ num_jobs = int(sec.vals.get('numjobs', '1'))
+ if num_jobs != 1:
+ assert 'group_reporting' in sec.vals, group_report_err_msg
+
+ params = sec.format_params
+
+ fsize = to_bytes(sec.vals['size'])
+ params['PER_TH_OFFSET'] = fsize // num_jobs
+
+ for name, val in sec.vals.items():
+ if isinstance(val, basestring):
+ sec.vals[name] = parse_value(val.format(**params))
+ else:
+ assert isinstance(val, (int, float)) or val is None
+
+ params['UNIQ'] = 'UN{0}'.format(counter[0])
+ counter[0] += 1
+ params['TEST_SUMM'] = get_test_summary(sec.vals)
+ sec.name = sec.name.format(**params)
+
+ yield sec
+
+
+def fio_config_to_str(sec_iter):
+ res = ""
+
+ for pos, sec in enumerate(sec_iter):
+ if pos != 0:
+ res += "\n"
+
+ res += "[{0}]\n".format(sec.name)
+
+ for name, val in sec.vals.items():
+ if name.startswith('_'):
+ continue
+
+ if val is None:
+ res += name + "\n"
+ else:
+ res += "{0}={1}\n".format(name, val)
+
+ return res
+
+
def get_test_sync_mode(config):
try:
return config['sync_mode']
except KeyError:
pass
- is_sync = config.get("sync", "0") == "1"
- is_direct = config.get("direct", "0") == "1"
+ is_sync = str(config.get("sync", "0")) == "1"
+ is_direct = str(config.get("direct", "0")) == "1"
if is_sync and is_direct:
return 'x'
@@ -42,9 +270,9 @@
sync_mode = get_test_sync_mode(params)
th_count = params.get('numjobs')
+
if th_count is None:
- th_count = params.get('concurence', '1')
- th_count = int(th_count)
+ th_count = params.get('concurence', 1)
return "{0}{1}{2}th{3}".format(rw,
sync_mode,
@@ -52,343 +280,78 @@
th_count)
-counter = [0]
-
-
-def extract_iterables(vals):
- iterable_names = []
- iterable_values = []
- rest = {}
-
- for val_name, val in vals.items():
- if val is None or not val.startswith('{%'):
- rest[val_name] = val
- else:
- assert val.endswith("%}")
- content = val[2:-2]
- iterable_names.append(val_name)
- iterable_values.append(list(i.strip() for i in content.split(',')))
-
- return iterable_names, iterable_values, rest
-
-
-def format_params_into_section(sec, params, final=True):
- processed_vals = {}
-
- for val_name, val in sec.items():
- if val is None:
- processed_vals[val_name] = val
- else:
- try:
- processed_vals[val_name] = val.format(**params)
- except KeyError:
- if final:
- raise
- processed_vals[val_name] = val
-
- return processed_vals
-
-
-def process_section(name, vals, defaults, format_params):
- vals = vals.copy()
- params = format_params.copy()
-
- if '*' in name:
- name, repeat = name.split('*')
- name = name.strip()
- repeat = int(repeat.format(**params))
- else:
- repeat = 1
-
- # this code can be optimized
- iterable_names, iterable_values, processed_vals = extract_iterables(vals)
-
- group_report_err_msg = "Group reporting should be set if numjobs != 1"
-
- if iterable_values == []:
- processed_vals = format_params_into_section(processed_vals, params,
- final=False)
- params['UNIQ'] = 'UN{0}'.format(counter[0])
- counter[0] += 1
- params['TEST_SUMM'] = get_test_summary(processed_vals)
-
- num_jobs = int(processed_vals.get('numjobs', '1'))
- fsize = to_bytes(processed_vals['size'])
- params['PER_TH_OFFSET'] = fsize // num_jobs
-
- processed_vals = format_params_into_section(processed_vals, params,
- final=True)
-
- if num_jobs != 1:
- assert 'group_reporting' in processed_vals, group_report_err_msg
-
- ramp_time = processed_vals.get('ramp_time')
- for i in range(repeat):
- yield name.format(**params), processed_vals.copy()
-
- if 'ramp_time' in processed_vals:
- del processed_vals['ramp_time']
-
- if ramp_time is not None:
- processed_vals['ramp_time'] = ramp_time
- else:
- for it_vals in itertools.product(*iterable_values):
- processed_vals = format_params_into_section(processed_vals, params,
- final=False)
-
- processed_vals.update(dict(zip(iterable_names, it_vals)))
- params['UNIQ'] = 'UN{0}'.format(counter[0])
- counter[0] += 1
- params['TEST_SUMM'] = get_test_summary(processed_vals)
-
- num_jobs = int(processed_vals.get('numjobs', '1'))
- fsize = to_bytes(processed_vals['size'])
- params['PER_TH_OFFSET'] = fsize // num_jobs
-
- processed_vals = format_params_into_section(processed_vals, params,
- final=True)
-
- if processed_vals.get('numjobs', '1') != '1':
- assert 'group_reporting' in processed_vals,\
- group_report_err_msg
-
- ramp_time = processed_vals.get('ramp_time')
-
- for i in range(repeat):
- yield name.format(**params), processed_vals.copy()
- if 'ramp_time' in processed_vals:
- processed_vals['_ramp_time'] = ramp_time
- processed_vals.pop('ramp_time')
-
- if ramp_time is not None:
- processed_vals['ramp_time'] = ramp_time
- processed_vals.pop('_ramp_time')
-
-
-def calculate_execution_time(combinations):
+def calculate_execution_time(sec_iter):
time = 0
- for _, params in combinations:
- time += int(params.get('ramp_time', 0))
- time += int(params.get('_ramp_time', 0))
- time += int(params.get('runtime', 0))
+ for sec in sec_iter:
+ time += sec.vals.get('ramp_time', 0)
+ time += sec.vals.get('runtime', 0)
return time
-def parse_fio_config_full(fio_cfg, params=None):
- defaults = {}
- format_params = {}
+def slice_config(sec_iter, runcycle=None, max_jobs=1000):
+ jcount = 0
+ runtime = 0
+ curr_slice = []
- if params is None:
- ext_params = {}
- else:
- ext_params = params.copy()
+ for pos, sec in enumerate(sec_iter):
- curr_section = None
- curr_section_name = None
+ jc = sec.vals.get('numjobs', 1)
+ msg = "numjobs should be integer, not {0!r}".format(jc)
+ assert isinstance(jc, int), msg
- for tp, name, val in parse_fio_config_iter(fio_cfg):
- if tp == SECTION:
- non_def = curr_section_name != 'defaults'
- if curr_section_name is not None and non_def:
- format_params.update(ext_params)
- for sec in process_section(curr_section_name,
- curr_section,
- defaults,
- format_params):
- yield sec
+ curr_task_time = calculate_execution_time([sec])
- if name == 'defaults':
- curr_section = defaults
- else:
- curr_section = OrderedDict()
- curr_section.update(defaults)
- curr_section_name = name
+ if jc > max_jobs:
+ err_templ = "Can't process job {0!r} - too large numjobs"
+ raise ValueError(err_templ.format(sec.name))
+ if runcycle is not None and len(curr_slice) != 0:
+ rc_ok = curr_task_time + runtime <= runcycle
else:
- assert tp == SETTING
- assert curr_section_name is not None, "no section name"
- if name == name.upper():
- assert curr_section_name == 'defaults'
- format_params[name] = val
- else:
- curr_section[name] = val
+ rc_ok = True
- if curr_section_name is not None and curr_section_name != 'defaults':
- format_params.update(ext_params)
- for sec in process_section(curr_section_name,
- curr_section,
- defaults,
- format_params):
- yield sec
-
-
-def parse_fio_config_iter(fio_cfg):
- for lineno, line in enumerate(fio_cfg.split("\n")):
- try:
- line = line.strip()
-
- if line.startswith("#") or line.startswith(";"):
- continue
-
- if line == "":
- continue
-
- if line.startswith('['):
- assert line.endswith(']'), "name should ends with ]"
- yield SECTION, line[1:-1], None
- elif '=' in line:
- opt_name, opt_val = line.split('=', 1)
- yield SETTING, opt_name.strip(), opt_val.strip()
- else:
- yield SETTING, line, None
- except Exception as exc:
- pref = "During parsing line number {0}\n".format(lineno)
- raise ValueError(pref + exc.message)
-
-
-def format_fio_config(fio_cfg):
- res = ""
- for pos, (name, section) in enumerate(fio_cfg):
- if name.startswith('_'):
+ if jc + jcount <= max_jobs and rc_ok:
+ runtime += curr_task_time
+ jcount += jc
+ curr_slice.append(sec)
continue
- if pos != 0:
- res += "\n"
+ assert len(curr_slice) != 0
+ yield curr_slice
- res += "[{0}]\n".format(name)
- for opt_name, opt_val in section.items():
- if opt_val is None:
- res += opt_name + "\n"
- else:
- res += "{0}={1}\n".format(opt_name, opt_val)
- return res
+ if '_ramp_time' in sec.vals:
+ sec.vals['ramp_time'] = sec.vals.pop('_ramp_time')
+ curr_task_time = calculate_execution_time([sec])
+
+ runtime = curr_task_time
+ jcount = jc
+ curr_slice = [sec]
+
+ if curr_slice != []:
+ yield curr_slice
-count = 0
+def parse_all_in_1(source, test_params):
+ lexer_it = fio_config_lexer(source)
+ sec_it = fio_config_parse(lexer_it, test_params)
+ sec_it = process_cycles(sec_it)
+ sec_it = process_repeats(sec_it)
+ return format_params_into_section_finall(sec_it)
-def to_bytes(sz):
- sz = sz.lower()
- try:
- return int(sz)
- except ValueError:
- if sz[-1] == 'm':
- return (1024 ** 2) * int(sz[:-1])
- if sz[-1] == 'k':
- return 1024 * int(sz[:-1])
- if sz[-1] == 'g':
- return (1024 ** 3) * int(sz[:-1])
- raise
+def parse_and_slice_all_in_1(source, test_params, **slice_params):
+ sec_it = parse_all_in_1(source, test_params)
+ return slice_config(sec_it, **slice_params)
-def do_run_fio_fake(bconf):
- def estimate_iops(sz, bw, lat):
- return 1 / (lat + float(sz) / bw)
- global count
- count += 1
- parsed_out = []
-
- BW = 120.0 * (1024 ** 2)
- LAT = 0.003
-
- for name, cfg in bconf:
- sz = to_bytes(cfg['blocksize'])
- curr_lat = LAT * ((random.random() - 0.5) * 0.1 + 1)
- curr_ulat = curr_lat * 1000000
- curr_bw = BW * ((random.random() - 0.5) * 0.1 + 1)
- iops = estimate_iops(sz, curr_bw, curr_lat)
- bw = iops * sz
-
- res = {'ctx': 10683,
- 'error': 0,
- 'groupid': 0,
- 'jobname': name,
- 'majf': 0,
- 'minf': 30,
- 'read': {'bw': 0,
- 'bw_agg': 0.0,
- 'bw_dev': 0.0,
- 'bw_max': 0,
- 'bw_mean': 0.0,
- 'bw_min': 0,
- 'clat': {'max': 0,
- 'mean': 0.0,
- 'min': 0,
- 'stddev': 0.0},
- 'io_bytes': 0,
- 'iops': 0,
- 'lat': {'max': 0, 'mean': 0.0,
- 'min': 0, 'stddev': 0.0},
- 'runtime': 0,
- 'slat': {'max': 0, 'mean': 0.0,
- 'min': 0, 'stddev': 0.0}
- },
- 'sys_cpu': 0.64,
- 'trim': {'bw': 0,
- 'bw_agg': 0.0,
- 'bw_dev': 0.0,
- 'bw_max': 0,
- 'bw_mean': 0.0,
- 'bw_min': 0,
- 'clat': {'max': 0,
- 'mean': 0.0,
- 'min': 0,
- 'stddev': 0.0},
- 'io_bytes': 0,
- 'iops': 0,
- 'lat': {'max': 0, 'mean': 0.0,
- 'min': 0, 'stddev': 0.0},
- 'runtime': 0,
- 'slat': {'max': 0, 'mean': 0.0,
- 'min': 0, 'stddev': 0.0}
- },
- 'usr_cpu': 0.23,
- 'write': {'bw': 0,
- 'bw_agg': 0,
- 'bw_dev': 0,
- 'bw_max': 0,
- 'bw_mean': 0,
- 'bw_min': 0,
- 'clat': {'max': 0, 'mean': 0,
- 'min': 0, 'stddev': 0},
- 'io_bytes': 0,
- 'iops': 0,
- 'lat': {'max': 0, 'mean': 0,
- 'min': 0, 'stddev': 0},
- 'runtime': 0,
- 'slat': {'max': 0, 'mean': 0.0,
- 'min': 0, 'stddev': 0.0}
- }
- }
-
- if cfg['rw'] in ('read', 'randread'):
- key = 'read'
- elif cfg['rw'] in ('write', 'randwrite'):
- key = 'write'
- else:
- raise ValueError("Uknown op type {0}".format(key))
-
- res[key]['bw'] = bw
- res[key]['iops'] = iops
- res[key]['runtime'] = 30
- res[key]['io_bytes'] = res[key]['runtime'] * bw
- res[key]['bw_agg'] = bw
- res[key]['bw_dev'] = bw / 30
- res[key]['bw_max'] = bw * 1.5
- res[key]['bw_min'] = bw / 1.5
- res[key]['bw_mean'] = bw
- res[key]['clat'] = {'max': curr_ulat * 10, 'mean': curr_ulat,
- 'min': curr_ulat / 2, 'stddev': curr_ulat}
- res[key]['lat'] = res[key]['clat'].copy()
- res[key]['slat'] = res[key]['clat'].copy()
-
- parsed_out.append(res)
-
- return zip(parsed_out, bconf)
+def compile_all_in_1(source, test_params, **slice_params):
+ slices_it = parse_and_slice_all_in_1(source, test_params, **slice_params)
+ for slices in slices_it:
+ yield fio_config_to_str(slices)
-def do_run_fio(bconf):
- benchmark_config = format_fio_config(bconf)
+def do_run_fio(config_slice):
+ benchmark_config = fio_config_to_str(config_slice)
cmd = ["fio", "--output-format=json", "--alloc-size=262144", "-"]
p = subprocess.Popen(cmd,
stdin=subprocess.PIPE,
@@ -398,6 +361,9 @@
# set timeout
raw_out, raw_err = p.communicate(benchmark_config)
+ # HACK
+ raw_out = "{" + raw_out.split('{', 1)[1]
+
if 0 != p.returncode:
msg = "Fio failed with code: {0}\nOutput={1}"
raise OSError(msg.format(p.returncode, raw_err))
@@ -414,91 +380,37 @@
raw_out = raw_out[:100]
raise ValueError(msg.format(raw_out, exc.message))
- return zip(parsed_out, bconf)
-
-# limited by fio
-MAX_JOBS = 1000
+ return zip(parsed_out, config_slice)
-def next_test_portion(whole_conf, runcycle, cluster=False):
- if cluster:
- for name, sec in whole_conf:
- if '_ramp_time' in sec:
- sec['ramp_time'] = sec.pop('_ramp_time')
- yield [(name, sec)]
- return
-
- jcount = 0
- runtime = 0
- bconf = []
-
- for pos, (name, sec) in enumerate(whole_conf):
- jc = int(sec.get('numjobs', '1'))
-
- if runcycle is not None:
- curr_task_time = calculate_execution_time([(name, sec)])
- else:
- curr_task_time = 0
-
- if jc > MAX_JOBS:
- err_templ = "Can't process job {0!r} - too large numjobs"
- raise ValueError(err_templ.format(name))
-
- if runcycle is not None and len(bconf) != 0:
- rc_ok = curr_task_time + runtime <= runcycle
- else:
- rc_ok = True
-
- if jc + jcount <= MAX_JOBS and rc_ok:
- runtime += curr_task_time
- jcount += jc
- bconf.append((name, sec))
- if '_ramp_time' in sec:
- del sec['_ramp_time']
- continue
-
- assert len(bconf) != 0
- yield bconf
-
- if '_ramp_time' in sec:
- sec['ramp_time'] = sec.pop('_ramp_time')
- curr_task_time = calculate_execution_time([(name, sec)])
-
- runtime = curr_task_time
- jcount = jc
- bconf = [(name, sec)]
-
- if bconf != []:
- yield bconf
-
-
-def add_job_results(jname, job_output, jconfig, res):
+def add_job_results(section, job_output, res):
if job_output['write']['iops'] != 0:
raw_result = job_output['write']
else:
raw_result = job_output['read']
- if jname not in res:
+ vals = section.vals
+ if section.name not in res:
j_res = {}
- j_res["rw"] = jconfig["rw"]
- j_res["sync_mode"] = get_test_sync_mode(jconfig)
- j_res["concurence"] = int(jconfig.get("numjobs", 1))
- j_res["blocksize"] = jconfig["blocksize"]
+ j_res["rw"] = vals["rw"]
+ j_res["sync_mode"] = get_test_sync_mode(vals)
+ j_res["concurence"] = int(vals.get("numjobs", 1))
+ j_res["blocksize"] = vals["blocksize"]
j_res["jobname"] = job_output["jobname"]
- j_res["timings"] = [int(jconfig.get("runtime", 0)),
- int(jconfig.get("ramp_time", 0))]
+ j_res["timings"] = [int(vals.get("runtime", 0)),
+ int(vals.get("ramp_time", 0))]
else:
- j_res = res[jname]
- assert j_res["rw"] == jconfig["rw"]
- assert j_res["rw"] == jconfig["rw"]
- assert j_res["sync_mode"] == get_test_sync_mode(jconfig)
- assert j_res["concurence"] == int(jconfig.get("numjobs", 1))
- assert j_res["blocksize"] == jconfig["blocksize"]
+ j_res = res[section.name]
+ assert j_res["rw"] == vals["rw"]
+ assert j_res["rw"] == vals["rw"]
+ assert j_res["sync_mode"] == get_test_sync_mode(vals)
+ assert j_res["concurence"] == int(vals.get("numjobs", 1))
+ assert j_res["blocksize"] == vals["blocksize"]
assert j_res["jobname"] == job_output["jobname"]
# ramp part is skipped for all tests, except first
- # assert j_res["timings"] == (jconfig.get("runtime"),
- # jconfig.get("ramp_time"))
+ # assert j_res["timings"] == (vals.get("runtime"),
+ # vals.get("ramp_time"))
def j_app(name, x):
j_res.setdefault(name, []).append(x)
@@ -509,65 +421,46 @@
j_app("clat", raw_result["clat"]["mean"])
j_app("slat", raw_result["slat"]["mean"])
- res[jname] = j_res
+ res[section.name] = j_res
-def compile(benchmark_config, params, skip=0, runcycle=None, cluster=False):
- whole_conf = list(parse_fio_config_full(benchmark_config, params))
- whole_conf = whole_conf[skip:]
- res = ""
-
- for bconf in next_test_portion(whole_conf, runcycle, cluster=cluster):
- res += format_fio_config(bconf)
- res += "\n#" + "-" * 50 + "\n\n"
-
- return res
-
-
-def run_fio(benchmark_config,
- params,
- runcycle=None,
- raw_results_func=None,
- skip_tests=0,
- fake_fio=False,
- cluster=False):
-
- whole_conf = list(parse_fio_config_full(benchmark_config, params))
- whole_conf = whole_conf[skip_tests:]
- res = {}
- curr_test_num = skip_tests
- executed_tests = 0
+def run_fio(sliced_it, raw_results_func=None):
+ sliced_list = list(sliced_it)
ok = True
+
try:
- for bconf in next_test_portion(whole_conf, runcycle, cluster=cluster):
+ curr_test_num = 0
+ executed_tests = 0
+ result = {}
- if fake_fio:
- res_cfg_it = do_run_fio_fake(bconf)
- else:
- res_cfg_it = do_run_fio(bconf)
-
+ for i, test_slice in enumerate(sliced_list):
+ res_cfg_it = do_run_fio(test_slice)
res_cfg_it = enumerate(res_cfg_it, curr_test_num)
- for curr_test_num, (job_output, (jname, jconfig)) in res_cfg_it:
+ for curr_test_num, (job_output, section) in res_cfg_it:
executed_tests += 1
+
if raw_results_func is not None:
raw_results_func(executed_tests,
- [job_output, jname, jconfig])
+ [job_output, section])
- assert jname == job_output["jobname"], \
- "{0} != {1}".format(jname, job_output["jobname"])
+ msg = "{0} != {1}".format(section.name, job_output["jobname"])
+ assert section.name == job_output["jobname"], msg
- if jname.startswith('_'):
+ if section.name.startswith('_'):
continue
- add_job_results(jname, job_output, jconfig, res)
+ add_job_results(section, job_output, result)
+
curr_test_num += 1
msg_template = "Done {0} tests from {1}. ETA: {2}"
- exec_time = estimate_cfg(benchmark_config, params, curr_test_num)
- print msg_template.format(curr_test_num - skip_tests,
- len(whole_conf),
- sec_to_str(exec_time))
+ rest = sliced_list[i:]
+ time_eta = sum(map(calculate_execution_time, rest))
+ test_left = sum(map(len, rest))
+ print msg_template.format(curr_test_num,
+ test_left,
+ sec_to_str(time_eta))
except (SystemExit, KeyboardInterrupt):
raise
@@ -578,7 +471,7 @@
print "======== END OF ERROR ========="
ok = False
- return res, executed_tests, ok
+ return result, executed_tests, ok
def run_benchmark(binary_tp, *argv, **kwargs):
@@ -606,11 +499,6 @@
job_cfg += char
-def estimate_cfg(job_cfg, params, skip_tests=0):
- bconf = list(parse_fio_config_full(job_cfg, params))[skip_tests:]
- return calculate_execution_time(bconf)
-
-
def sec_to_str(seconds):
h = seconds // 3600
m = (seconds % 3600) // 60
@@ -641,12 +529,6 @@
help="Max cycle length in seconds")
parser.add_argument("--show-raw-results", action='store_true',
default=False, help="Output raw input and results")
- parser.add_argument("--skip-tests", type=int, default=0, metavar="NUM",
- help="Skip NUM tests")
- parser.add_argument("--faked-fio", action='store_true',
- default=False, help="Emulate fio with 0 test time")
- parser.add_argument("--cluster", action='store_true',
- default=False, help="Apply cluster-test settings")
parser.add_argument("--params", nargs="*", metavar="PARAM=VAL",
default=[],
help="Provide set of pairs PARAM=VAL to" +
@@ -674,22 +556,25 @@
name, val = param_val.split("=", 1)
params[name] = val
+ slice_params = {
+ 'runcycle': argv_obj.runcycle,
+ }
+
+ sliced_it = parse_and_slice_all_in_1(job_cfg, params, **slice_params)
+
if argv_obj.estimate:
- print sec_to_str(estimate_cfg(job_cfg, params))
+ it = map(calculate_execution_time, sliced_it)
+ print sec_to_str(sum(it))
return 0
if argv_obj.num_tests or argv_obj.compile:
if argv_obj.compile:
- data = compile(job_cfg, params, argv_obj.skip_tests,
- cluster=argv_obj.cluster)
- out_fd.write(data)
- out_fd.write("\n")
+ for test_slice in sliced_it:
+ out_fd.write(fio_config_to_str(test_slice))
+ out_fd.write("\n#" + "-" * 70 + "\n\n")
if argv_obj.num_tests:
- bconf = list(parse_fio_config_full(job_cfg, params,
- argv_obj.cluster))
- bconf = bconf[argv_obj.skip_tests:]
- print len(bconf)
+ print len(list(sliced_it))
return 0
@@ -708,14 +593,7 @@
rrfunc = raw_res_func if argv_obj.show_raw_results else None
stime = time.time()
- job_res, num_tests, ok = run_benchmark(argv_obj.type,
- job_cfg,
- params,
- argv_obj.runcycle,
- rrfunc,
- argv_obj.skip_tests,
- argv_obj.faked_fio,
- cluster=argv_obj.cluster)
+ job_res, num_tests, ok = run_benchmark(argv_obj.type, sliced_it, rrfunc)
etime = time.time()
res = {'__meta__': {'raw_cfg': job_cfg, 'params': params}, 'res': job_res}
diff --git a/wally/suits/io/ceph.cfg b/wally/suits/io/ceph.cfg
new file mode 100644
index 0000000..cc8e411
--- /dev/null
+++ b/wally/suits/io/ceph.cfg
@@ -0,0 +1,61 @@
+[defaults]
+wait_for_previous
+group_reporting
+time_based
+buffered=0
+iodepth=1
+softrandommap=1
+filename={FILENAME}
+NUM_ROUNDS=7
+NUMJOBS={% 1, 5, 10, 15, 20, 30, 40, 80, 120 %}
+
+size=5G
+ramp_time=30
+runtime=60
+
+# ---------------------------------------------------------------------
+# check different thread count, sync mode. (latency, iops) = func(th_count)
+# ---------------------------------------------------------------------
+[ceph_test_{TEST_SUMM} * {NUM_ROUNDS}]
+blocksize=4k
+rw=randwrite
+sync=1
+numjobs={NUMJOBS}
+
+# ---------------------------------------------------------------------
+# check direct-mode random write with a single thread (latency, iops)
+# ---------------------------------------------------------------------
+[ceph_test_{TEST_SUMM} * {NUM_ROUNDS}]
+blocksize=4k
+rw=randwrite
+direct=1
+numjobs=1
+
+# ---------------------------------------------------------------------
+# check different thread count, direct read mode. (latency, iops) = func(th_count)
+# also check iops for randread
+# ---------------------------------------------------------------------
+[ceph_test_{TEST_SUMM} * {NUM_ROUNDS}]
+blocksize=4k
+rw=randread
+direct=1
+numjobs={NUMJOBS}
+
+# ---------------------------------------------------------------------
+# this is essentially a sequential read operation
+# we can't use seq read with numjobs > 1 on clouds due to caching
+# ---------------------------------------------------------------------
+[ceph_test_{TEST_SUMM} * {NUM_ROUNDS}]
+blocksize=16m
+rw=randread
+direct=1
+numjobs={NUMJOBS}
+
+# ---------------------------------------------------------------------
+# sequential write
+# ---------------------------------------------------------------------
+[ceph_test_{TEST_SUMM} * {NUM_ROUNDS}]
+blocksize=1m
+rw=write
+direct=1
+numjobs={NUMJOBS}
diff --git a/wally/suits/io/io_scenario_check_distribution.cfg b/wally/suits/io/check_distribution.cfg
similarity index 100%
rename from wally/suits/io/io_scenario_check_distribution.cfg
rename to wally/suits/io/check_distribution.cfg
diff --git a/wally/suits/io/io_scenario_check_linearity.cfg b/wally/suits/io/check_linearity.cfg
similarity index 100%
rename from wally/suits/io/io_scenario_check_linearity.cfg
rename to wally/suits/io/check_linearity.cfg
diff --git a/wally/suits/io/io_scenario_check_th_count.cfg b/wally/suits/io/check_th_count.cfg
similarity index 100%
rename from wally/suits/io/io_scenario_check_th_count.cfg
rename to wally/suits/io/check_th_count.cfg
diff --git a/wally/suits/io/io_scenario_check_warmup.cfg b/wally/suits/io/check_warmup.cfg
similarity index 100%
rename from wally/suits/io/io_scenario_check_warmup.cfg
rename to wally/suits/io/check_warmup.cfg
diff --git a/wally/suits/io/io_scenario_hdd.cfg b/wally/suits/io/hdd.cfg
similarity index 80%
rename from wally/suits/io/io_scenario_hdd.cfg
rename to wally/suits/io/hdd.cfg
index 46191f2..519fb0f 100644
--- a/wally/suits/io/io_scenario_hdd.cfg
+++ b/wally/suits/io/hdd.cfg
@@ -7,8 +7,9 @@
softrandommap=1
filename={FILENAME}
NUM_ROUNDS=7
+NUMJOBS={% 1, 5, 10, 15, 20, 30, 40, 80, 120 %}
-size=10Gb
+size=10G
ramp_time=5
runtime=30
@@ -19,7 +20,7 @@
blocksize=4k
rw=randwrite
sync=1
-numjobs={% 1, 5, 10, 15, 20, 30, 40, 80, 120 %}
+numjobs={NUMJOBS}
# ---------------------------------------------------------------------
# check different thread count, direct read mode. (latency, iops) = func(th_count)
@@ -29,18 +30,16 @@
blocksize=4k
rw=randread
direct=1
-numjobs={% 1, 5, 10, 15, 20, 30, 40, 80, 120 %}
+numjobs={NUMJOBS}
# ---------------------------------------------------------------------
-# check different thread count, direct read/write mode. (bw, iops) = func(th_count)
-# also check BW for seq read/write.
+# No reason for thread count > 1 in case of sequential operations:
+# they become random
# ---------------------------------------------------------------------
[hdd_test_{TEST_SUMM} * {NUM_ROUNDS}]
blocksize=1m
rw={% read, write %}
direct=1
-offset_increment=1073741824 # 1G
-numjobs={% 1, 5, 10, 15, 20, 30, 40, 80, 120 %}
# ---------------------------------------------------------------------
# check IOPS randwrite.
diff --git a/wally/suits/io/io_scenario_ceph.cfg b/wally/suits/io/io_scenario_ceph.cfg
deleted file mode 100644
index 5e793f2..0000000
--- a/wally/suits/io/io_scenario_ceph.cfg
+++ /dev/null
@@ -1,62 +0,0 @@
-[defaults]
-wait_for_previous
-group_reporting
-time_based
-buffered=0
-iodepth=1
-softrandommap=1
-filename={FILENAME}
-NUM_ROUNDS=7
-
-size=5G
-ramp_time=20
-runtime=20
-
-# ---------------------------------------------------------------------
-# check different thread count, direct read/write mode. (bw, iops) = func(th_count)
-# also check BW for seq read/write.
-# ---------------------------------------------------------------------
-[ceph_test_{TEST_SUMM} * {NUM_ROUNDS}]
-blocksize=1m
-rw=read
-direct=1
-offset_increment={PER_TH_OFFSET}
-numjobs={% 20, 120 %}
-
-# # ---------------------------------------------------------------------
-# # check different thread count, sync mode. (latency, iops) = func(th_count)
-# # ---------------------------------------------------------------------
-# [hdd_test_{TEST_SUMM} * {NUM_ROUNDS}]
-# blocksize=4k
-# rw=randwrite
-# sync=1
-# numjobs={% 1, 5, 10, 15, 20, 30, 40, 80, 120 %}
-#
-# # ---------------------------------------------------------------------
-# # check different thread count, direct read mode. (latency, iops) = func(th_count)
-# # also check iops for randread
-# # ---------------------------------------------------------------------
-# [hdd_test_{TEST_SUMM} * {NUM_ROUNDS}]
-# blocksize=4k
-# rw=randread
-# direct=1
-# numjobs={% 1, 5, 10, 15, 20, 30, 40, 80, 120 %}
-#
-# # ---------------------------------------------------------------------
-# # check different thread count, direct read/write mode. (bw, iops) = func(th_count)
-# # also check BW for seq read/write.
-# # ---------------------------------------------------------------------
-# [hdd_test_{TEST_SUMM} * {NUM_ROUNDS}]
-# blocksize=1m
-# rw={% read, write %}
-# direct=1
-# offset_increment=1073741824 # 1G
-# numjobs={% 1, 5, 10, 15, 20, 30, 40, 80, 120 %}
-#
-# # ---------------------------------------------------------------------
-# # check IOPS randwrite.
-# # ---------------------------------------------------------------------
-# [hdd_test_{TEST_SUMM} * {NUM_ROUNDS}]
-# blocksize=4k
-# rw=randwrite
-# direct=1
diff --git a/wally/suits/io/io_scenario_long_test.cfg b/wally/suits/io/long_test.cfg
similarity index 100%
rename from wally/suits/io/io_scenario_long_test.cfg
rename to wally/suits/io/long_test.cfg
diff --git a/wally/suits/io/results_loader.py b/wally/suits/io/results_loader.py
index 25721eb..9005450 100644
--- a/wally/suits/io/results_loader.py
+++ b/wally/suits/io/results_loader.py
@@ -1,10 +1,32 @@
import re
import json
+import collections
from wally.utils import ssize_to_b
from wally.statistic import med_dev
+PerfInfo = collections.namedtuple('PerfInfo',
+ ('name',
+ 'bw', 'iops', 'dev',
+ 'lat', 'lat_dev', 'raw'))
+
+
+def process_disk_info(test_output):
+    """Aggregate 'io' test results into per-test PerfInfo records.
+
+    test_output is an iterable of (test_type, result) pairs. Only pairs
+    whose type is 'io' are processed; every other pair is skipped.
+    Each processed result must hold, under the 'res' key, a mapping from
+    test name to a dict with 'bw', 'iops' and 'lat' entries.
+
+    Returns a dict mapping test name to PerfInfo, where 'dev' is the
+    bandwidth deviation divided by the bandwidth value and 'raw' is the
+    unmodified per-test results dict.
+    """
+    data = {}
+
+    for tp, pre_result in test_output:
+        # only 'io' results are aggregated here
+        if tp != 'io':
+            continue
+
+        for name, results in pre_result['res'].items():
+            # med_dev returns a pair per series - presumably
+            # (median, deviation); confirm against wally.statistic
+            bw, bw_dev = med_dev(results['bw'])
+            iops, iops_dev = med_dev(results['iops'])
+            lat, lat_dev = med_dev(results['lat'])
+
+            # relative bandwidth deviation
+            dev = bw_dev / float(bw)
+            data[name] = PerfInfo(name, bw, iops, dev, lat, lat_dev, results)
+    return data
+
def parse_output(out_err):
start_patt = r"(?ims)=+\s+RESULTS\(format=json\)\s+=+"
@@ -37,6 +59,21 @@
return closure
+def filter_processed_data(name_prefix, fields_to_select, **filters):
+    """Build a selector over processed test results.
+
+    name_prefix: only results whose name starts with this prefix are
+        considered; None disables the name check.
+    fields_to_select: keys looked up in each matching result's raw dict.
+    filters: key=value pairs; a result matches only when its raw dict
+        holds these values for every key (a missing key reads as None).
+
+    Returns a function which takes a {name: result} dict and yields, for
+    every matching result, the list of the selected raw values.
+    """
+    def closure(data):
+        for name, result in data.items():
+            if name_prefix is not None and \
+                    not name.startswith(name_prefix):
+                continue
+
+            raw = result.raw
+            if not any(raw.get(k) != v for k, v in filters.items()):
+                # yield a real list: a bare map() is a lazy one-shot
+                # iterator on Python 3
+                yield [raw.get(field) for field in fields_to_select]
+    return closure
+
+
def load_data(raw_data):
data = list(parse_output(raw_data))[0]
diff --git a/wally/suits/io/io_scenario_check_vm_count_ec2.cfg b/wally/suits/io/vm_count_ec2.cfg
similarity index 100%
rename from wally/suits/io/io_scenario_check_vm_count_ec2.cfg
rename to wally/suits/io/vm_count_ec2.cfg
diff --git a/wally/suits/itest.py b/wally/suits/itest.py
index c5615bb..0f8afd4 100644
--- a/wally/suits/itest.py
+++ b/wally/suits/itest.py
@@ -2,6 +2,7 @@
import time
import os.path
import logging
+import datetime
from wally.ssh_utils import copy_paths, run_over_ssh, delete_file
from wally.utils import ssize_to_b, open_for_append_or_create, sec_to_str
@@ -101,16 +102,17 @@
self.config_params = test_options.get('params', {})
self.tool = test_options.get('tool', 'fio')
self.raw_cfg = open(self.config_fname).read()
- self.configs = list(io_agent.parse_fio_config_full(self.raw_cfg,
- self.config_params))
+ self.configs = list(io_agent.parse_all_in_1(self.raw_cfg,
+ self.config_params))
cmd_log = os.path.join(self.log_directory, "task_compiled.cfg")
raw_res = os.path.join(self.log_directory, "raw_results.txt")
fio_command_file = open_for_append_or_create(cmd_log)
- fio_command_file.write(io_agent.compile(self.raw_cfg,
- self.config_params,
- None))
+
+ cfg_s_it = io_agent.compile_all_in_1(self.raw_cfg, self.config_params)
+ splitter = "\n\n" + "-" * 60 + "\n\n"
+ fio_command_file.write(splitter.join(cfg_s_it))
self.fio_raw_results_file = open_for_append_or_create(raw_res)
def cleanup(self, conn):
@@ -140,13 +142,14 @@
if self.options.get('prefill_files', True):
files = {}
- for secname, params in self.configs:
- sz = ssize_to_b(params['size'])
+ for section in self.configs:
+ sz = ssize_to_b(section.vals['size'])
msz = sz / (1024 ** 2)
+
if sz % (1024 ** 2) != 0:
msz += 1
- fname = params['filename']
+ fname = section.vals['filename']
# if already has other test with the same file name
# take largest size
@@ -173,9 +176,7 @@
logger.warning("Test files prefill disabled")
def run(self, conn, barrier):
- # logger.warning("No tests runned")
- # return
- cmd_templ = "sudo env python2 {0} {3} --type {1} {2} --json -"
+ cmd_templ = "sudo env python2 {0} --type {1} {2} --json -"
# cmd_templ = "env python2 {0} --type {1} {2} --json -"
params = " ".join("{0}={1}".format(k, v)
@@ -184,25 +185,25 @@
if "" != params:
params = "--params " + params
- if self.options.get('cluster', False):
- logger.info("Cluster mode is used")
- cluster_opt = "--cluster"
- else:
- logger.info("Non-cluster mode is used")
- cluster_opt = ""
-
- cmd = cmd_templ.format(self.io_py_remote, self.tool, params,
- cluster_opt)
+ cmd = cmd_templ.format(self.io_py_remote, self.tool, params)
logger.debug("Waiting on barrier")
- exec_time = io_agent.estimate_cfg(self.raw_cfg, self.config_params)
+ exec_time = io_agent.calculate_execution_time(self.configs)
exec_time_str = sec_to_str(exec_time)
try:
timeout = int(exec_time * 1.2 + 300)
if barrier.wait():
- templ = "Test should takes about {0}. Will wait at most {1}"
- logger.info(templ.format(exec_time_str, sec_to_str(timeout)))
+ templ = "Test should takes about {0}." + \
+ " Should finish at {1}," + \
+ " will wait at most till {2}"
+ now_dt = datetime.datetime.now()
+ end_dt = now_dt + datetime.timedelta(0, exec_time)
+ wait_till = now_dt + datetime.timedelta(0, timeout)
+
+ logger.info(templ.format(exec_time_str,
+ end_dt.strftime("%H:%M:%S"),
+ wait_till.strftime("%H:%M:%S")))
out_err = run_over_ssh(conn, cmd,
stdin_data=self.raw_cfg,