fix MOL-177: Fix logging
diff --git a/io_scenario/io.py b/io_scenario/io.py
index 4f7cbcf..b9f3b6f 100644
--- a/io_scenario/io.py
+++ b/io_scenario/io.py
@@ -4,9 +4,11 @@
import stat
import time
import json
+import Queue
import os.path
import argparse
import warnings
+import threading
import subprocess
@@ -492,9 +494,23 @@
"--prepare-only", default=False, dest='prepare_only',
action="store_true")
parser.add_argument("--concurrency", default=1, type=int)
+
+ parser.add_argument("--with-sensors", default="",
+ dest="with_sensors")
+
return parser.parse_args(argv)
+def sensor_thread(sensor_list, cmd_q, data_q):
+ while True:
+ try:
+ cmd_q.get(timeout=0.5)
+ data_q.put([])
+ return
+ except Queue.Empty:
+ pass
+
+
def main(argv):
argv_obj = parse_args(argv)
argv_obj.blocksize = ssize_to_kb(argv_obj.blocksize)
@@ -542,17 +558,33 @@
if dt > 0:
time.sleep(dt)
+ if argv_obj.with_sensors != "":
+ oq = Queue.Queue()
+ iq = Queue.Queue()
+ argv = (argv_obj.with_sensors, oq, iq)
+ th = threading.Thread(None, sensor_thread, None, argv)
+ th.daemon = True
+ th.start()
+
res, cmd = run_benchmark(argv_obj.type,
benchmark,
binary_path,
test_file_name,
argv_obj.prepare_only,
argv_obj.timeout)
+ if argv_obj.with_sensors != "":
+ iq.put(None)
+ stats = oq.get()
+ else:
+ stats = None
if not argv_obj.prepare_only:
res['__meta__'] = benchmark.__dict__.copy()
res['__meta__']['cmdline'] = cmd
+ if stats is not None:
+ res['__meta__']['sensor_data'] = stats
+
sys.stdout.write(json.dumps(res))
if not argv_obj.prepare_only:
diff --git a/itest.py b/itest.py
index d4f4f52..178acce 100644
--- a/itest.py
+++ b/itest.py
@@ -2,12 +2,17 @@
import json
import types
import os.path
+import logging
+
from io_scenario import io
from ssh_copy_directory import copy_paths
from utils import run_over_ssh
+logger = logging.getLogger("io-perf-tool")
+
+
class IPerfTest(object):
def __init__(self, on_result_cb):
self.set_result_cb(on_result_cb)
@@ -58,10 +63,11 @@
def run(self, conn):
args = ['env', 'python2', self.io_py_remote] + self.script_opts
- code, out, err = run_over_ssh(conn, " ".join(args))
- self.on_result(code, out, err)
+ cmd = " ".join(args)
+ code, out, err = run_over_ssh(conn, cmd)
+ self.on_result(code, out, err, cmd)
- def on_result(self, code, out, err):
+ def on_result(self, code, out, err, cmd):
if 0 == code:
try:
for line in out.split("\n"):
@@ -70,3 +76,6 @@
except Exception as err:
msg = "Error during postprocessing results: {0!r}".format(err)
raise RuntimeError(msg)
+ else:
+ templ = "Command {0!r} failed with code {1}. Error output is:\n{2}"
+ logger.error(templ.format(cmd, code, err))
diff --git a/log.py b/log.py
deleted file mode 100644
index fdef091..0000000
--- a/log.py
+++ /dev/null
@@ -1,21 +0,0 @@
-import os
-import sys
-import datetime
-
-
-def _log(x):
- dt_str = datetime.datetime.now().strftime("%H:%M:%S")
- pref = dt_str + " " + str(os.getpid()) + " >>>> "
- sys.stderr.write(pref + x.replace("\n", "\n" + pref) + "\n")
-
-
-logger = _log
-
-
-def log(x):
- logger(x)
-
-
-def setlogger(nlogger):
- global logger
- logger = nlogger
diff --git a/report.py b/report.py
index 51049ca..b707b52 100644
--- a/report.py
+++ b/report.py
@@ -102,8 +102,11 @@
scale_x = []
for build_id, build_results in value.items():
legend.append(build_id)
- ordered_build_results = OrderedDict(sorted(build_results.items(),
- key=lambda t: ssize_to_kb(t[0])))
+
+ OD = OrderedDict
+ ordered_build_results = OD(sorted(build_results.items(),
+ key=lambda t: ssize_to_kb(t[0])))
+
if not scale_x:
scale_x = ordered_build_results.keys()
dataset.append(zip(*ordered_build_results.values())[0])
@@ -141,4 +144,4 @@
if __name__ == '__main__':
- exit(main(sys.argv[1:]))
\ No newline at end of file
+ exit(main(sys.argv[1:]))
diff --git a/run_test.py b/run_test.py
index d3f5707..2d0c224 100644
--- a/run_test.py
+++ b/run_test.py
@@ -3,6 +3,7 @@
import json
import time
import pprint
+import logging
import os.path
import argparse
import traceback
@@ -17,6 +18,18 @@
from starts_vms import nova_connect, create_vms_mt, clear_all
+logger = logging.getLogger("io-perf-tool")
+logger.setLevel(logging.DEBUG)
+ch = logging.StreamHandler()
+ch.setLevel(logging.DEBUG)
+logger.addHandler(ch)
+
+log_format = '%(asctime)s - %(levelname)s - %(name)s - %(message)s'
+formatter = logging.Formatter(log_format,
+ "%H:%M:%S")
+ch.setFormatter(formatter)
+
+
def run_io_test(tool,
script_args,
test_runner,
@@ -73,7 +86,7 @@
return parser.parse_args(argv)
-def print_measurements_stat(res):
+def format_measurements_stat(res):
if len(res) != 0:
bw_mean = 0.0
for measurement in res:
@@ -89,10 +102,9 @@
's' if meta['sync'] else 'a',
meta['blocksize'])
- print
- print "====> " + json.dumps({key: (int(bw_mean), int(bw_dev))})
- print
- print "=" * 80
+ data = json.dumps({key: (int(bw_mean), int(bw_dev))})
+
+ return "\n====> {0}\n\n{1}\n".format(data, "=" * 80)
def get_io_opts(io_opts_file, io_opts):
@@ -113,9 +125,10 @@
opt_lines = [i for i in opt_lines if i != "" and not i.startswith("#")]
for opt_line in opt_lines:
- io_opts.append([opt.strip()
- for opt in opt_line.split(" ")
- if opt.strip() != ""])
+ if opt_line.strip() != "":
+ io_opts.append([opt.strip()
+ for opt in opt_line.strip().split(" ")
+ if opt.strip() != ""])
else:
io_opts = [[opt.strip()
for opt in io_opts.split(" ")
@@ -129,6 +142,13 @@
return io_opts
+def format_result(res):
+ data = "\n{0}\n".format("=" * 80)
+ data += pprint.pformat(res) + "\n"
+ data += "{0}\n".format("=" * 80)
+ return data + "\n" + format_measurements_stat(res) + "\n"
+
+
def main(argv):
opts = parse_args(argv)
@@ -141,7 +161,12 @@
io_opts = get_io_opts(opts.io_opts_file, opts.io_opts)
if opts.runner == "rally":
+ logger.debug("Use rally runner")
for script_args in io_opts:
+
+ cmd_line = " ".join(script_args)
+ logger.debug("Run test with {0!r} params".format(cmd_line))
+
runner = rally_runner.get_rally_runner(
files_dir=os.path.dirname(io_scenario.__file__),
rally_extra_opts=opts.runner_opts.split(" "),
@@ -152,14 +177,10 @@
script_args,
runner,
opts.keep_temp_files)
-
- print "=" * 80
- print pprint.pformat(res)
- print "=" * 80
-
- print_measurements_stat(res)
+ logger.debug(format_result(res))
elif opts.runner == "ssh":
+ logger.debug("Use ssh runner")
create_vms_opts = {}
for opt in opts.runner_opts.split(","):
name, val = opt.split("=", 1)
@@ -170,11 +191,13 @@
aff_group = create_vms_opts.pop("aff_group", None)
raw_count = create_vms_opts.pop("count", "x1")
+ logger.debug("Connection to nova")
nova = nova_connect()
if raw_count.startswith("x"):
+ logger.debug("Getting amount of compute services")
count = len(nova.services.list(binary="nova-compute"))
- count *= int(raw_count)
+ count *= int(raw_count[1:])
else:
count = int(raw_count)
@@ -185,19 +208,22 @@
create_vms_opts['scheduler_hints'] = scheduler_hints
- latest_start_time = opts.max_preparation_time + time.time()
-
# nova, amount, keypair_name, img_name,
# flavor_name, vol_sz=None, network_zone_name=None,
# flt_ip_pool=None, name_templ='ceph-test-{}',
# scheduler_hints=None
+ logger.debug("Will start {} vms".format(count))
+
try:
ips = [i[0] for i in create_vms_mt(nova, count, **create_vms_opts)]
uris = ["{0}@{1}::{2}".format(user, ip, key_file) for ip in ips]
for script_args in io_opts:
+ cmd_line = " ".join(script_args)
+ logger.debug("Run test with {0!r} params".format(cmd_line))
+ latest_start_time = opts.max_preparation_time + time.time()
runner = ssh_runner.get_ssh_runner(uris,
latest_start_time,
opts.keep_temp_files)
@@ -205,28 +231,16 @@
script_args,
runner,
opts.keep_temp_files)
- print "=" * 80
- print pprint.pformat(res)
- print "=" * 80
+ logger.debug(format_result(res))
- print_measurements_stat(res)
except:
traceback.print_exc()
finally:
+ logger.debug("Clearing")
clear_all(nova)
return 0
-ostack_prepare = """
-glance image-create --name 'ubuntu' --disk-format qcow2
---container-format bare --is-public true --copy-from
-https://cloud-images.ubuntu.com/trusty/current/trusty-server-cloudimg-amd64-disk1.img
-
-nova flavor-create ceph.512 ceph.512 512 50 1
-nova server-group-create --policy anti-affinity ceph
-"""
-
-
if __name__ == '__main__':
exit(main(sys.argv[1:]))
diff --git a/scripts/data.py b/scripts/data.py
index c53871d..0635789 100644
--- a/scripts/data.py
+++ b/scripts/data.py
@@ -11,8 +11,16 @@
for block in re.split(splitter_rr, fc):
block = block.strip()
+
if not block.startswith("[{u'__meta__':"):
continue
+
+ print
+ print
+ print block
+ print
+ print
+
for val in eval(block):
meta = val['__meta__']
meta['sync'] = 's' if meta['sync'] else 'a'
diff --git a/scripts/run.sh b/scripts/run.sh
index a5c8aaf..11c90ee 100644
--- a/scripts/run.sh
+++ b/scripts/run.sh
@@ -3,12 +3,11 @@
type="iozone"
-bsizes="1k" # 4k 64k 256k 1m
-ops="write" # randwrite"
+bsizes="1k 4k 64k 256k 1m"
+ops="randwrite"
osync="s" # a
-num_times="1"
-concurrences="1 2 4 8 16 32 64 128"
-aff_group="0077d59c-bf5b-4326-8940-027e77d655ee"
+num_times="3"
+concurrences="32"
for concurrence in $concurrences; do
for bsize in $bsizes ; do
@@ -31,12 +30,14 @@
factor="r2"
fi
- extra_opts="user=ubuntu,keypair_name=ceph,img_name=ubuntu,flavor_name=ceph.512"
- extra_opts="${extra_opts},network_zone_name=net04,flt_ip_pool=net04_ext,key_file=ceph.pem"
- extra_opts="${extra_opts},aff_group=${aff_group}"
io_opts="--type $type -a $op --iodepth 16 --blocksize $bsize --iosize $factor $ssync --concurrency $concurrence"
+ # aff_group=$(nova server-group-list | grep ' ceph ' | awk '{print $2}')
+ # extra_opts="user=ubuntu,keypair_name=ceph,img_name=ubuntu,flavor_name=ceph.512"
+ # extra_opts="${extra_opts},network_zone_name=net04,flt_ip_pool=net04_ext,key_file=ceph.pem"
+ # extra_opts="${extra_opts},aff_group=${aff_group},count=x1"
+
echo $io_opts
# python run_test.py --runner ssh -l -o "$io_opts" -t io-scenario $type --runner-extra-opts="$extra_opts"
diff --git a/scripts/run_2.sh b/scripts/run_2.sh
index d343d69..cdf29dc 100644
--- a/scripts/run_2.sh
+++ b/scripts/run_2.sh
@@ -1,43 +1,52 @@
#!/bin/bash
set -x
+function prepare() {
+ nova image-list | grep ' ubuntu ' >/dev/null
+ if [ $? -ne 0 ] ; then
+ url="https://cloud-images.ubuntu.com/trusty/current/trusty-server-cloudimg-amd64-disk1.img"
+ glance image-create --name 'ubuntu' --disk-format qcow2 --container-format bare --is-public true --copy-from $url
+ fi
+
+ nova flavor-list | grep ' ceph.512 ' >/dev/null
+ if [ $? -ne 0 ] ; then
+ nova flavor-create ceph.512 ceph.512 512 50 1
+ fi
+
+ nova server-group-list | grep ' ceph ' >/dev/null
+ if [ $? -ne 0 ] ; then
+ nova server-group-create --policy anti-affinity ceph
+ fi
+
+ nova keypair-list | grep ' ceph ' >/dev/null
+ if [ $? -ne 0 ] ; then
+ nova keypair-add ceph > ceph.pem
+ fi
+
+ nova secgroup-add-rule default icmp -1 -1 0.0.0.0/0
+ nova secgroup-add-rule default tcp 22 22 0.0.0.0/0
+}
+
+
+function run_test() {
+ set -e
+
+ iodepts="1"
+ for iodepth in $iodepts; do
+ extra_opts="user=ubuntu,keypair_name=ceph,img_name=ubuntu,flavor_name=ceph.512"
+ extra_opts="${extra_opts},network_zone_name=net04,flt_ip_pool=net04_ext,key_file=ceph.pem"
+ extra_opts="${extra_opts},aff_group=${aff_group},count=x1"
+
+ io_opts="--type $type -a write --iodepth 16 --blocksize 1m --iosize x20"
+ python run_test.py --runner ssh -l -o "$io_opts" -t io-scenario $type --runner-extra-opts="$extra_opts"
+ done
+
+ # io_opts="--type $type -a write --iodepth 16 --blocksize 1m --iosize x20"
+ # python run_test.py --runner rally -l -o "$io_opts" -t io-scenario $type --runner-extra-opts="--deployment perf-1"
+}
+
type="iozone"
-
-# nova image-list | grep ' ubuntu ' >/dev/null
-# if [ $? -ne 0 ] ; then
-# url="https://cloud-images.ubuntu.com/trusty/current/trusty-server-cloudimg-amd64-disk1.img"
-# glance image-create --name 'ubuntu' --disk-format qcow2 --container-format bare --is-public true --copy-from $url
-# fi
-
-# nova flavor-list | grep ' ceph.512 ' >/dev/null
-# if [ $? -ne 0 ] ; then
-# nova flavor-create ceph.512 ceph.512 512 50 1
-# fi
-
-# nova server-group-list | grep ' ceph ' >/dev/null
-# if [ $? -ne 0 ] ; then
-# nova server-group-create --policy anti-affinity ceph
-# fi
-
-# nova keypair-list | grep ' ceph ' >/dev/null
-# if [ $? -ne 0 ] ; then
-# nova keypair-add ceph > ceph.pem
-# fi
-
+prepare
+# run_test
# nova server-group-list | grep ' ceph ' | awk '{print $2}'
-aff_group="0077d59c-bf5b-4326-8940-027e77d655ee"
-
-set -e
-
-iodepts="1"
-for iodepth in $iodepts; do
- extra_opts="user=ubuntu,keypair_name=ceph,img_name=ubuntu,flavor_name=ceph.512"
- extra_opts="${extra_opts},network_zone_name=net04,flt_ip_pool=net04_ext,key_file=ceph.pem"
- extra_opts="${extra_opts},aff_group=${aff_group}"
-
- io_opts="--type $type -a write --iodepth 16 --blocksize 1m --iosize x20"
- python run_test.py --runner ssh -l -o "$io_opts" -t io-scenario $type --runner-extra-opts="$extra_opts"
-done
-
-# io_opts="--type $type -a write --iodepth 16 --blocksize 1m --iosize x20"
-# python run_test.py --runner rally -l -o "$io_opts" -t io-scenario $type --runner-extra-opts="--deployment perf-1"
+# aff_group="0077d59c-bf5b-4326-8940-027e77d655ee"
diff --git a/ssh_runner.py b/ssh_runner.py
index c7ba9a9..d5d7aca 100644
--- a/ssh_runner.py
+++ b/ssh_runner.py
@@ -1,5 +1,6 @@
import re
import Queue
+import logging
import traceback
import threading
from concurrent.futures import ThreadPoolExecutor
@@ -9,6 +10,9 @@
import itest
from utils import get_barrier, log_error, wait_on_barrier
+
+logger = logging.getLogger("io-perf-tool")
+
conn_uri_attrs = ("user", "passwd", "host", "port", "path")
@@ -90,8 +94,10 @@
latest_start_time=None,
keep_temp_files=False):
+ logger.debug("Connecting to servers")
+
with ThreadPoolExecutor(max_workers=16) as executor:
- connections = executor.map(connect, uris)
+ connections = list(executor.map(connect, uris))
result_queue = Queue.Queue()
barrier = get_barrier(len(uris), threaded=True)
@@ -102,6 +108,7 @@
params = (obj, barrier, latest_start_time)
+ logger.debug("Start tests")
for conn in connections:
th = threading.Thread(None, conn_func, None,
params + (conn,))
@@ -116,6 +123,7 @@
while not result_queue.empty():
test_result.append(result_queue.get())
+ logger.debug("Done. Closing connection")
for conn in connections:
conn.close()
diff --git a/starts_vms.py b/starts_vms.py
index dbade23..368aadf 100644
--- a/starts_vms.py
+++ b/starts_vms.py
@@ -1,6 +1,7 @@
import re
import os
import time
+import logging
from concurrent.futures import ThreadPoolExecutor
@@ -8,6 +9,9 @@
from cinderclient.v1.client import Client as c_client
+logger = logging.getLogger("io-perf-tool")
+
+
def ostack_get_creds():
env = os.environ.get
name = env('OS_USERNAME')
@@ -33,7 +37,7 @@
while vol.status != 'available':
if vol.status == 'error':
if err_count == 3:
- print "Fail to create volume"
+ logger.critical("Fail to create volume")
raise RuntimeError("Fail to create volume")
else:
err_count += 1
@@ -99,30 +103,36 @@
ips_future = None
if ips_future is not None:
+ logger.debug("Wait for floating ip")
ips = ips_future.result()
ips += [Allocate] * (amount - len(ips))
else:
ips = [None] * amount
+ logger.debug("Getting for flavor object")
fl = fl_future.result()
+ logger.debug("Getting for image object")
img = img_future.result()
if network_future is not None:
+ logger.debug("Waiting for network results")
nics = [{'net-id': network_future.result().id}]
else:
nics = None
- print "Try to start {0} servers".format(amount)
names = map(name_templ.format, range(amount))
futures = []
+ logger.debug("Requesting new vm")
for name, flt_ip in zip(names, ips):
params = (nova, name, keypair_name, img, fl,
nics, vol_sz, flt_ip, scheduler_hints,
flt_ip_pool)
futures.append(executor.submit(create_vm, *params))
- return [future.result() for future in futures]
+ res = [future.result() for future in futures]
+ logger.debug("Done spawning")
+ return res
def create_vm(nova, name, keypair_name, img,
@@ -138,7 +148,7 @@
if not wait_for_server_active(nova, srv):
msg = "Server {0} fails to start. Kill it and try again"
- print msg.format(srv.name)
+ logger.debug(msg.format(srv))
nova.servers.delete(srv)
while True:
@@ -171,15 +181,15 @@
deleted_srvs = set()
for srv in nova.servers.list():
if re.match(name_templ.format("\\d+"), srv.name):
- print "Deleting server", srv.name
+ logger.debug("Deleting server {0}".format(srv.name))
nova.servers.delete(srv)
deleted_srvs.add(srv.id)
while deleted_srvs != set():
- print "Waiting till all servers are actually deleted"
+ logger.debug("Waiting till all servers are actually deleted")
all_id = set(srv.id for srv in nova.servers.list())
if all_id.intersection(deleted_srvs) == set():
- print "Done, deleting volumes"
+ logger.debug("Done, deleting volumes")
break
time.sleep(1)
@@ -193,7 +203,7 @@
print "Deleting volume", vol.display_name
cinder.volumes.delete(vol)
- print "Clearing done (yet some volumes may still deleting)"
+ logger.debug("Clearing done (yet some volumes may still deleting)")
# def prepare_host(key_file, ip, fio_path, dst_fio_path, user='cirros'):
@@ -218,33 +228,33 @@
# exec_on_host("sudo /bin/chmod a+rwx /media/ceph")
-def main():
- image_name = 'TestVM'
- flavor_name = 'ceph'
- vol_sz = 50
- network_zone_name = 'net04'
- amount = 10
- keypair_name = 'ceph-test'
+# def main():
+# image_name = 'TestVM'
+# flavor_name = 'ceph'
+# vol_sz = 50
+# network_zone_name = 'net04'
+# amount = 10
+# keypair_name = 'ceph-test'
- nova = nova_connect()
- clear_all(nova)
+# nova = nova_connect()
+# clear_all(nova)
- try:
- ips = []
- params = dict(vol_sz=vol_sz)
- params['image_name'] = image_name
- params['flavor_name'] = flavor_name
- params['network_zone_name'] = network_zone_name
- params['amount'] = amount
- params['keypair_name'] = keypair_name
+# try:
+# ips = []
+# params = dict(vol_sz=vol_sz)
+# params['image_name'] = image_name
+# params['flavor_name'] = flavor_name
+# params['network_zone_name'] = network_zone_name
+# params['amount'] = amount
+# params['keypair_name'] = keypair_name
- for ip, host in create_vms(nova, **params):
- ips.append(ip)
+# for ip, host in create_vms(nova, **params):
+# ips.append(ip)
- print "All setup done! Ips =", " ".join(ips)
- print "Starting tests"
- finally:
- clear_all(nova)
+# print "All setup done! Ips =", " ".join(ips)
+# print "Starting tests"
+# finally:
+# clear_all(nova)
-if __name__ == "__main__":
- exit(main())
+# if __name__ == "__main__":
+# exit(main())
diff --git a/utils.py b/utils.py
index 5cc1ac2..a6fbe09 100644
--- a/utils.py
+++ b/utils.py
@@ -1,11 +1,13 @@
import time
+import logging
import threading
import contextlib
import multiprocessing
import paramiko
-from log import log
+
+logger = logging.getLogger("io-perf-tool")
def get_barrier(count, threaded=False):
@@ -56,16 +58,16 @@
if timeout is not None and timeout > 0:
msg = "Ready and waiting on barrier. " + \
"Will wait at most {0} seconds"
- log(msg.format(int(timeout)))
+ logger.debug(msg.format(int(timeout)))
if not barrier(timeout):
- log("Barrier timeouted")
+ logger.debug("Barrier timeouted")
@contextlib.contextmanager
def log_error(action, types=(Exception,)):
if not action.startswith("!"):
- log("Starts : " + action)
+ logger.debug("Starts : " + action)
else:
action = action[1:]
@@ -74,7 +76,7 @@
except Exception as exc:
if isinstance(exc, types) and not isinstance(exc, StopIteration):
templ = "Error during {0} stage: {1}"
- log(templ.format(action, exc.message))
+ logger.debug(templ.format(action, exc.message))
raise