Rework test runner: fio job-file IO tests, sensor deployment, explicit test nodes; drop io_scenario
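
The io test is now driven by an fio job file (io_task.cfg) instead of
command-line options: io_scenario/io.py and the bundled fio/iozone
binaries are removed in favour of a thin fio wrapper in tests/io.py
that reads the job from stdin and prints a JSON result, so the overall
line count drops.

run_test.py gains discover/connect/deploy_sensors/run_tests stages,
runs one test thread per test node synchronised by utils.Barrier and
forwards sensor data for the roles listed in config.yaml to
receiver_uri. config.yaml switches ceph discovery to "local" and can
pin extra test nodes via explicit_nodes. run_over_ssh now combines
stdout and stderr, accepts stdin_data and enforces an execution
timeout; ssize_to_kb is replaced by ssize_to_b.

Example invocation (a sketch, assuming the sample config.yaml and
io_task.cfg added in this commit):

    python run_test.py -l discover connect deploy_sensors run_tests
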
diff --git a/config.yaml b/config.yaml
index 3c27c90..b365042 100644
--- a/config.yaml
+++ b/config.yaml
@@ -12,7 +12,8 @@
# service: all
# auth: cirros:cubswin:)
- ceph: root@172.16.53.2::/home/koder/.ssh/id_rsa.pub
+ # ceph: root@172.16.53.2::/home/koder/.ssh/id_rsa.pub
+ ceph: local
# fuel:
# connection:
@@ -20,31 +21,39 @@
# creds: admin:admin@admin
# discover: controller
-start:
- count: x1
- img_name: TestVM
- flavor_name: m1.micro
- keypair_name: test
- network_zone_name: net04
- flt_ip_pool: net04_ext
- key_file: path/to/
- user: username
+explicit_nodes:
+ "ssh://192.168.152.43": testnode
+
+# start:
+# count: x1
+# img_name: TestVM
+# flavor_name: m1.micro
+# keypair_name: test
+# network_zone_name: net04
+# flt_ip_pool: net04_ext
+# key_file: path/to/
+# user: username
# sensors to be installed, accordingli to role
sensors:
- ceph-osd: ceph-io, ceph-cpu, ceph-ram, ceph-net
- ceph-mon: ceph-io, ceph-cpu, ceph-ram, ceph-net
- os-compute: io, net
- test-vm: system-cpu
+ receiver_uri: udp://192.168.0.108:5699
+ roles_mapping:
+ ceph-osd: block-io #ceph-io, ceph-cpu, ceph-ram, ceph-net
+ # ceph-mon: ceph-io, ceph-cpu, ceph-ram, ceph-net
+ os-compute: io, net
+ test-vm: system-cpu
# tests to run
tests:
- pgbench:
- opts:
- num_clients: [4, 8, 12]
- transactions: [1, 2, 3]
+ # pgbench:
+ # opts:
+ # num_clients: [4, 8, 12]
+ # transactions: [1, 2, 3]
+ io:
+ tool: fio
+ config_file: io_task.cfg
# where to store results
results:
diff --git a/fake_run_test.py b/fake_run_test.py
index 7797cfc..8dc772c 100644
--- a/fake_run_test.py
+++ b/fake_run_test.py
@@ -91,7 +91,6 @@
stdout=None,
stderr=None,
stdin=None):
- print "Running subprocess command: %s" % cmd
self.stdin, self.stdout, self.stderr = get_fake_out(cmd)
def wait(self):
diff --git a/io_scenario/fio b/io_scenario/fio
deleted file mode 100755
index 4686a1f..0000000
--- a/io_scenario/fio
+++ /dev/null
Binary files differ
diff --git a/io_scenario/io.py b/io_scenario/io.py
deleted file mode 100644
index d698546..0000000
--- a/io_scenario/io.py
+++ /dev/null
@@ -1,617 +0,0 @@
-import re
-import os
-import sys
-import stat
-import time
-import json
-import Queue
-import os.path
-import argparse
-import warnings
-import subprocess
-
-
-class BenchmarkOption(object):
- def __init__(self, concurence, iodepth, action, blocksize, size):
- self.iodepth = iodepth
- self.action = action
- self.blocksize = blocksize
- self.concurence = concurence
- self.size = size
- self.direct_io = False
- self.use_hight_io_priority = True
- self.sync = False
-
-
-def which(program):
- def is_exe(fpath):
- return os.path.isfile(fpath) and os.access(fpath, os.X_OK)
-
- fpath, fname = os.path.split(program)
- if fpath:
- if is_exe(program):
- return program
- else:
- for path in os.environ["PATH"].split(os.pathsep):
- path = path.strip('"')
- exe_file = os.path.join(path, program)
- if is_exe(exe_file):
- return exe_file
-
- return None
-
-
-# ------------------------------ IOZONE SUPPORT ------------------------------
-
-
-class IOZoneParser(object):
- "class to parse iozone results"
-
- start_tests = re.compile(r"^\s+KB\s+reclen\s+")
- resuts = re.compile(r"[\s0-9]+")
- mt_iozone_re = re.compile(r"\s+Children see throughput " +
- r"for\s+\d+\s+(?P<cmd>.*?)\s+=\s+" +
- r"(?P<perf>[\d.]+)\s+KB/sec")
-
- cmap = {'initial writers': 'write',
- 'rewriters': 'rewrite',
- 'initial readers': 'read',
- 're-readers': 'reread',
- 'random readers': 'random read',
- 'random writers': 'random write',
- 'readers': 'read'}
-
- string1 = " " + \
- " random random " + \
- "bkwd record stride "
-
- string2 = "KB reclen write rewrite " + \
- "read reread read write " + \
- "read rewrite read fwrite frewrite fread freread"
-
- @classmethod
- def apply_parts(cls, parts, string, sep=' \t\n'):
- add_offset = 0
- for part in parts:
- _, start, stop = part
- start += add_offset
- add_offset = 0
-
- # condition splited to make pylint happy
- while stop + add_offset < len(string):
-
- # condition splited to make pylint happy
- if not (string[stop + add_offset] not in sep):
- break
-
- add_offset += 1
-
- yield part, string[start:stop + add_offset]
-
- @classmethod
- def make_positions(cls):
- items = [i for i in cls.string2.split() if i]
-
- pos = 0
- cls.positions = []
-
- for item in items:
- npos = cls.string2.index(item, 0 if pos == 0 else pos + 1)
- cls.positions.append([item, pos, npos + len(item)])
- pos = npos + len(item)
-
- for itm, val in cls.apply_parts(cls.positions, cls.string1):
- if val.strip():
- itm[0] = val.strip() + " " + itm[0]
-
- @classmethod
- def parse_iozone_res(cls, res, mthreads=False):
- parsed_res = None
-
- sres = res.split('\n')
-
- if not mthreads:
- for pos, line in enumerate(sres[1:]):
- if line.strip() == cls.string2 and \
- sres[pos].strip() == cls.string1.strip():
- add_pos = line.index(cls.string2)
- parsed_res = {}
-
- npos = [(name, start + add_pos, stop + add_pos)
- for name, start, stop in cls.positions]
-
- for itm, res in cls.apply_parts(npos, sres[pos + 2]):
- if res.strip() != '':
- parsed_res[itm[0]] = int(res.strip())
-
- del parsed_res['KB']
- del parsed_res['reclen']
- else:
- parsed_res = {}
- for line in sres:
- rr = cls.mt_iozone_re.match(line)
- if rr is not None:
- cmd = rr.group('cmd')
- key = cls.cmap.get(cmd, cmd)
- perf = int(float(rr.group('perf')))
- parsed_res[key] = perf
- return parsed_res
-
-
-IOZoneParser.make_positions()
-
-
-def iozone_do_prepare(params, filename, pattern_base):
- all_files = []
- threads = int(params.concurence)
- if 1 != threads:
- filename = filename + "_{0}"
- all_files.extend(filename .format(i) for i in range(threads))
- else:
- all_files.append(filename)
-
- bsz = 1024 if params.size > 1024 else params.size
- if params.size % bsz != 0:
- fsz = (params.size // bsz + 1) * bsz
- else:
- fsz = params.size
-
- for fname in all_files:
- with open(fname, "wb") as fd:
- if fsz > 1024:
- pattern = pattern_base * 1024 * 1024
- for _ in range(int(fsz / 1024) + 1):
- fd.write(pattern)
- else:
- fd.write(pattern_base * 1024 * fsz)
- fd.flush()
- return all_files
-
-
-VERIFY_PATTERN = "\x6d"
-
-
-def prepare_iozone(params, filename):
- return iozone_do_prepare(params, filename, VERIFY_PATTERN)
-
-
-def do_run_iozone(params, timeout, all_files,
- iozone_path='iozone',
- microsecond_mode=False,
- prepare_only=False):
-
- cmd = [iozone_path, "-V", str(ord(VERIFY_PATTERN))]
-
- if params.sync:
- cmd.append('-o')
-
- if params.direct_io:
- cmd.append('-I')
-
- if microsecond_mode:
- cmd.append('-N')
-
- threads = int(params.concurence)
- if 1 != threads:
- cmd.extend(('-t', str(threads), '-F'))
- cmd.extend(all_files)
- else:
- cmd.extend(('-f', all_files[0]))
-
- cmd.append('-i')
-
- if params.action == 'write':
- cmd.append("0")
- elif params.action == 'read':
- cmd.append("1")
- elif params.action == 'randwrite' or params.action == 'randread':
- cmd.append("2")
- else:
- raise ValueError("Unknown action {0!r}".format(params.action))
-
- cmd.extend(('-s', str(params.size)))
- cmd.extend(('-r', str(params.blocksize)))
-
- # no retest
- cmd.append('-+n')
-
- raw_res = subprocess.check_output(cmd)
-
- try:
- parsed_res = IOZoneParser.parse_iozone_res(raw_res, threads > 1)
-
- res = {}
-
- if params.action == 'write':
- res['bw_mean'] = parsed_res['write']
- elif params.action == 'randwrite':
- res['bw_mean'] = parsed_res['random write']
- elif params.action == 'read':
- res['bw_mean'] = parsed_res['read']
- elif params.action == 'randread':
- res['bw_mean'] = parsed_res['random read']
- except:
- raise
-
- return res, " ".join(cmd)
-
-
-def run_iozone(benchmark, binary_path, all_files,
- timeout=None):
-
- if timeout is not None:
- benchmark.size = benchmark.blocksize * 50
- res_time = do_run_iozone(benchmark, timeout, all_files,
- iozone_path=binary_path,
- microsecond_mode=True)[0]
-
- size = (benchmark.blocksize * timeout * 1000000)
- size /= res_time["bw_mean"]
- size = (size // benchmark.blocksize + 1) * benchmark.blocksize
- benchmark.size = size
-
- return do_run_iozone(benchmark, timeout, all_files,
- iozone_path=binary_path)
-
-
-def install_iozone_package():
- if which('iozone'):
- return
-
- is_redhat = os.path.exists('/etc/centos-release')
- is_redhat = is_redhat or os.path.exists('/etc/fedora-release')
- is_redhat = is_redhat or os.path.exists('/etc/redhat-release')
-
- if is_redhat:
- subprocess.check_output(["yum", "install", 'iozone3'])
- return
-
- try:
- os_release_cont = open('/etc/os-release').read()
-
- is_ubuntu = "Ubuntu" in os_release_cont
-
- if is_ubuntu or "Debian GNU/Linux" in os_release_cont:
- subprocess.check_output(["apt-get", "install", "iozone3"])
- return
- except (IOError, OSError):
- pass
-
- raise RuntimeError("Unknown host OS.")
-
-
-def install_iozone_static(iozone_url, dst):
- if not os.path.isfile(dst):
- import urllib
- urllib.urlretrieve(iozone_url, dst)
-
- st = os.stat(dst)
- os.chmod(dst, st.st_mode | stat.S_IEXEC)
-
-
-def locate_iozone():
- binary_path = which('iozone')
-
- if binary_path is None:
- binary_path = which('iozone3')
-
- if binary_path is None:
- sys.stderr.write("Can't found neither iozone not iozone3 binary"
- "Provide --binary-path or --binary-url option")
- return None
-
- return binary_path
-
-# ------------------------------ FIO SUPPORT ---------------------------------
-
-
-def run_fio_once(benchmark, fio_path, tmpname, timeout=None):
- if benchmark.size is None:
- raise ValueError("You need to specify file size for fio")
-
- cmd_line = [fio_path,
- "--name=%s" % benchmark.action,
- "--rw=%s" % benchmark.action,
- "--blocksize=%sk" % benchmark.blocksize,
- "--iodepth=%d" % benchmark.iodepth,
- "--filename=%s" % tmpname,
- "--size={0}k".format(benchmark.size),
- "--numjobs={0}".format(benchmark.concurence),
- "--output-format=json",
- "--sync=" + ('1' if benchmark.sync else '0')]
-
- if timeout is not None:
- cmd_line.append("--timeout=%d" % timeout)
- cmd_line.append("--runtime=%d" % timeout)
-
- if benchmark.direct_io:
- cmd_line.append("--direct=1")
-
- if benchmark.use_hight_io_priority:
- cmd_line.append("--prio=0")
-
- raw_out = subprocess.check_output(cmd_line)
- return json.loads(raw_out)["jobs"][0], " ".join(cmd_line)
-
-
-def run_fio(benchmark, binary_path, all_files, timeout=None):
- job_output, cmd_line = run_fio_once(benchmark, binary_path,
- all_files[0], timeout)
-
- if benchmark.action in ('write', 'randwrite'):
- raw_result = job_output['write']
- else:
- raw_result = job_output['read']
-
- res = {}
-
- # 'bw_dev bw_mean bw_max bw_min'.split()
- for field in ["bw_mean", "iops"]:
- res[field] = raw_result[field]
- res["lat"] = raw_result["lat"]["mean"]
- res["clat"] = raw_result["clat"]["mean"]
- res["slat"] = raw_result["slat"]["mean"]
-
- return res, cmd_line
-
-
-def locate_fio():
- return which('fio')
-
-
-# ----------------------------------------------------------------------------
-
-
-def locate_binary(binary_tp, binary_path):
- if binary_path is not None:
- if os.path.isfile(binary_path):
- if not os.access(binary_path, os.X_OK):
- st = os.stat(binary_path)
- os.chmod(binary_path, st.st_mode | stat.S_IEXEC)
- else:
- binary_path = None
-
- if binary_path is not None:
- return binary_path
-
- if 'iozone' == binary_tp:
- return locate_iozone()
- else:
- return locate_fio()
-
-
-def run_benchmark(binary_tp, *argv, **kwargs):
- if 'iozone' == binary_tp:
- return run_iozone(*argv, **kwargs)
- else:
- return run_fio(*argv, **kwargs)
-
-
-def prepare_benchmark(binary_tp, *argv, **kwargs):
- if 'iozone' == binary_tp:
- return {'all_files': prepare_iozone(*argv, **kwargs)}
- else:
- return {}
-
-
-def type_size(string):
- try:
- return re.match("\d+[KGBM]?", string, re.I).group(0)
- except:
- msg = "{0!r} don't looks like size-description string".format(string)
- raise ValueError(msg)
-
-
-def type_size_ext(string):
- if string.startswith("x"):
- int(string[1:])
- return string
-
- if string.startswith("r"):
- int(string[1:])
- return string
-
- try:
- return re.match("\d+[KGBM]?", string, re.I).group(0)
- except:
- msg = "{0!r} don't looks like size-description string".format(string)
- raise ValueError(msg)
-
-
-def ssize_to_kb(ssize):
- try:
- smap = dict(k=1, K=1, M=1024, m=1024, G=1024**2, g=1024**2)
- for ext, coef in smap.items():
- if ssize.endswith(ext):
- return int(ssize[:-1]) * coef
-
- if int(ssize) % 1024 != 0:
- raise ValueError()
-
- return int(ssize) / 1024
-
- except (ValueError, TypeError, AttributeError):
- tmpl = "Unknow size format {0!r} (or size not multiples 1024)"
- raise ValueError(tmpl.format(ssize))
-
-
-def get_ram_size():
- try:
- with open("/proc/meminfo") as fd:
- for ln in fd:
- if "MemTotal:" in ln:
- sz, kb = ln.split(':')[1].strip().split(" ")
- assert kb == 'kB'
- return int(sz)
- except (ValueError, TypeError, AssertionError):
- raise
- # return None
-
-
-def parse_args(argv):
- parser = argparse.ArgumentParser(
- description="Run 'iozone' or 'fio' and return result")
- parser.add_argument(
- "--type", metavar="BINARY_TYPE",
- choices=['iozone', 'fio'], required=True)
- parser.add_argument(
- "--iodepth", metavar="IODEPTH", type=int,
- help="I/O requests queue depths", required=True)
- parser.add_argument(
- '-a', "--action", metavar="ACTION", type=str,
- help="actions to run", required=True,
- choices=["read", "write", "randread", "randwrite"])
- parser.add_argument(
- "--blocksize", metavar="BLOCKSIZE", type=type_size,
- help="single operation block size", required=True)
- parser.add_argument(
- "--timeout", metavar="TIMEOUT", type=int,
- help="runtime of a single run", default=None)
- parser.add_argument(
- "--iosize", metavar="SIZE", type=type_size_ext,
- help="file size", default=None)
- parser.add_argument(
- "-s", "--sync", default=False, action="store_true",
- help="exec sync after each write")
- parser.add_argument(
- "-d", "--direct-io", default=False, action="store_true",
- help="use O_DIRECT", dest='directio')
- parser.add_argument(
- "-t", "--sync-time", default=None, type=int, metavar="UTC_TIME",
- help="sleep till sime utc time", dest='sync_time')
- parser.add_argument(
- "--test-file", help="file path to run test on",
- default=None, dest='test_file')
- parser.add_argument(
- "--binary-path", help="path to binary, used for testing",
- default=None, dest='binary_path')
- parser.add_argument(
- "--prepare-only", default=False, action="store_true")
- parser.add_argument("--concurrency", default=1, type=int)
-
- parser.add_argument("--preparation-results", default="{}")
-
- return parser.parse_args(argv)
-
-
-def sensor_thread(sensor_list, cmd_q, data_q):
- while True:
- try:
- cmd_q.get(timeout=0.5)
- data_q.put([])
- return
- except Queue.Empty:
- pass
-
-
-def clean_prep(preparation_results):
- if 'all_files' in preparation_results:
- for fname in preparation_results['all_files']:
- if os.path.isfile(fname):
- os.unlink(fname)
-
-
-def main(argv):
- if argv[0] == '--clean':
- clean_prep(json.loads(argv[1]))
- return 0
-
- argv_obj = parse_args(argv)
- argv_obj.blocksize = ssize_to_kb(argv_obj.blocksize)
-
- if argv_obj.iosize is not None:
- if argv_obj.iosize.startswith('x'):
- argv_obj.iosize = argv_obj.blocksize * int(argv_obj.iosize[1:])
- elif argv_obj.iosize.startswith('r'):
- rs = get_ram_size()
- if rs is None:
- sys.stderr.write("Can't determine ram size\n")
- exit(1)
- argv_obj.iosize = rs * int(argv_obj.iosize[1:])
- else:
- argv_obj.iosize = ssize_to_kb(argv_obj.iosize)
-
- benchmark = BenchmarkOption(argv_obj.concurrency,
- argv_obj.iodepth,
- argv_obj.action,
- argv_obj.blocksize,
- argv_obj.iosize)
-
- benchmark.direct_io = argv_obj.directio
-
- if argv_obj.sync:
- benchmark.sync = True
-
- if argv_obj.prepare_only:
- test_file_name = argv_obj.test_file
- if test_file_name is None:
- with warnings.catch_warnings():
- warnings.simplefilter("ignore")
- test_file_name = os.tmpnam()
-
- fnames = prepare_benchmark(argv_obj.type,
- benchmark,
- test_file_name)
- print json.dumps(fnames)
- else:
- preparation_results = json.loads(argv_obj.preparation_results)
-
- if not isinstance(preparation_results, dict):
- raise ValueError("preparation_results should be a dict" +
- " with all-string keys")
-
- for key in preparation_results:
- if not isinstance(key, basestring):
- raise ValueError("preparation_results should be a dict" +
- " with all-string keys")
-
- if argv_obj.test_file and 'all_files' in preparation_results:
- raise ValueError("Either --test-file or --preparation-results" +
- " options should be provided, not both")
-
- if argv_obj.test_file is not None:
- preparation_results['all_files'] = [argv_obj.test_file]
-
- autoremove = False
- if 'all_files' not in preparation_results:
- with warnings.catch_warnings():
- warnings.simplefilter("ignore")
- preparation_results['all_files'] = [os.tmpnam()]
- autoremove = True
-
- binary_path = locate_binary(argv_obj.type,
- argv_obj.binary_path)
-
- if binary_path is None:
- sys.stderr.write("Can't locate binary {0}\n".format(argv_obj.type))
- return 1
-
- try:
- if argv_obj.sync_time is not None:
- dt = argv_obj.sync_time - time.time()
- if dt > 0:
- time.sleep(dt)
-
- res, cmd = run_benchmark(argv_obj.type,
- benchmark=benchmark,
- binary_path=binary_path,
- timeout=argv_obj.timeout,
- **preparation_results)
-
- res['__meta__'] = benchmark.__dict__.copy()
- res['__meta__']['cmdline'] = cmd
-
- import pprint
- sys.stdout.write(pprint.pformat(res))
-
- # sys.stdout.write(json.dumps(res))
- if not argv_obj.prepare_only:
- sys.stdout.write("\n")
-
- finally:
- if autoremove:
- clean_prep(preparation_results)
-
-
-if __name__ == '__main__':
- exit(main(sys.argv[1:]))
diff --git a/io_scenario/iozone b/io_scenario/iozone
deleted file mode 100755
index 5edce95..0000000
--- a/io_scenario/iozone
+++ /dev/null
Binary files differ
diff --git a/io_task.cfg b/io_task.cfg
new file mode 100644
index 0000000..cc3d07d
--- /dev/null
+++ b/io_task.cfg
@@ -0,0 +1,10 @@
+[writetest]
+blocksize=4k
+filename=/tmp/xxx.bin
+rw=randwrite
+direct=1
+buffered=0
+ioengine=libaio
+iodepth=1
+size=1Gb
+runtime=5
diff --git a/itest.py b/itest.py
index 2e782ee..daebd1a 100644
--- a/itest.py
+++ b/itest.py
@@ -1,12 +1,13 @@
import abc
import json
-import types
import os.path
import logging
+from StringIO import StringIO
+from ConfigParser import RawConfigParser
-from io_scenario import io
+from tests import io
from ssh_utils import copy_paths
-from utils import run_over_ssh
+from utils import run_over_ssh, ssize_to_b
logger = logging.getLogger("io-perf-tool")
@@ -14,19 +15,13 @@
class IPerfTest(object):
def __init__(self, on_result_cb):
- self.set_result_cb(on_result_cb)
-
- def set_result_cb(self, on_result_cb):
self.on_result_cb = on_result_cb
- def build(self, conn):
- self.pre_run(conn)
-
def pre_run(self, conn):
pass
@abc.abstractmethod
- def run(self, conn):
+ def run(self, conn, barrier):
pass
@@ -57,15 +52,15 @@
def pre_run(self, conn):
remote_script = self.copy_script(conn, self.pre_run_script)
cmd = remote_script
- code, out, err = run_over_ssh(conn, cmd)
+ code, out_err = run_over_ssh(conn, cmd)
if code != 0:
- raise Exception("Pre run failed. %s" % err)
+ raise Exception("Pre run failed. %s" % out_err)
- def run(self, conn):
+ def run(self, conn, barrier):
remote_script = self.copy_script(conn, self.run_script)
cmd = remote_script + ' ' + ' '.join(self.opts)
- code, out, err = run_over_ssh(conn, cmd)
- self.on_result(code, out, err, cmd)
+ code, out_err = run_over_ssh(conn, cmd)
+ self.on_result(code, out_err, cmd)
def parse_results(self, out):
for line in out.split("\n"):
@@ -73,16 +68,16 @@
if key and value:
self.on_result_cb((key, float(value)))
- def on_result(self, code, out, err, cmd):
+ def on_result(self, code, out_err, cmd):
if 0 == code:
try:
- self.parse_results(out)
- except Exception as err:
+ self.parse_results(out_err)
+ except Exception as exc:
msg_templ = "Error during postprocessing results: {0!r}"
- raise RuntimeError(msg_templ.format(err.message))
+ raise RuntimeError(msg_templ.format(exc.message))
else:
templ = "Command {0!r} failed with code {1}. Error output is:\n{2}"
- logger.error(templ.format(cmd, code, err))
+ logger.error(templ.format(cmd, code, out_err))
class PgBenchTest(TwoScriptTest):
@@ -94,68 +89,89 @@
self.run_script = "hl_tests/postgres/run.sh"
-def run_test_iter(obj, conn):
- logger.debug("Run preparation")
- yield obj.pre_run(conn)
- logger.debug("Run test")
- res = obj.run(conn)
- if isinstance(res, types.GeneratorType):
- for vl in res:
- yield vl
- else:
- yield res
-
-
class IOPerfTest(IPerfTest):
+ io_py_remote = "/tmp/io.py"
+
def __init__(self,
- script_opts,
- testtool_local,
- on_result_cb,
- keep_tmp_files):
-
+ test_options,
+ on_result_cb):
IPerfTest.__init__(self, on_result_cb)
+ self.options = test_options
+ self.config_fname = test_options['config_file']
+ self.tool = test_options['tool']
+ self.configs = []
- dst_testtool_path = '/tmp/io_tool'
- self.script_opts = script_opts + ["--binary-path", dst_testtool_path]
- io_py_local = os.path.join(os.path.dirname(io.__file__), "io.py")
- self.io_py_remote = "/tmp/io.py"
+ cp = RawConfigParser()
+ cp.readfp(open(self.config_fname))
- self.files_to_copy = {testtool_local: dst_testtool_path,
- io_py_local: self.io_py_remote}
+ for secname in cp.sections():
+ params = dict(cp.items(secname))
+ self.configs.append((secname, params))
def pre_run(self, conn):
+ local_fname = io.__file__.rsplit('.', 1)[0] + ".py"
+ self.files_to_copy = {local_fname: self.io_py_remote}
copy_paths(conn, self.files_to_copy)
- args = ['env', 'python2', self.io_py_remote] + \
- self.script_opts + ['--prepare-only']
+ cmd_templ = "dd if=/dev/zero of={0} bs={1} count={2}"
+ for secname, params in self.configs:
+ sz = ssize_to_b(params['size'])
+ msz = sz / (1024 ** 2)
+ if sz % (1024 ** 2) != 0:
+ msz += 1
- code, self.prep_results, err = run_over_ssh(conn, " ".join(args))
+ cmd = cmd_templ.format(params['filename'], 1024 ** 2, msz)
+ code, out_err = run_over_ssh(conn, cmd)
+
if code != 0:
- raise RuntimeError("Preparation failed " + err)
+ raise RuntimeError("Preparation failed " + out_err)
- def run(self, conn):
- args = ['env', 'python2', self.io_py_remote] + self.script_opts
- args.append('--preparation-results')
- args.append("'{0}'".format(self.prep_results))
- cmd = " ".join(args)
- code, out, err = run_over_ssh(conn, cmd)
- self.on_result(code, out, err, cmd)
- args = ['env', 'python2', self.io_py_remote, '--clean',
- "'{0}'".format(self.prep_results)]
- logger.debug(" ".join(args))
- code, _, err = run_over_ssh(conn, " ".join(args))
- if 0 != code:
- logger.error("Cleaning failed: " + err)
+ def run(self, conn, barrier):
+ cmd_templ = "env python2 {0} --type {1} --json -"
+ cmd = cmd_templ.format(self.io_py_remote, self.tool)
+ try:
+ for secname, _params in self.configs:
+ params = _params.copy()
+ count = params.pop('count', 1)
- def on_result(self, code, out, err, cmd):
+ config = RawConfigParser()
+ config.add_section(secname)
+
+ for k, v in params.items():
+ config.set(secname, k, v)
+
+ cfg = StringIO()
+ config.write(cfg)
+
+ # FIX python config parser-fio incompatibility
+ # remove spaces around '='
+ new_cfg = []
+ config_data = cfg.getvalue()
+ for line in config_data.split("\n"):
+ if '=' in line:
+ name, val = line.split('=', 1)
+ name = name.strip()
+ val = val.strip()
+ line = "{0}={1}".format(name, val)
+ new_cfg.append(line)
+
+ for _ in range(count):
+ barrier.wait()
+ code, out_err = run_over_ssh(conn, cmd,
+ stdin_data="\n".join(new_cfg))
+ self.on_result(code, out_err, cmd)
+ finally:
+ barrier.exit()
+
+ def on_result(self, code, out_err, cmd):
if 0 == code:
try:
- for line in out.split("\n"):
+ for line in out_err.split("\n"):
if line.strip() != "":
self.on_result_cb(json.loads(line))
- except Exception as err:
+ except Exception as exc:
msg_templ = "Error during postprocessing results: {0!r}"
- raise RuntimeError(msg_templ.format(err.message))
+ raise RuntimeError(msg_templ.format(exc.message))
else:
- templ = "Command {0!r} failed with code {1}. Error output is:\n{2}"
- logger.error(templ.format(cmd, code, err))
+ templ = "Command {0!r} failed with code {1}. Output is:\n{2}"
+ logger.error(templ.format(cmd, code, out_err))
diff --git a/keystone.py b/keystone.py
index e6308b5..3ae5e7a 100644
--- a/keystone.py
+++ b/keystone.py
@@ -75,7 +75,7 @@
self.keystone.authenticate()
self.headers['X-Auth-Token'] = self.keystone.auth_token
except exceptions.AuthorizationFailure:
- print 'asdfsf'
+ raise
def do(self, method, path, params=None):
"""Do request. If gets 401 refresh token"""
diff --git a/nodes/ceph.py b/nodes/ceph.py
index 38ce177..793c021 100644
--- a/nodes/ceph.py
+++ b/nodes/ceph.py
@@ -1,6 +1,7 @@
""" Collect data about ceph nodes"""
import json
import logging
+import subprocess
from node import Node
@@ -10,14 +11,29 @@
logger = logging.getLogger("io-perf-tool")
-def discover_ceph_node(ip):
+def local_execute(cmd):
+ return subprocess.check_output(cmd, stderr=subprocess.STDOUT, shell=True)
+
+
+def ssh_execute(ssh):
+ def closure(cmd):
+ _, chan, _ = ssh.exec_command(cmd)
+ return chan.read()
+ return closure
+
+
+def discover_ceph_nodes(ip):
""" Return list of ceph's nodes ips """
ips = {}
- ssh = connect(ip)
- osd_ips = get_osds_ips(ssh, get_osds_list(ssh))
- mon_ips = get_mons_or_mds_ips(ssh, "mon")
- mds_ips = get_mons_or_mds_ips(ssh, "mds")
+ if ip != 'local':
+ executor = ssh_execute(connect(ip))
+ else:
+ executor = local_execute
+
+ osd_ips = get_osds_ips(executor, get_osds_list(executor))
+ mon_ips = get_mons_or_mds_ips(executor, "mon")
+ mds_ips = get_mons_or_mds_ips(executor, "mds")
for ip in osd_ips:
url = "ssh://%s" % ip
@@ -31,29 +47,24 @@
url = "ssh://%s" % ip
ips.setdefault(url, []).append("ceph-mds")
- return [Node(ip=url, roles=list(roles)) for url, roles in ips.items()]
+ return [Node(url, list(roles)) for url, roles in ips.items()]
-def get_osds_list(ssh):
+def get_osds_list(executor):
""" Get list of osds id"""
- _, chan, _ = ssh.exec_command("ceph osd ls")
- return filter(None, chan.read().split("\n"))
+ return filter(None, executor("ceph osd ls").split("\n"))
-def get_mons_or_mds_ips(ssh, who):
+def get_mons_or_mds_ips(executor, who):
""" Return mon ip list
:param who - "mon" or "mds" """
- if who == "mon":
- _, chan, _ = ssh.exec_command("ceph mon dump")
- elif who == "mds":
- _, chan, _ = ssh.exec_command("ceph mds dump")
- else:
+ if who not in ("mon", "mds"):
raise ValueError(("'%s' in get_mons_or_mds_ips instead" +
"of mon/mds") % who)
- line_res = chan.read().split("\n")
- ips = set()
+ line_res = executor("ceph {0} dump".format(who)).split("\n")
+ ips = set()
for line in line_res:
fields = line.split()
@@ -67,12 +78,12 @@
return ips
-def get_osds_ips(ssh, osd_list):
+def get_osds_ips(executor, osd_list):
""" Get osd's ips
:param osd_list - list of osd names from osd ls command"""
ips = set()
for osd_id in osd_list:
- _, chan, _ = ssh.exec_command("ceph osd find {0}".format(osd_id))
- ip = json.loads(chan.read())["ip"]
- ips.add(ip.split(":")[0])
+ out = executor("ceph osd find {0}".format(osd_id))
+ ip = json.loads(out)["ip"]
+ ips.add(str(ip.split(":")[0]))
return ips
diff --git a/nodes/discover.py b/nodes/discover.py
index ec98890..b95d306 100644
--- a/nodes/discover.py
+++ b/nodes/discover.py
@@ -50,5 +50,6 @@
nodes_to_run.extend(fuel.discover_fuel_nodes(url, creads, roles))
if cluster == "ceph":
- nodes_to_run.extend(ceph.discover_ceph_node(cluster_info["ip"]))
+ nodes_to_run.extend(ceph.discover_ceph_nodes(cluster_info))
+
return nodes_to_run
diff --git a/nodes/node.py b/nodes/node.py
index d7cae40..138d123 100644
--- a/nodes/node.py
+++ b/nodes/node.py
@@ -1,41 +1,16 @@
class Node(object):
- def __init__(self, ip, roles, username=None,
- password=None, key_path=None, port=None):
+ def __init__(self, conn_url, roles):
self.roles = roles
- self.ip = ip
- self.username = username
- self.password = password
- self.port = port
- self.key_path = key_path
+ self.conn_url = conn_url
self.connection = None
def __str__(self):
- return "<Node: url={0!r} roles={1} >".format(self.ip,
- ", ".join(self.roles))
+ templ = "<Node: url={conn_url!r} roles={roles}" + \
+ " connected={is_connected}>"
+ return templ.format(conn_url=self.conn_url,
+ roles=", ".join(self.roles),
+ is_connected=self.connection is not None)
def __repr__(self):
return str(self)
-
- def set_conn_attr(self, name, value):
- setattr(self, name, value)
-
- @property
- def connection_url(self):
- connection = []
-
- if self.username:
- connection.append(self.username)
- if self.password:
- connection.extend([":", self.password, "@"])
- connection.append("@")
-
- connection.append(self.ip)
- if self.port:
- connection.extend([":", self.port])
- if self.key_path:
- connection.extend([":", self.key_path])
- else:
- if self.key_path:
- connection.extend([":", ":", self.key_path])
- return "".join(connection)
diff --git a/report.py b/report.py
index 4d23b60..aaaaa53 100644
--- a/report.py
+++ b/report.py
@@ -2,7 +2,7 @@
from collections import OrderedDict
from chart import charts
-from utils import ssize_to_kb
+from utils import ssize_to_b
OPERATIONS = (('async', ('randwrite asynchronous', 'randread asynchronous',
@@ -87,7 +87,7 @@
OD = OrderedDict
ordered_build_results = OD(sorted(build_results.items(),
- key=lambda t: ssize_to_kb(t[0])))
+ key=lambda t: ssize_to_b(t[0])))
if not scale_x:
scale_x = ordered_build_results.keys()
diff --git a/run_test.py b/run_test.py
index 25da0ab..d1a9f4f 100755
--- a/run_test.py
+++ b/run_test.py
@@ -1,120 +1,36 @@
-import os
import sys
-import json
-import time
+import Queue
import pprint
import logging
-import os.path
import argparse
+import threading
+import collections
+from concurrent.futures import ThreadPoolExecutor
+
+import utils
import ssh_utils
-import io_scenario
from nodes import discover
+from nodes.node import Node
from config import cfg_dict
-from utils import log_error
-from rest_api import add_test
-from formatters import get_formatter
from itest import IOPerfTest, PgBenchTest
+from sensors.api import start_monitoring
+
+
logger = logging.getLogger("io-perf-tool")
-logger.setLevel(logging.DEBUG)
-ch = logging.StreamHandler()
-ch.setLevel(logging.DEBUG)
-logger.addHandler(ch)
-
-log_format = '%(asctime)s - %(levelname)s - %(name)s - %(message)s'
-formatter = logging.Formatter(log_format,
- "%H:%M:%S")
-ch.setFormatter(formatter)
-tool_type_mapper = {
- "iozone": IOPerfTest,
- "fio": IOPerfTest,
- "pgbench": PgBenchTest,
-}
+def setup_logger(logger, level=logging.DEBUG):
+ logger.setLevel(level)
+ ch = logging.StreamHandler()
+ ch.setLevel(level)
+ logger.addHandler(ch)
-
-def run_io_test(tool,
- script_args,
- test_runner,
- keep_temp_files=False):
-
- files_dir = os.path.dirname(io_scenario.__file__)
-
- path = 'iozone' if 'iozone' == tool else 'fio'
- src_testtool_path = os.path.join(files_dir, path)
-
- obj_cls = tool_type_mapper[tool]
- obj = obj_cls(script_args,
- src_testtool_path,
- None,
- keep_temp_files)
-
- return test_runner(obj)
-
-
-def conn_func(obj, barrier, latest_start_time, conn):
- try:
- test_iter = itest.run_test_iter(obj, conn)
- next(test_iter)
-
- wait_on_barrier(barrier, latest_start_time)
-
- with log_error("!Run test"):
- return next(test_iter)
- except:
- print traceback.format_exc()
- raise
-
-
-def parse_args(argv):
- parser = argparse.ArgumentParser(
- description="Run disk io performance test")
-
- parser.add_argument("-l", dest='extra_logs',
- action='store_true', default=False,
- help="print some extra log info")
-
- parser.add_argument('stages', nargs="+",
- choices=["discover", "connect", "start_new_nodes",
- "deploy_sensors"])
-
- # THIS ALL MOVE TO CONFIG FILE
- # parser.add_argument("-o", "--test-opts", dest='opts',
- # help="cmd line options for test")
-
- # parser.add_argument("-f", "--test-opts-file", dest='opts_file',
- # type=argparse.FileType('r'), default=None,
- # help="file with cmd line options for test")
-
- # parser.add_argument("--max-preparation-time", default=300,
- # type=int, dest="max_preparation_time")
-
- # parser.add_argument("-b", "--build-info", default=None,
- # dest="build_name")
-
- # parser.add_argument("-d", "--data-server-url", default=None,
- # dest="data_server_url")
-
- # parser.add_argument("-n", "--lab-name", default=None,
- # dest="lab_name")
-
- # parser.add_argument("--create-vms-opts", default=None,
- # help="Creating vm's before run ssh runner",
- # dest="create_vms_opts")
-
- # parser.add_argument("-k", "--keep", default=False,
- # help="keep temporary files",
- # dest="keep_temp_files", action='store_true')
-
- # parser.add_argument("--runner", required=True,
- # choices=["local", "ssh"], help="runner type")
-
- # parser.add_argument("--runner-extra-opts", default=None,
- # dest="runner_opts", help="runner extra options")
-
- return parser.parse_args(argv[1:])
+ log_format = '%(asctime)s - %(levelname)-6s - %(name)s - %(message)s'
+ formatter = logging.Formatter(log_format,
+ "%H:%M:%S")
+ ch.setFormatter(formatter)
def format_result(res, formatter):
@@ -127,59 +43,167 @@
def connect_one(node):
try:
- node.connection = ssh_utils.connect(node.connection_url)
+ ssh_pref = "ssh://"
+ if node.conn_url.startswith(ssh_pref):
+ url = node.conn_url[len(ssh_pref):]
+ node.connection = ssh_utils.connect(url)
+ else:
+ raise ValueError("Unknown url type {0}".format(node.conn_url))
except Exception:
- logger.exception()
+ logger.exception("During connect to {0}".format(node))
def connect_all(nodes):
+ logger.info("Connecting to nodes")
+ with ThreadPoolExecutor(32) as pool:
+ list(pool.map(connect_one, nodes))
+
+
+def save_sensors_data(q):
+ logger.info("Start receiving sensors data")
+ while True:
+ val = q.get()
+ if val is None:
+ break
+ # logger.debug("Sensors -> {0!r}".format(val))
+ logger.info("Sensors thread exits")
+
+
+def test_thread(test, node, barrier):
+ try:
+ logger.debug("Run preparation for {0}".format(node.conn_url))
+ test.pre_run(node.connection)
+ logger.debug("Run test for {0}".format(node.conn_url))
+ test.run(node.connection, barrier)
+ except:
+ logger.exception("In test {0} for node {1}".format(test, node))
+
+
+def run_tests(config, nodes):
+ tool_type_mapper = {
+ "io": IOPerfTest,
+ "pgbench": PgBenchTest,
+ }
+
+ test_nodes = [node for node in nodes
+ if 'testnode' in node.roles]
+
+ res_q = Queue.Queue()
+
+ for name, params in config['tests'].items():
+ logger.info("Starting {0} tests".format(name))
+
+ threads = []
+ barrier = utils.Barrier(len(test_nodes))
+ for node in test_nodes:
+ msg = "Starting {0} test on {1} node"
+ logger.debug(msg.format(name, node.conn_url))
+ test = tool_type_mapper[name](params, res_q.put)
+ th = threading.Thread(None, test_thread, None,
+ (test, node, barrier))
+ threads.append(th)
+ th.daemon = True
+ th.start()
+
+ for th in threads:
+ th.join()
+
+ while not res_q.empty():
+ logger.info("Get test result {0!r}".format(res_q.get()))
+
+
+def parse_args(argv):
+ parser = argparse.ArgumentParser(
+ description="Run disk io performance test")
+
+ parser.add_argument("-l", dest='extra_logs',
+ action='store_true', default=False,
+ help="print some extra log info")
+
+ parser.add_argument('stages', nargs="+",
+ choices=["discover", "connect", "start_new_nodes",
+ "deploy_sensors", "run_tests"])
+
+ return parser.parse_args(argv[1:])
+
+
+def log_nodes_statistic(nodes):
+ logger.info("Found {0} nodes total".format(len(nodes)))
+ per_role = collections.defaultdict(lambda: 0)
+ for node in nodes:
+ for role in node.roles:
+ per_role[role] += 1
+
+ for role, count in sorted(per_role.items()):
+ logger.debug("Found {0} nodes with role {1}".format(count, role))
+
+
+def log_sensors_config(cfg):
pass
def main(argv):
- logging_conf = cfg_dict.get('logging')
- if logging_conf:
- if logging_conf.get('extra_logs'):
- logger.setLevel(logging.DEBUG)
- ch.setLevel(logging.DEBUG)
-
opts = parse_args(argv)
+
+ level = logging.DEBUG if opts.extra_logs else logging.WARNING
+ setup_logger(logger, level)
+
+ nodes = []
+
if 'discover' in opts.stages:
- current_data = discover.discover(cfg_dict.get('discover'))
+ logger.info("Start node discovery")
+ nodes = discover.discover(cfg_dict.get('discover'))
+
+ if 'explicit_nodes' in cfg_dict:
+ for url, roles in cfg_dict['explicit_nodes'].items():
+ nodes.append(Node(url, roles.split(",")))
+
+ log_nodes_statistic(nodes)
if 'connect' in opts.stages:
- for node in current_data:
- pass
+ connect_all(nodes)
- print "\n".join(map(str, current_data))
+ if 'deploy_sensors' in opts.stages:
+ logger.info("Deploing sensors")
+ cfg = cfg_dict.get('sensors')
+ sens_cfg = []
+
+ for role, sensors_str in cfg["roles_mapping"].items():
+ sensors = [sens.strip() for sens in sensors_str.split(",")]
+
+ collect_cfg = dict((sensor, {}) for sensor in sensors)
+
+ for node in nodes:
+ if role in node.roles:
+ sens_cfg.append((node.connection, collect_cfg))
+
+ log_sensors_config(sens_cfg)
+
+ sensor_cm = start_monitoring(cfg["receiver_uri"], None,
+ connected_config=sens_cfg)
+
+ with sensor_cm as sensors_control_queue:
+ th = threading.Thread(None, save_sensors_data, None,
+ (sensors_control_queue,))
+ th.daemon = True
+ th.start()
+
+ # TODO: wait till all nodes start to send sensors data
+
+ if 'run_tests' in opts.stages:
+ run_tests(cfg_dict, nodes)
+
+ sensors_control_queue.put(None)
+ th.join()
+ elif 'run_tests' in opts.stages:
+ run_tests(cfg_dict, nodes)
+
+ logger.info("Disconnecting")
+ for node in nodes:
+ node.connection.close()
+
return 0
- # tests = cfg_dict.get("tests", [])
-
- # Deploy and start sensors
- # deploy_and_start_sensors(cfg_dict.get('sensors'), nodes_to_run)
-
- # for test_name, opts in tests.items():
- # cmd_line = " ".join(opts['opts'])
- # logger.debug("Run test with {0!r} params".format(cmd_line))
- # latest_start_time = 300 + time.time()
- # uris = [node.connection_url for node in nodes_to_run]
- # runner = ssh_runner.get_ssh_runner(uris, conn_func,
- # latest_start_time,
- # opts.get('keep_temp_files'))
- # res = run_io_test(test_name,
- # opts['opts'],
- # runner,
- # opts.get('keep_temp_files'))
- # logger.debug(format_result(res, get_formatter(test_name)))
-
- # if cfg_dict.get('data_server_url'):
- # result = json.loads(get_formatter(opts.tool_type)(res))
- # result['name'] = opts.build_name
- # add_test(opts.build_name, result, opts.data_server_url)
-
- # return 0
-
if __name__ == '__main__':
exit(main(sys.argv))
diff --git a/sensors/api.py b/sensors/api.py
index f78e6a9..dc34af0 100644
--- a/sensors/api.py
+++ b/sensors/api.py
@@ -17,6 +17,7 @@
data_q.put(proto.recv(0.1))
except Timeout:
pass
+
try:
val = cmd_q.get(False)
@@ -28,8 +29,9 @@
@contextmanager
-def start_monitoring(uri, config):
- deploy_and_start_sensors(uri, config)
+def start_monitoring(uri, config=None, connected_config=None):
+ deploy_and_start_sensors(uri, config=config,
+ connected_config=connected_config)
try:
data_q = Queue.Queue()
cmd_q = Queue.Queue()
@@ -44,4 +46,5 @@
cmd_q.put(None)
th.join()
finally:
- stop_and_remove_sensors(config)
+ stop_and_remove_sensors(config,
+ connected_config=connected_config)
diff --git a/sensors/deploy_sensors.py b/sensors/deploy_sensors.py
index b2dc3f1..e0428d9 100644
--- a/sensors/deploy_sensors.py
+++ b/sensors/deploy_sensors.py
@@ -1,33 +1,50 @@
import time
import json
import os.path
+import logging
from concurrent.futures import ThreadPoolExecutor, wait
from disk_perf_test_tool.ssh_utils import connect, copy_paths
+logger = logging.getLogger('io-perf-tool')
+
def wait_all_ok(futures):
return all(future.result() for future in futures)
-def deploy_and_start_sensors(monitor_uri, config, remote_path='/tmp/sensors'):
+def deploy_and_start_sensors(monitor_uri, config,
+ remote_path='/tmp/sensors',
+ connected_config=None):
paths = {os.path.dirname(__file__): remote_path}
with ThreadPoolExecutor(max_workers=32) as executor:
futures = []
- for uri, config in config.items():
+ if connected_config is not None:
+ assert config is None
+ node_iter = connected_config
+ else:
+ node_iter = config.items()
+
+ for uri_or_conn, config in node_iter:
futures.append(executor.submit(deploy_and_start_sensor,
- paths, uri, monitor_uri,
+ paths, uri_or_conn,
+ monitor_uri,
config, remote_path))
if not wait_all_ok(futures):
raise RuntimeError("Sensor deployment fails on some nodes")
-def deploy_and_start_sensor(paths, uri, monitor_uri, config, remote_path):
+def deploy_and_start_sensor(paths, uri_or_conn, monitor_uri, config,
+ remote_path):
try:
- conn = connect(uri)
+ if isinstance(uri_or_conn, basestring):
+ conn = connect(uri_or_conn)
+ else:
+ conn = uri_or_conn
+
copy_paths(conn, paths)
sftp = conn.open_sftp()
@@ -41,17 +58,23 @@
cmd = cmd_templ.format(main_remote_path,
monitor_uri,
config_remote_path)
- print "Executing", cmd
conn.exec_command(cmd)
sftp.close()
- conn.close()
+
+ if isinstance(uri_or_conn, basestring):
+ conn.close()
except:
+ logger.exception("During deploing sensors in {0}".format(uri_or_conn))
return False
return True
-def stop_and_remove_sensor(uri, remote_path):
- conn = connect(uri)
+def stop_and_remove_sensor(uri_or_conn, remote_path):
+ if isinstance(uri_or_conn, basestring):
+ conn = connect(uri_or_conn)
+ else:
+ conn = uri_or_conn
+
main_remote_path = os.path.join(remote_path, "main.py")
cmd_templ = "python {0} -d stop"
@@ -62,15 +85,23 @@
conn.exec_command("rm -rf {0}".format(remote_path))
- conn.close()
+ if isinstance(uri_or_conn, basestring):
+ conn.close()
-def stop_and_remove_sensors(config, remote_path='/tmp/sensors'):
+def stop_and_remove_sensors(config, remote_path='/tmp/sensors',
+ connected_config=None):
with ThreadPoolExecutor(max_workers=32) as executor:
futures = []
- for uri, config in config.items():
+ if connected_config is not None:
+ assert config is None
+ conf_iter = connected_config
+ else:
+ conf_iter = config.items()
+
+ for uri_or_conn, config in conf_iter:
futures.append(executor.submit(stop_and_remove_sensor,
- uri, remote_path))
+ uri_or_conn, remote_path))
wait(futures)
diff --git a/sensors/main.py b/sensors/main.py
index fea46a3..3c953fa 100644
--- a/sensors/main.py
+++ b/sensors/main.py
@@ -64,6 +64,10 @@
time.sleep(opts.timeout)
+def pid_running(pid):
+ return os.path.exists("/proc/" + str(pid))
+
+
def main(argv):
opts = parse_args(argv)
@@ -86,12 +90,12 @@
elif opts.daemon == 'stop':
if os.path.isfile(pid_file):
pid = int(open(pid_file).read())
- if os.path.exists("/proc/" + str(pid)):
+ if pid_running(pid):
os.kill(pid, signal.SIGTERM)
time.sleep(0.1)
- if os.path.exists("/proc/" + str(pid)):
+ if pid_running(pid):
os.kill(pid, signal.SIGKILL)
if os.path.isfile(pid_file):
@@ -99,7 +103,7 @@
elif opts.daemon == 'status':
if os.path.isfile(pid_file):
pid = int(open(pid_file).read())
- if os.path.exists("/proc/" + str(pid)):
+ if pid_running(pid):
print "running"
return
print "stopped"
diff --git a/sensors/protocol.py b/sensors/protocol.py
index cfdd93e..02b661a 100644
--- a/sensors/protocol.py
+++ b/sensors/protocol.py
@@ -1,3 +1,4 @@
+import sys
import time
import socket
import select
@@ -57,6 +58,7 @@
# ------------------------------------- Transports ---------------------------
+
class ITransport(object):
def __init__(self, receiver):
pass
@@ -73,12 +75,14 @@
def __init__(self, receiver, delta=True):
if receiver:
- raise ValueError("StdoutTransport don't allows receiving")
+ cname = self.__class__.__name__
+ raise ValueError("{0} don't allows receiving".format(cname))
self.headers = None
self.line_format = ""
self.prev = {}
self.delta = delta
+ self.fd = sys.stdout
def send(self, data):
if self.headers is None:
@@ -100,10 +104,17 @@
else:
vals = [data[header].value for header in self.headers]
- print self.line_format.format(*vals)
+ self.fd.write(self.line_format.format(*vals) + "\n")
def recv(self, timeout=None):
- raise ValueError("StdoutTransport don't allows receiving")
+ cname = self.__class__.__name__
+ raise ValueError("{0} don't allows receiving".format(cname))
+
+
+class FileTransport(StdoutTransport):
+ def __init__(self, receiver, fname, delta=True):
+ StdoutTransport.__init__(self, receiver, delta)
+ self.fd = open(fname, "w")
class UDPTransport(ITransport):
@@ -170,10 +181,12 @@
ip, port = parsed_uri.netloc.split(":")
return UDPTransport(receiver, ip=ip, port=int(port),
packer_cls=PickleSerializer)
+ elif parsed_uri.scheme == 'file':
+ return FileTransport(receiver, parsed_uri.path)
elif parsed_uri.scheme == 'hugeudp':
ip, port = parsed_uri.netloc.split(":")
return HugeUDPTransport(receiver, ip=ip, port=int(port),
- packer_cls=MSGPackSerializer)
+ packer_cls=MSGPackSerializer)
else:
templ = "Can't instantiate transport from {0!r}"
raise ValueError(templ.format(uri))
diff --git a/ssh_utils.py b/ssh_utils.py
index 7c859cf..e546b72 100644
--- a/ssh_utils.py
+++ b/ssh_utils.py
@@ -1,19 +1,14 @@
import re
import time
-import Queue
import logging
import os.path
import getpass
-import threading
import socket
import paramiko
-from concurrent.futures import ThreadPoolExecutor
-from utils import get_barrier
logger = logging.getLogger("io-perf-tool")
-conn_uri_attrs = ("user", "passwd", "host", "port", "path")
def ssh_connect(creds, retry_count=60, timeout=1):
@@ -67,7 +62,10 @@
return dirpath
-def ssh_mkdir(sftp, remotepath, mode=0777, intermediate=False):
+ALL_RWX_MODE = ((1 << 9) - 1)
+
+
+def ssh_mkdir(sftp, remotepath, mode=ALL_RWX_MODE, intermediate=False):
remotepath = normalize_dirpath(remotepath)
if intermediate:
try:
@@ -83,7 +81,7 @@
def ssh_copy_file(sftp, localfile, remfile, preserve_perm=True):
sftp.put(localfile, remfile)
if preserve_perm:
- sftp.chmod(remfile, os.stat(localfile).st_mode & 0777)
+ sftp.chmod(remfile, os.stat(localfile).st_mode & ALL_RWX_MODE)
def put_dir_recursively(sftp, localpath, remotepath, preserve_perm=True):
@@ -121,9 +119,9 @@
sftp.chdir(remroot)
except IOError:
if preserve_perm:
- mode = os.stat(root).st_mode & 0777
+ mode = os.stat(root).st_mode & ALL_RWX_MODE
else:
- mode = 0777
+ mode = ALL_RWX_MODE
ssh_mkdir(sftp, remroot, mode=mode, intermediate=True)
sftp.chdir(remroot)
@@ -156,8 +154,10 @@
class ConnCreds(object):
+ conn_uri_attrs = ("user", "passwd", "host", "port", "path")
+
def __init__(self):
- for name in conn_uri_attrs:
+ for name in self.conn_uri_attrs:
setattr(self, name, None)
@@ -224,43 +224,42 @@
return ssh_connect(creds)
-def get_ssh_runner(uris,
- conn_func,
- latest_start_time=None,
- keep_temp_files=False):
- logger.debug("Connecting to servers")
+# def get_ssh_runner(uris,
+# conn_func,
+# latest_start_time=None,
+# keep_temp_files=False):
+# logger.debug("Connecting to servers")
- with ThreadPoolExecutor(max_workers=16) as executor:
- connections = list(executor.map(connect, uris))
+# with ThreadPoolExecutor(max_workers=16) as executor:
+# connections = list(executor.map(connect, uris))
- result_queue = Queue.Queue()
- barrier = get_barrier(len(uris), threaded=True)
+# result_queue = Queue.Queue()
+# barrier = get_barrier(len(uris), threaded=True)
- def closure(obj):
- ths = []
- obj.set_result_cb(result_queue.put)
+# def closure(obj):
+# ths = []
+# obj.set_result_cb(result_queue.put)
- params = (obj, barrier, latest_start_time)
+# params = (obj, barrier, latest_start_time)
- logger.debug("Start tests")
- for conn in connections:
- th = threading.Thread(None, conn_func, None,
- params + (conn,))
- th.daemon = True
- th.start()
- ths.append(th)
+# logger.debug("Start tests")
+# for conn in connections:
+# th = threading.Thread(None, conn_func, None,
+# params + (conn,))
+# th.daemon = True
+# th.start()
+# ths.append(th)
- for th in ths:
- th.join()
+# for th in ths:
+# th.join()
- test_result = []
- while not result_queue.empty():
- test_result.append(result_queue.get())
+# test_result = []
+# while not result_queue.empty():
+# test_result.append(result_queue.get())
- logger.debug("Done. Closing connection")
- for conn in connections:
- conn.close()
+# logger.debug("Done. Closing connection")
+# for conn in connections:
+# conn.close()
- return test_result
-
- return closure
+# return test_result
+# return closure
diff --git a/starts_vms.py b/starts_vms.py
index 62bff8c..16e7d08 100644
--- a/starts_vms.py
+++ b/starts_vms.py
@@ -199,7 +199,7 @@
if isinstance(vol.display_name, basestring):
if re.match(name_templ.format("\\d+"), vol.display_name):
if vol.status in ('available', 'error'):
- print "Deleting volume", vol.display_name
+ logger.debug("Deleting volume " + vol.display_name)
cinder.volumes.delete(vol)
logger.debug("Clearing done (yet some volumes may still deleting)")
diff --git a/io_scenario/__init__.py b/tests/__init__.py
similarity index 100%
rename from io_scenario/__init__.py
rename to tests/__init__.py
diff --git a/tests/io.py b/tests/io.py
new file mode 100644
index 0000000..2405f53
--- /dev/null
+++ b/tests/io.py
@@ -0,0 +1,97 @@
+import sys
+import time
+import json
+import select
+import pprint
+import argparse
+import subprocess
+from StringIO import StringIO
+from ConfigParser import RawConfigParser
+
+
+def run_fio(benchmark_config):
+ cmd = ["fio", "--output-format=json", "-"]
+ p = subprocess.Popen(cmd, stdin=subprocess.PIPE,
+ stdout=subprocess.PIPE,
+ stderr=subprocess.STDOUT)
+ raw_out, _ = p.communicate(benchmark_config)
+ job_output = json.loads(raw_out)["jobs"][0]
+
+ if job_output['write']['iops'] != 0:
+ raw_result = job_output['write']
+ else:
+ raw_result = job_output['read']
+
+ res = {}
+
+ # 'bw_dev bw_mean bw_max bw_min'.split()
+ for field in ["bw_mean", "iops"]:
+ res[field] = raw_result[field]
+
+ res["lat"] = raw_result["lat"]["mean"]
+ res["clat"] = raw_result["clat"]["mean"]
+ res["slat"] = raw_result["slat"]["mean"]
+ res["util"] = json.loads(raw_out)["disk_util"][0]
+
+ res["util"] = dict((str(k), v) for k, v in res["util"].items())
+
+ return res
+
+
+def run_benchmark(binary_tp, *argv, **kwargs):
+ if 'fio' == binary_tp:
+ return run_fio(*argv, **kwargs)
+ raise ValueError("Unknown behcnmark {0}".format(binary_tp))
+
+
+def parse_args(argv):
+ parser = argparse.ArgumentParser(
+ description="Run fio' and return result")
+ parser.add_argument(
+ "--type", metavar="BINARY_TYPE",
+ choices=['fio'], required=True)
+ parser.add_argument("--start-at", metavar="START_TIME", type=int)
+ parser.add_argument("--json", action="store_true", default=False)
+ parser.add_argument("jobfile")
+ return parser.parse_args(argv)
+
+
+def main(argv):
+ argv_obj = parse_args(argv)
+ if argv_obj.jobfile == '-':
+ job_cfg = ""
+ dtime = 10
+ while True:
+ r, w, x = select.select([sys.stdin], [], [], dtime)
+ if len(r) == 0:
+ raise IOError("No config provided")
+ char = sys.stdin.read(1)
+ if '' == char:
+ break
+ job_cfg += char
+ dtime = 1
+ else:
+ job_cfg = open(argv_obj.jobfile).read()
+
+ rcp = RawConfigParser()
+ rcp.readfp(StringIO(job_cfg))
+ assert len(rcp.sections()) == 1
+
+ if argv_obj.start_at is not None:
+ ctime = time.time()
+ if argv_obj.start_at >= ctime:
+ time.sleep(argv_obj.start_at - ctime)
+
+ res = run_benchmark(argv_obj.type, job_cfg)
+ res['__meta__'] = dict(rcp.items(rcp.sections()[0]))
+ res['__meta__']['raw'] = job_cfg
+
+ if argv_obj.json:
+ sys.stdout.write(json.dumps(res))
+ else:
+ sys.stdout.write(pprint.pformat(res))
+ sys.stdout.write("\n")
+ return 0
+
+if __name__ == '__main__':
+ exit(main(sys.argv[1:]))
diff --git a/io_scenario/__init__.py b/tests/postgres/__init__.py
similarity index 100%
copy from io_scenario/__init__.py
copy to tests/postgres/__init__.py
diff --git a/hl_tests/postgres/prepare.sh b/tests/postgres/prepare.sh
similarity index 100%
rename from hl_tests/postgres/prepare.sh
rename to tests/postgres/prepare.sh
diff --git a/hl_tests/postgres/run.sh b/tests/postgres/run.sh
similarity index 100%
rename from hl_tests/postgres/run.sh
rename to tests/postgres/run.sh
diff --git a/utils.py b/utils.py
index 059101a..4a784ec 100644
--- a/utils.py
+++ b/utils.py
@@ -1,10 +1,8 @@
import time
import socket
-import os.path
import logging
import threading
import contextlib
-import multiprocessing
logger = logging.getLogger("io-perf-tool")
@@ -22,43 +20,32 @@
return user, passwd, host
-def get_barrier(count, threaded=False):
- if threaded:
- class val(object):
- value = count
- cond = threading.Condition()
- else:
- val = multiprocessing.Value('i', count)
- cond = multiprocessing.Condition()
+class TaskFinished(Exception):
+ pass
- def closure(timeout):
- with cond:
- val.value -= 1
- if val.value == 0:
- cond.notify_all()
+
+class Barrier(object):
+ def __init__(self, count):
+ self.count = count
+ self.curr_count = 0
+ self.cond = threading.Condition()
+ self.exited = False
+
+ def wait(self, timeout=None):
+ with self.cond:
+ if self.exited:
+ raise TaskFinished()
+
+ self.curr_count += 1
+ if self.curr_count == self.count:
+ self.curr_count = 0
+ self.cond.notify_all()
else:
- cond.wait(timeout)
- return val.value == 0
+ self.cond.wait(timeout=timeout)
- return closure
-
-
-def wait_on_barrier(barrier, latest_start_time):
- if barrier is not None:
- if latest_start_time is not None:
- timeout = latest_start_time - time.time()
- else:
- timeout = None
-
- if timeout is not None and timeout > 0:
- msg = "Ready and waiting on barrier. " + \
- "Will wait at most {0} seconds"
- logger.debug(msg.format(int(timeout)))
-
- if not barrier(timeout):
- logger.debug("Barrier timeouted")
- else:
- logger.debug("Passing barrier, starting test")
+ def exit(self):
+ with self.cond:
+ self.exited = True
@contextlib.contextmanager
@@ -77,56 +64,51 @@
raise
-def run_over_ssh(conn, cmd):
+def run_over_ssh(conn, cmd, stdin_data=None, exec_timeout=60):
"should be replaces by normal implementation, with select"
-
- stdin, stdout, stderr = conn.exec_command(cmd)
- out = stdout.read()
- err = stderr.read()
- code = stdout.channel.recv_exit_status()
- return code, out, err
-
-
-def kb_to_ssize(ssize):
- size_ext = {
- 4: 'P',
- 3: 'T',
- 2: 'G',
- 1: 'M',
- 0: 'K'
- }
-
- for idx in reversed(sorted(size_ext)):
- if ssize > 1024 ** idx:
- ext = size_ext[idx]
- return "{0}{1}".format(int(ssize / 1024 ** idx), ext)
- raise ValueError("Can't convert {0} to kb".format(ssize))
-
-
-def ssize_to_kb(ssize):
+ transport = conn.get_transport()
+ session = transport.open_session()
try:
- smap = dict(k=1, K=1, M=1024, m=1024, G=1024**2, g=1024**2)
- for ext, coef in smap.items():
- if ssize.endswith(ext):
- return int(ssize[:-1]) * coef
+ session.set_combine_stderr(True)
- if int(ssize) % 1024 != 0:
- raise ValueError()
+ stime = time.time()
+ session.exec_command(cmd)
- return int(ssize) / 1024
+ if stdin_data is not None:
+ session.sendall(stdin_data)
- except (ValueError, TypeError, AttributeError):
- tmpl = "Unknow size format {0!r} (or size not multiples 1024)"
- raise ValueError(tmpl.format(ssize))
+ session.settimeout(1)
+ session.shutdown_write()
+ output = ""
+
+ while True:
+ try:
+ ndata = session.recv(1024)
+ output += ndata
+ if "" == ndata:
+ break
+ except socket.timeout:
+ pass
+ if time.time() - stime > exec_timeout:
+ return 1, output + "\nExecution timeout"
+ code = session.recv_exit_status()
+ finally:
+ session.close()
+
+ return code, output
+
+
+SMAP = dict(k=1024, m=1024 ** 2, g=1024 ** 3, t=1024 ** 4)
def ssize_to_b(ssize):
try:
- smap = dict(k=1, K=1, M=1024, m=1024, G=1024**2, g=1024**2)
- for ext, coef in smap.items():
- if ssize.endswith(ext):
- return int(ssize[:-1]) * coef * 1024
+ ssize = ssize.lower()
+ if ssize.endswith("b"):
+ ssize = ssize[:-1]
+ if ssize[-1] in SMAP:
+ return int(ssize[:-1]) * SMAP[ssize[-1]]
return int(ssize)
except (ValueError, TypeError, AttributeError):
tmpl = "Unknow size format {0!r} (or size not multiples 1024)"