fixing bugs
diff --git a/TODO b/TODO
index c7d6826..7c51227 100644
--- a/TODO
+++ b/TODO
@@ -1,5 +1,19 @@
-Репорты
-Юнит-тесты
+Интеграционные/функциональные тесты *
+Mysql tests *
+Репорты v2.0 *
+Centos support *
+Инфо по лабе * - выковырять из certification (Глеб)
+Сравнения билдов (пока по папкам из CLI)
+Finding bottlenecks (алена)
+Make python module
+putget/ssbench tests (костя)
+тестирование (костя)
+
+
+Intellectual granular sensors
+
+
+
Автоинтеграция с опенстек
Отчеты
Добавить к отчету экстраполированные скорости
diff --git a/config.py b/config.py
index e5962da..8f656ec 100644
--- a/config.py
+++ b/config.py
@@ -40,3 +40,6 @@
cfg_dict['text_report_file'] = in_var_dir('report.txt')
cfg_dict['log_file'] = in_var_dir('log.txt')
cfg_dict['sensor_storage'] = in_var_dir('sensor_storage.txt')
+ cfg_dict['test_log_directory'] = in_var_dir('test_logs')
+ if not os.path.exists(cfg_dict['test_log_directory']):
+ os.makedirs(cfg_dict['test_log_directory'])
diff --git a/nodes/node.py b/nodes/node.py
index 138d123..fad4f29 100644
--- a/nodes/node.py
+++ b/nodes/node.py
@@ -1,3 +1,6 @@
+import urlparse
+
+
class Node(object):
def __init__(self, conn_url, roles):
@@ -5,6 +8,9 @@
self.conn_url = conn_url
self.connection = None
+ def get_ip(self):
+ return urlparse.urlparse(self.conn_url).hostname
+
def __str__(self):
templ = "<Node: url={conn_url!r} roles={roles}" + \
" connected={is_connected}>"
diff --git a/report.py b/report.py
index aa1c356..4eb3cb8 100644
--- a/report.py
+++ b/report.py
@@ -200,7 +200,7 @@
lab_info = ""
for suite_type, test_suite_data in results:
- if suite_type != 'io':
+ if suite_type != 'io' or test_suite_data is None:
continue
io_test_suite_res = test_suite_data['res']
diff --git a/run_test.py b/run_test.py
index 6465760..a3a1941 100755
--- a/run_test.py
+++ b/run_test.py
@@ -53,6 +53,8 @@
self.use_color = use_color
def format(self, record):
+ orig = record.__dict__
+ record.__dict__ = record.__dict__.copy()
levelname = record.levelname
prn_name = ' ' * (6 - len(levelname)) + levelname
@@ -61,7 +63,9 @@
else:
record.levelname = prn_name
- return logging.Formatter.format(self, record)
+ res = super(ColoredFormatter, self).format(record)
+ record.__dict__ = orig
+ return res
def setup_logger(logger, level=logging.DEBUG, log_fname=None):
@@ -73,18 +77,21 @@
colored_formatter = ColoredFormatter(log_format,
"%H:%M:%S")
- formatter = logging.Formatter(log_format,
- "%H:%M:%S")
sh.setFormatter(colored_formatter)
logger.addHandler(sh)
+ logger_api = logging.getLogger("io-perf-tool.fuel_api")
+
if log_fname is not None:
fh = logging.FileHandler(log_fname)
+ log_format = '%(asctime)s - %(levelname)6s - %(name)s - %(message)s'
+ formatter = logging.Formatter(log_format,
+ "%H:%M:%S")
fh.setFormatter(formatter)
fh.setLevel(logging.DEBUG)
logger.addHandler(fh)
+ logger_api.addHandler(fh)
- logger_api = logging.getLogger("io-perf-tool.fuel_api")
logger_api.addHandler(sh)
logger_api.setLevel(logging.WARNING)
@@ -103,6 +110,8 @@
self.nodes = []
self.clear_calls_stack = []
self.openstack_nodes_ids = []
+ self.sensor_cm = None
+ self.keep_vm = False
def connect_one(node):
@@ -147,6 +156,12 @@
logger.exception("In test {0} for node {1}".format(test, node))
res_q.put(exc)
+ try:
+ test.cleanup(node.connection)
+ except:
+ msg = "Duringf cleanup - in test {0} for node {1}"
+ logger.exception(msg.format(test, node))
+
def run_tests(test_block, nodes):
tool_type_mapper = {
@@ -156,18 +171,30 @@
test_nodes = [node for node in nodes
if 'testnode' in node.roles]
-
+ test_number_per_type = {}
res_q = Queue.Queue()
for name, params in test_block.items():
logger.info("Starting {0} tests".format(name))
-
+ test_num = test_number_per_type.get(name, 0)
+ test_number_per_type[name] = test_num + 1
threads = []
barrier = utils.Barrier(len(test_nodes))
+
for node in test_nodes:
msg = "Starting {0} test on {1} node"
logger.debug(msg.format(name, node.conn_url))
- test = tool_type_mapper[name](params, res_q.put)
+
+ dr = os.path.join(
+ cfg_dict['test_log_directory'],
+ "{0}_{1}_{2}".format(name, test_num, node.get_ip())
+ )
+
+ if not os.path.exists(dr):
+ os.makedirs(dr)
+
+ test = tool_type_mapper[name](params, res_q.put, dr,
+ node=node.get_ip())
th = threading.Thread(None, test_thread, None,
(test, node, barrier, res_q))
threads.append(th)
@@ -231,6 +258,7 @@
ctx.clear_calls_stack.append(remove_sensors_stage)
cfg = cfg_dict.get('sensors')
sens_cfg = []
+ monitored_nodes = []
for role, sensors_str in cfg["roles_mapping"].items():
sensors = [sens.strip() for sens in sensors_str.split(",")]
@@ -239,9 +267,24 @@
for node in ctx.nodes:
if role in node.roles:
+ monitored_nodes.append(node)
sens_cfg.append((node.connection, collect_cfg))
- ctx.sensor_cm = start_monitoring(cfg["receiver_uri"], None,
+ ctx.receiver_uri = cfg["receiver_uri"]
+ if '{ip}' in ctx.receiver_uri:
+ ips = set(utils.get_ip_for_target(node.get_ip())
+ for node in monitored_nodes)
+
+ if len(ips) > 1:
+ raise ValueError("Can't select external ip for sensors server")
+
+ if len(ips) == 0:
+ raise ValueError("Can't find any external ip for sensors server")
+
+ ext_ip = list(ips)[0]
+ ctx.receiver_uri = ctx.receiver_uri.format(ip=ext_ip)
+
+ ctx.sensor_cm = start_monitoring(ctx.receiver_uri, None,
connected_config=sens_cfg)
ctx.sensors_control_queue = ctx.sensor_cm.__enter__()
@@ -255,10 +298,11 @@
def remove_sensors_stage(cfg, ctx):
- ctx.sensor_cm.__exit__(None, None, None)
- ctx.sensors_control_queue.put(None)
- ctx.sensor_listen_thread.join()
- ctx.sensor_data = ctx.sensors_control_queue.get()
+ if ctx.sensor_cm is not None:
+ ctx.sensor_cm.__exit__(None, None, None)
+ ctx.sensors_control_queue.put(None)
+ ctx.sensor_listen_thread.join()
+ ctx.sensor_data = ctx.sensors_control_queue.get()
def get_os_credentials(cfg, ctx, creds_type):
@@ -280,7 +324,8 @@
elif creds_type == 'ENV':
user, passwd, tenant, auth_url = start_vms.ostack_get_creds()
elif os.path.isfile(creds_type):
- user, passwd, tenant, auth_url = start_vms.ostack_get_creds()
+ raise NotImplementedError()
+ # user, passwd, tenant, auth_url = start_vms.ostack_get_creds()
else:
msg = "Creds {0!r} isn't supported".format(creds_type)
raise ValueError(msg)
@@ -314,8 +359,8 @@
start_vms.nova_connect(**os_creds)
- # logger.info("Preparing openstack")
- # start_vms.prepare_os(**os_creds)
+ logger.info("Preparing openstack")
+ start_vms.prepare_os_subpr(**os_creds)
new_nodes = []
try:
@@ -342,6 +387,7 @@
sens_cfg.append((node.connection, collect_cfg))
uri = cfg["sensors"]["receiver_uri"]
+ logger.debug("Installing sensors on vm's")
deploy_and_start_sensors(uri, None,
connected_config=sens_cfg)
@@ -349,10 +395,11 @@
ctx.results.extend(run_tests(test_group, ctx.nodes))
finally:
- shut_down_vms_stage(cfg, ctx)
+ if not ctx.keep_vm:
+ shut_down_vms_stage(cfg, ctx)
- elif 'tests' in key:
- ctx.results.extend(run_tests(config, ctx.nodes))
+ else:
+ ctx.results.extend(run_tests(group, ctx.nodes))
def shut_down_vms_stage(cfg, ctx):
@@ -423,12 +470,11 @@
def console_report_stage(cfg, ctx):
for tp, data in ctx.results:
- if 'io' == tp:
+ if 'io' == tp and data is not None:
print format_results_for_console(data)
def report_stage(cfg, ctx):
-
html_rep_fname = cfg['html_report_file']
fuel_url = cfg['clouds']['fuel']['url']
creds = cfg['clouds']['fuel']['creds']
@@ -439,7 +485,7 @@
text_rep_fname = cfg_dict['text_report_file']
with open(text_rep_fname, "w") as fd:
for tp, data in ctx.results:
- if 'io' == tp:
+ if 'io' == tp and data is not None:
fd.write(format_results_for_console(data))
fd.write("\n")
fd.flush()
@@ -475,6 +521,7 @@
parser.add_argument("-u", '--username', type=str, default="admin")
parser.add_argument("-p", '--post-process-only', default=None)
parser.add_argument("-o", '--output-dest', nargs="*")
+ parser.add_argument("-k", '--keep-vm', action='store_true', default=False)
parser.add_argument("config_file", nargs="?", default="config.yaml")
return parser.parse_args(argv[1:])
@@ -514,6 +561,7 @@
ctx.build_meta['build_descrption'] = opts.build_description
ctx.build_meta['build_type'] = opts.build_type
ctx.build_meta['username'] = opts.username
+ ctx.keep_vm = opts.keep_vm
try:
for stage in stages:
diff --git a/scripts/config.sh b/scripts/config.sh
index 2283eee..2937edc 100644
--- a/scripts/config.sh
+++ b/scripts/config.sh
@@ -1,4 +1,4 @@
-FLAVOR_NAME="disk_io_perf.256"
+FLAVOR_NAME="disk_io_perf.1024"
SERV_GROUP="disk_io_perf.aa"
KEYPAIR_NAME="disk_io_perf"
IMAGE_NAME="disk_io_perf"
@@ -10,3 +10,4 @@
VM_COUNT="x1"
TESTER_TYPE="iozone"
RUNNER="ssh"
+SECGROUP='disk_io_perf'
diff --git a/scripts/prepare.sh b/scripts/prepare.sh
index 7151b2f..587f577 100644
--- a/scripts/prepare.sh
+++ b/scripts/prepare.sh
@@ -5,8 +5,8 @@
source "$my_dir/config.sh"
# settings
-FL_RAM=256
-FL_HDD=20
+FL_RAM=1024
+FL_HDD=50
FL_CPU=1
@@ -45,6 +45,14 @@
echo " Not Found"
fi
+ echo -n "Looking for security group $SECGROUP ... "
+ export secgroup_id=$(nova secgroup-list | grep " $SECGROUP " | awk '{print $2}' )
+ if [ ! -z "$secgroup_id" ] ; then
+ echo " Found"
+ else
+ echo " Not Found"
+ fi
+
set -e
}
@@ -75,6 +83,10 @@
echo "deleting keypair file $KEY_FILE_NAME"
rm -f "$KEY_FILE_NAME"
fi
+
+ if [ ! -z "$secgroup_id" ] ; then
+ nova secgroup-delete $SECGROUP >/dev/null
+ fi
}
function prepare() {
@@ -103,11 +115,12 @@
chmod og= "$KEY_FILE_NAME"
fi
- echo "Adding rules for ping and ssh"
- set +e
- nova secgroup-add-rule default icmp -1 -1 0.0.0.0/0 >/dev/null
- nova secgroup-add-rule default tcp 22 22 0.0.0.0/0 >/dev/null
- set -e
+ if [ -z "$secgroup_id" ] ; then
+ echo "Adding rules for ping and ssh"
+ nova secgroup-create $SECGROUP $SECGROUP >/dev/null
+ nova secgroup-add-rule $SECGROUP icmp -1 -1 0.0.0.0/0 >/dev/null
+ nova secgroup-add-rule $SECGROUP tcp 22 22 0.0.0.0/0 >/dev/null
+ fi
}
if [ "$1" = "--clear" ] ; then
diff --git a/sensors/storage/__init__.py b/sensors/storage/__init__.py
index e69de29..d7bf6aa 100644
--- a/sensors/storage/__init__.py
+++ b/sensors/storage/__init__.py
@@ -0,0 +1,104 @@
+import struct
+
+
+def pack(val, tp=True):
+ if isinstance(val, int):
+ assert 0 <= val < 2 ** 16
+
+ if tp:
+ res = 'i'
+ else:
+ res = ""
+
+ res += struct.pack("!U", val)
+ elif isinstance(val, dict):
+ assert len(val) < 2 ** 16
+ if tp:
+ res = "d"
+ else:
+ res = ""
+
+ res += struct.pack("!U", len(val))
+ for k, v in dict.items():
+ assert 0 <= k < 2 ** 16
+ assert 0 <= v < 2 ** 32
+ res += struct.pack("!UI", k, v)
+ elif isinstance(val, str):
+ assert len(val) < 256
+ if tp:
+ res = "s"
+ else:
+ res = ""
+ res += chr(len(val)) + val
+ else:
+ raise ValueError()
+
+ return res
+
+
+def unpack(fd, tp=None):
+ if tp is None:
+ tp = fd.read(1)
+
+ if tp == 'i':
+ return struct.unpack("!U", fd.read(2))
+ elif tp == 'd':
+ res = {}
+ val_len = struct.unpack("!U", fd.read(2))
+ for _ in range(val_len):
+ k, v = struct.unpack("!UI", fd.read(6))
+ res[k] = v
+ return res
+ elif tp == 's':
+ val_len = struct.unpack("!U", fd.read(2))
+ return fd.read(val_len)
+
+ raise ValueError()
+
+
+class LocalStorage(object):
+ NEW_DATA = 0
+ NEW_SENSOR = 1
+ NEW_SOURCE = 2
+
+ def __init__(self, fd):
+ self.fd = fd
+ self.sensor_ids = {}
+ self.sources_ids = {}
+ self.max_source_id = 0
+ self.max_sensor_id = 0
+
+ def add_data(self, source, sensor_values):
+ source_id = self.sources_ids.get(source)
+ if source_id is None:
+ source_id = self.max_source_id
+ self.sources_ids[source] = source_id
+ self.emit(self.NEW_SOURCE, source_id, source)
+ self.max_source_id += 1
+
+ new_sensor_values = {}
+
+ for name, val in sensor_values.items():
+ sensor_id = self.sensor_ids.get(name)
+ if sensor_id is None:
+ sensor_id = self.max_sensor_id
+ self.sensor_ids[name] = sensor_id
+ self.emit(self.NEW_SENSOR, sensor_id, name)
+ self.max_sensor_id += 1
+ new_sensor_values[sensor_id] = val
+
+ self.emit(self.NEW_DATA, source_id, new_sensor_values)
+
+ def emit(self, tp, v1, v2):
+ self.fd.write(chr(tp) + pack(v1, False) + pack(v2))
+
+ def readall(self):
+ tp = self.fd.read(1)
+ if ord(tp) == self.NEW_DATA:
+ pass
+ elif ord(tp) == self.NEW_SENSOR:
+ pass
+ elif ord(tp) == self.NEW_SOURCE:
+ pass
+ else:
+ raise ValueError()
diff --git a/ssh_utils.py b/ssh_utils.py
index c4a18e8..aea3111 100644
--- a/ssh_utils.py
+++ b/ssh_utils.py
@@ -133,6 +133,12 @@
ssh_copy_file(sftp, localfile, remfile, preserve_perm)
+def delete_file(conn, path):
+ sftp = conn.open_sftp()
+ sftp.remove(path)
+ sftp.close()
+
+
def copy_paths(conn, paths):
sftp = conn.open_sftp()
try:
@@ -231,11 +237,14 @@
all_sessions = []
-def run_over_ssh(conn, cmd, stdin_data=None, timeout=60, nolog=False):
+def run_over_ssh(conn, cmd, stdin_data=None, timeout=60, nolog=False, node=None):
"should be replaces by normal implementation, with select"
transport = conn.get_transport()
session = transport.open_session()
+ if node is None:
+ node = ""
+
with all_sessions_lock:
all_sessions.append(session)
@@ -245,7 +254,7 @@
stime = time.time()
if not nolog:
- logger.debug("SSH: Exec {1!r}".format(conn, cmd))
+ logger.debug("SSH:{0} Exec {1!r}".format(node, cmd))
session.exec_command(cmd)
@@ -273,8 +282,8 @@
session.close()
if code != 0:
- templ = "Cmd {0!r} failed with code {1}. Output: {2}"
- raise OSError(templ.format(cmd, code, output))
+ templ = "SSH:{0} Cmd {1!r} failed with code {2}. Output: {3}"
+ raise OSError(templ.format(node, cmd, code, output))
return output
diff --git a/start_vms.py b/start_vms.py
index 93162a3..62e4ca5 100644
--- a/start_vms.py
+++ b/start_vms.py
@@ -6,7 +6,7 @@
from concurrent.futures import ThreadPoolExecutor
-# from novaclient.exceptions import NotFound
+from novaclient.exceptions import NotFound
from novaclient.client import Client as n_client
from cinderclient.v1.client import Client as c_client
@@ -45,7 +45,7 @@
NOVA_CONNECTION = None
-def prepare_os(name=None, passwd=None, tenant=None, auth_url=None):
+def prepare_os_subpr(name=None, passwd=None, tenant=None, auth_url=None):
if name is None:
name, passwd, tenant, auth_url = ostack_get_creds()
@@ -58,24 +58,46 @@
params_s = " ".join("{}={}".format(k, v) for k, v in params.items())
- cmd = "env {params} bash scripts/prepare.sh".format(params_s)
+ cmd_templ = "env {params} bash scripts/prepare.sh >/dev/null"
+ cmd = cmd_templ.format(params=params_s)
subprocess.call(cmd, shell=True)
- return NOVA_CONNECTION
+
+def prepare_os(nova, params):
+ allow_ssh(nova, params['security_group'])
+
+ shed_ids = []
+ for shed_group in params['schedulers_groups']:
+ shed_ids.append(get_or_create_aa_group(nova, shed_group))
+
+ create_keypair(nova,
+ params['keypair_name'],
+ params['pub_key_path'],
+ params['priv_key_path'])
+
+ create_image(nova, params['image']['name'],
+ params['image']['url'])
+
+ create_flavor(nova, **params['flavor'])
-# def get_or_create_aa_group(nova, name):
-# try:
-# group = conn.server_groups.find(name=name)
-# except NotFound:
-# group = None
+def get_or_create_aa_group(nova, name):
+ try:
+ group = nova.server_groups.find(name=name)
+ except NotFound:
+ group = nova.server_groups.create({'name': name,
+ 'policies': ['anti-affinity']})
-# if group is None:
-# conn.server_groups.create
+ return group.id
-def allow_ssh(nova):
- secgroup = nova.security_groups.find(name="default")
+def allow_ssh(nova, group_name):
+ try:
+ secgroup = nova.security_groups.find(name=group_name)
+ except NotFound:
+ secgroup = nova.security_groups.create(group_name,
+ "allow ssh/ping to node")
+
nova.security_group_rules.create(secgroup.id,
ip_protocol="tcp",
from_port="22",
@@ -87,11 +109,32 @@
from_port=-1,
cidr="0.0.0.0/0",
to_port=-1)
+ return secgroup.id
-def create_keypair(nova, name, key_path):
- with open(key_path) as key:
- return nova.keypairs.create(name, key.read())
+def create_image(nova, name, url):
+ pass
+
+
+def create_flavor(nova, name, **params):
+ pass
+
+
+def create_keypair(nova, name, pub_key_path, priv_key_path):
+ try:
+ nova.keypairs.find(name=name)
+ except NotFound:
+ if os.path.exists(pub_key_path):
+ with open(pub_key_path) as pub_key_fd:
+ return nova.keypairs.create(name, pub_key_fd.read())
+ else:
+ key = nova.keypairs.create(name)
+
+ with open(priv_key_path, "w") as priv_key_fd:
+ priv_key_fd.write(key.private_key)
+
+ with open(pub_key_path, "w") as pub_key_fd:
+ pub_key_fd.write(key.public_key)
def create_volume(size, name):
@@ -156,19 +199,30 @@
srv_count = len([srv for srv in lst if srv.status == 'enabled'])
count = srv_count * int(count[1:])
- srv_params = "img: {img_name}, flavor: {flavor_name}".format(**params)
+ srv_params = "img: {image[name]}, flavor: {flavor[name]}".format(**params)
msg_templ = "Will start {0} servers with next params: {1}"
logger.info(msg_templ.format(count, srv_params))
vm_creds = params.pop('creds')
+ params = params.copy()
+
+ params['img_name'] = params['image']['name']
+ params['flavor_name'] = params['flavor']['name']
+
+ del params['image']
+ del params['flavor']
+ del params['scheduler_group_name']
+ private_key_path = params.pop('private_key_path')
+
for ip, os_node in create_vms_mt(NOVA_CONNECTION, count, **params):
- yield Node(vm_creds.format(ip), []), os_node.id
+ conn_uri = vm_creds.format(ip=ip, private_key_path=private_key_path)
+ yield Node(conn_uri, []), os_node.id
def create_vms_mt(nova, amount, keypair_name, img_name,
flavor_name, vol_sz=None, network_zone_name=None,
flt_ip_pool=None, name_templ='ceph-test-{0}',
- scheduler_hints=None):
+ scheduler_hints=None, security_group=None):
with ThreadPoolExecutor(max_workers=16) as executor:
if network_zone_name is not None:
@@ -208,7 +262,7 @@
for name, flt_ip in zip(names, ips):
params = (nova, name, keypair_name, img, fl,
nics, vol_sz, flt_ip, scheduler_hints,
- flt_ip_pool)
+ flt_ip_pool, [security_group])
futures.append(executor.submit(create_vm, *params))
res = [future.result() for future in futures]
@@ -220,12 +274,16 @@
fl, nics, vol_sz=None,
flt_ip=False,
scheduler_hints=None,
- pool=None):
+ pool=None,
+ security_groups=None):
for i in range(3):
srv = nova.servers.create(name,
- flavor=fl, image=img, nics=nics,
+ flavor=fl,
+ image=img,
+ nics=nics,
key_name=keypair_name,
- scheduler_hints=scheduler_hints)
+ scheduler_hints=scheduler_hints,
+ security_groups=security_groups)
if not wait_for_server_active(nova, srv):
msg = "Server {0} fails to start. Kill it and try again"
@@ -276,13 +334,16 @@
nova.servers.delete(srv)
deleted_srvs.add(srv.id)
- while deleted_srvs != set():
- logger.debug("Waiting till all servers are actually deleted")
+ count = 0
+ while True:
+ if count % 60 == 0:
+ logger.debug("Waiting till all servers are actually deleted")
all_id = set(srv.id for srv in nova.servers.list())
- if all_id.intersection(deleted_srvs) == set():
- logger.debug("Done, deleting volumes")
+ if len(all_id.intersection(deleted_srvs)) == 0:
break
+ count += 1
time.sleep(1)
+ logger.debug("Done, deleting volumes")
# wait till vm actually deleted
diff --git a/tests/disk_test_agent.py b/tests/disk_test_agent.py
index e0af6e7..938ec3f 100644
--- a/tests/disk_test_agent.py
+++ b/tests/disk_test_agent.py
@@ -1,4 +1,3 @@
-import re
import sys
import time
import json
@@ -119,10 +118,12 @@
for i in range(repeat):
yield name.format(**params), processed_vals.copy()
if 'ramp_time' in processed_vals:
- del processed_vals['ramp_time']
+ processed_vals['_ramp_time'] = ramp_time
+ processed_vals.pop('ramp_time')
if ramp_time is not None:
processed_vals['ramp_time'] = ramp_time
+ processed_vals.pop('_ramp_time')
def calculate_execution_time(combinations):
@@ -208,6 +209,9 @@
def format_fio_config(fio_cfg):
res = ""
for pos, (name, section) in enumerate(fio_cfg):
+ if name.startswith('_'):
+ continue
+
if pos != 0:
res += "\n"
@@ -343,7 +347,7 @@
def do_run_fio(bconf):
benchmark_config = format_fio_config(bconf)
- cmd = ["fio", "--output-format=json", "-"]
+ cmd = ["fio", "--output-format=json", "--alloc-size=262144", "-"]
p = subprocess.Popen(cmd, stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT)
@@ -389,11 +393,17 @@
runtime += curr_task_time
jcount += jc
bconf.append((name, sec))
+ if '_ramp_time' in sec:
+ del sec['_ramp_time']
continue
assert len(bconf) != 0
yield bconf
+ if '_ramp_time' in sec:
+ sec['ramp_time'] = sec.pop('_ramp_time')
+ curr_task_time = calculate_execution_time([(name, sec)])
+
runtime = curr_task_time
jcount = jc
bconf = [(name, sec)]
@@ -442,6 +452,16 @@
res[jname] = j_res
+def compile(benchmark_config, params, runcycle=None):
+ whole_conf = list(parse_fio_config_full(benchmark_config, params))
+ res = ""
+
+ for bconf in next_test_portion(whole_conf, runcycle):
+ res += format_fio_config(bconf)
+
+ return res
+
+
def run_fio(benchmark_config,
params,
runcycle=None,
@@ -478,6 +498,13 @@
add_job_results(jname, job_output, jconfig, res)
+ msg_template = "Done {0} tests from {1}. ETA: {2}"
+ exec_time = estimate_cfg(benchmark_config, params, curr_test_num)
+
+ print msg_template.format(curr_test_num - skip_tests,
+ len(whole_conf),
+ sec_to_str(exec_time))
+
except (SystemExit, KeyboardInterrupt):
raise
@@ -512,8 +539,8 @@
job_cfg += char
-def estimate_cfg(job_cfg, params):
- bconf = list(parse_fio_config_full(job_cfg, params))
+def estimate_cfg(job_cfg, params, skip_tests=0):
+ bconf = list(parse_fio_config_full(job_cfg, params))[skip_tests:]
return calculate_execution_time(bconf)
diff --git a/tests/io_scenario_hdd.cfg b/tests/io_scenario_hdd.cfg
index 46210b7..5e24009 100644
--- a/tests/io_scenario_hdd.cfg
+++ b/tests/io_scenario_hdd.cfg
@@ -4,7 +4,7 @@
time_based
buffered=0
iodepth=1
-
+softrandommap=1
filename={FILENAME}
NUM_ROUNDS=7
diff --git a/tests/itest.py b/tests/itest.py
index 975499b..d1e7c00 100644
--- a/tests/itest.py
+++ b/tests/itest.py
@@ -7,7 +7,7 @@
from disk_perf_test_tool.tests.disk_test_agent import parse_fio_config_full
from disk_perf_test_tool.tests.disk_test_agent import estimate_cfg, sec_to_str
from disk_perf_test_tool.tests.io_results_loader import parse_output
-from disk_perf_test_tool.ssh_utils import copy_paths, run_over_ssh
+from disk_perf_test_tool.ssh_utils import copy_paths, run_over_ssh, delete_file
from disk_perf_test_tool.utils import ssize_to_b
@@ -15,20 +15,26 @@
class IPerfTest(object):
- def __init__(self, on_result_cb):
+ def __init__(self, on_result_cb, log_directory=None, node=None):
self.on_result_cb = on_result_cb
+ self.log_directory = log_directory
+ self.node = node
def pre_run(self, conn):
pass
+ def cleanup(self, conn):
+ pass
+
@abc.abstractmethod
def run(self, conn, barrier):
pass
class TwoScriptTest(IPerfTest):
- def __init__(self, opts, on_result_cb):
- super(TwoScriptTest, self).__init__(on_result_cb)
+ def __init__(self, opts, on_result_cb, log_directory=None, node=None):
+ super(TwoScriptTest, self).__init__(on_result_cb, log_directory,
+ node=node)
self.opts = opts
self.pre_run_script = None
self.run_script = None
@@ -53,14 +59,14 @@
def pre_run(self, conn):
remote_script = self.copy_script(conn, self.pre_run_script)
cmd = remote_script
- run_over_ssh(conn, cmd)
+ run_over_ssh(conn, cmd, node=self.node)
def run(self, conn, barrier):
remote_script = self.copy_script(conn, self.run_script)
cmd_opts = ' '.join(["%s %s" % (key, val) for key, val
in self.opts.items()])
cmd = remote_script + ' ' + cmd_opts
- out_err = run_over_ssh(conn, cmd)
+ out_err = run_over_ssh(conn, cmd, node=self.node)
self.on_result(out_err, cmd)
def parse_results(self, out):
@@ -86,32 +92,52 @@
self.run_script = "tests/postgres/run.sh"
+def open_for_append_or_create(fname):
+ if not os.path.exists(fname):
+ return open(fname, "w")
+
+ fd = open(fname, 'r+')
+ fd.seek(0, os.SEEK_END)
+ return fd
+
+
class IOPerfTest(IPerfTest):
io_py_remote = "/tmp/disk_test_agent.py"
- def __init__(self,
- test_options,
- on_result_cb):
- IPerfTest.__init__(self, on_result_cb)
+ def __init__(self, test_options, on_result_cb,
+ log_directory=None, node=None):
+ IPerfTest.__init__(self, on_result_cb, log_directory, node=node)
self.options = test_options
self.config_fname = test_options['cfg']
+ self.alive_check_interval = test_options.get('alive_check_interval')
self.config_params = test_options.get('params', {})
self.tool = test_options.get('tool', 'fio')
self.raw_cfg = open(self.config_fname).read()
self.configs = list(parse_fio_config_full(self.raw_cfg,
self.config_params))
- def pre_run(self, conn):
+ cmd_log = os.path.join(self.log_directory, "task_compiled.cfg")
+ raw_res = os.path.join(self.log_directory, "raw_results.txt")
+ fio_command_file = open_for_append_or_create(cmd_log)
+ fio_command_file.write(disk_test_agent.compile(self.raw_cfg,
+ self.config_params,
+ None))
+ self.fio_raw_results_file = open_for_append_or_create(raw_res)
+
+ def cleanup(self, conn):
+ delete_file(conn, self.io_py_remote)
+
+ def pre_run(self, conn):
try:
- run_over_ssh(conn, 'which fio')
+ run_over_ssh(conn, 'which fio', node=self.node)
except OSError:
# TODO: install fio, if not installed
cmd = "sudo apt-get -y install fio"
for i in range(3):
try:
- run_over_ssh(conn, cmd)
+ run_over_ssh(conn, cmd, node=self.node)
break
except OSError as err:
time.sleep(3)
@@ -136,7 +162,7 @@
for fname, sz in files.items():
cmd = cmd_templ.format(fname, 1024 ** 2, msz)
- run_over_ssh(conn, cmd, timeout=msz)
+ run_over_ssh(conn, cmd, timeout=msz, node=self.node)
def run(self, conn, barrier):
cmd_templ = "sudo env python2 {0} --type {1} {2} --json -"
@@ -155,11 +181,14 @@
try:
if barrier.wait():
- logger.info("Test will takes about {0}".format(exec_time_str))
+ templ = "Test should takes about {0}. Will wait at most {1}"
+ timeout = int(exec_time * 1.1 + 300)
+ logger.info(templ.format(exec_time_str, sec_to_str(timeout)))
out_err = run_over_ssh(conn, cmd,
stdin_data=self.raw_cfg,
- timeout=int(exec_time * 1.1 + 300))
+ timeout=timeout,
+ node=self.node)
logger.info("Done")
finally:
barrier.exit()
@@ -175,6 +204,9 @@
raise RuntimeError(msg_templ.format(exc.message))
def merge_results(self, results):
+ if len(results) == 0:
+ return None
+
merged_result = results[0]
merged_data = merged_result['res']
expected_keys = set(merged_data.keys())