Run tests on nodes in offline mode

Start the fio agent on each test node in a detached screen session
with a pid file, close the SSH connection while the test runs, and
poll the pid file over short-lived connections to detect completion.
The agent writes its results to a file on the node, which is fetched
after the run finishes.
diff --git a/configs/perf1.yaml b/configs/perf1.yaml
index 119cdb1..9428b78 100644
--- a/configs/perf1.yaml
+++ b/configs/perf1.yaml
@@ -3,7 +3,7 @@
url: http://172.16.52.112:8000/
creds: admin:admin@admin
ssh_creds: root:test37
- openstack_env: test
+ openstack_env: Performance-1
discover: fuel
@@ -24,7 +24,7 @@
- start_test_nodes:
creds: clouds
vm_params:
- count: x2
+ count: x1
image:
name: disk_io_perf
url: https://cloud-images.ubuntu.com/trusty/current/trusty-server-cloudimg-amd64-disk1.img
@@ -47,10 +47,9 @@
tests:
- io:
- cluster: true
- cfg: wally/suits/io/io_scenario_ceph.cfg
+ # cfg: wally/suits/io/io_scenario_ceph.cfg
+ cfg: scripts/fio_tests_configs/io_task_test.cfg
prefill_files: true
- # cfg: scripts/fio_tests_configs/io_task_test.cfg
params:
FILENAME: /dev/vdb
NUM_ROUNDS: 3
diff --git a/configs/perf2.yaml b/configs/perf2.yaml
new file mode 100644
index 0000000..fac9da6
--- /dev/null
+++ b/configs/perf2.yaml
@@ -0,0 +1,47 @@
+clouds:
+ fuel:
+ url: http://172.16.52.113:8000/
+ creds: admin:admin@admin
+ ssh_creds: root:test37
+ openstack_env: rrr
+
+discover: fuel
+
+internal:
+ var_dir_root: /tmp/perf_tests
+
+tests:
+ - start_test_nodes:
+ creds: clouds
+ vm_params:
+ count: x1
+ image:
+ name: disk_io_perf
+ url: https://cloud-images.ubuntu.com/trusty/current/trusty-server-cloudimg-amd64-disk1.img
+
+ flavor:
+ name: disk_io_perf.1024
+ hdd_size: 50
+ ram_size: 1024
+ cpu: 1
+
+ vol_sz: 30
+ keypair_name: disk_io_perf
+ network_zone_name: net04
+ flt_ip_pool: net04_ext
+ private_key_path: disk_io_perf.pem
+ creds: "ssh://ubuntu@{ip}::{private_key_path}"
+ name_templ: wally-{group}-{id}
+ scheduler_group_name: wally-{group}-{id}
+ security_group: disk_io_perf
+
+ tests:
+ - io:
+ cfg: wally/suits/io/ceph.cfg
+ prefill_files: true
+ params:
+ FILENAME: /dev/vdb
+ NUM_ROUNDS: 1
+
+logging:
+ extra_logs: 1
\ No newline at end of file
diff --git a/configs/usb_hdd.yaml b/configs/usb_hdd.yaml
index a6e52ed..4ad0d97 100644
--- a/configs/usb_hdd.yaml
+++ b/configs/usb_hdd.yaml
@@ -1,20 +1,23 @@
explicit_nodes:
- local: testnode
+ # local: testnode
+ "ssh://koder@localhost::/home/koder/.ssh/id_rsa": testnode
internal:
var_dir_root: /tmp/perf_tests
-sensors:
- receiver_url: "udp://{ip}:5699"
- roles_mapping:
- ceph-osd: block-io
- cinder: block-io, system-cpu
- testnode: system-cpu, block-io
+# sensors:
+# receiver_url: "udp://{ip}:5699"
+# roles_mapping:
+# ceph-osd: block-io
+# cinder: block-io, system-cpu
+# testnode: system-cpu, block-io
tests:
- io:
- cfg: wally/suits/io/io_scenario_hdd.cfg
+ # cfg: wally/suits/io/io_scenario_hdd.cfg
+ cfg: scripts/fio_tests_configs/io_task_test.cfg
prefill_files: false
+ use_sudo: false
params:
FILENAME: /media/koder/a5230078-4c27-4c3b-99aa-26148e78b2e7/xxx.bin
NUM_ROUNDS: 5
diff --git a/requirements-web.txt b/requirements-web.txt
index 55fba09..83a13ad 100644
--- a/requirements-web.txt
+++ b/requirements-web.txt
@@ -1,18 +1,15 @@
+argparse==1.2.1
Babel==1.3
-Flask==0.10.1
+decorator==3.4.0
Flask-Bootstrap==3.3.0.1
Flask-SQLAlchemy==2.0
-GChartWrapper==0.8
-Jinja2==2.7.3
-MarkupSafe==0.23
-SQLAlchemy==0.9.8
-Tempita==0.5.2
-Werkzeug==0.10.1
-argparse==1.2.1
-decorator==3.4.0
+Flask==0.10.1
futures==2.2.0
+GChartWrapper==0.8
iso8601==0.1.10
itsdangerous==0.24
+Jinja2==2.7.3
+MarkupSafe==0.23
netaddr==0.7.13
oslo.config==1.6.1
oslo.i18n==1.4.0
@@ -30,7 +27,10 @@
simplejson==3.6.5
six==1.9.0
sqlalchemy-migrate==0.9.4
+SQLAlchemy==0.9.8
sqlparse==0.1.14
stevedore==1.2.0
+Tempita==0.5.2
warlock==1.1.0
+Werkzeug==0.10.1
wsgiref==0.1.2
diff --git a/scripts/fio_tests_configs/io_task_test.cfg b/scripts/fio_tests_configs/io_task_test.cfg
index 4d68493..bfb2d96 100644
--- a/scripts/fio_tests_configs/io_task_test.cfg
+++ b/scripts/fio_tests_configs/io_task_test.cfg
@@ -5,7 +5,7 @@
filename={FILENAME}
buffered=0
iodepth=1
-size=1000Mb
+size=1000m
time_based
RUNTIME=10
diff --git a/wally/run_test.py b/wally/run_test.py
index d578349..d4b33bf 100755
--- a/wally/run_test.py
+++ b/wally/run_test.py
@@ -2,7 +2,6 @@
import os
import sys
-import time
import Queue
import pprint
import logging
@@ -78,23 +77,26 @@
def test_thread(test, node, barrier, res_q):
+ exc = None
try:
logger.debug("Run preparation for {0}".format(node.get_conn_id()))
- test.pre_run(node.connection)
+ test.pre_run()
logger.debug("Run test for {0}".format(node.get_conn_id()))
- test.run(node.connection, barrier)
+ test.run(barrier)
except Exception as exc:
logger.exception("In test {0} for node {1}".format(test, node))
- res_q.put(exc)
try:
- test.cleanup(node.connection)
+ test.cleanup()
except:
msg = "Duringf cleanup - in test {0} for node {1}"
logger.exception(msg.format(test, node))
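+    # report the test failure only after cleanup had a chance to run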
+ if exc is not None:
+ res_q.put(exc)
-def run_tests(test_block, nodes):
+
+def run_tests(test_block, nodes, test_uuid):
tool_type_mapper = {
"io": IOPerfTest,
"pgbench": PgBenchTest,
@@ -124,8 +126,8 @@
if not os.path.exists(dr):
os.makedirs(dr)
- test = tool_type_mapper[name](params, res_q.put, dr,
- node=node.get_conn_id())
+ test = tool_type_mapper[name](params, res_q.put, test_uuid, node,
+ log_directory=dr)
th = threading.Thread(None, test_thread, None,
(test, node, barrier, res_q))
threads.append(th)
@@ -313,10 +315,13 @@
if not cfg['no_tests']:
for test_group in config.get('tests', []):
- ctx.results.extend(run_tests(test_group, ctx.nodes))
+ test_res = run_tests(test_group, ctx.nodes,
+ cfg['run_uuid'])
+ ctx.results.extend(test_res)
else:
if not cfg['no_tests']:
- ctx.results.extend(run_tests(group, ctx.nodes))
+ test_res = run_tests(group, ctx.nodes, cfg['run_uuid'])
+ ctx.results.extend(test_res)
def shut_down_vms_stage(cfg, ctx):
diff --git a/wally/sensors/daemonize.py b/wally/sensors/daemonize.py
index a4fa157..bc4ab81 100644
--- a/wally/sensors/daemonize.py
+++ b/wally/sensors/daemonize.py
@@ -1,15 +1,14 @@
# #!/usr/bin/python
-import fcntl
import os
import pwd
import grp
import sys
+import fcntl
import signal
-import resource
-import logging
import atexit
-from logging import handlers
+import logging.handlers
+import resource
class Daemonize(object):
@@ -155,7 +154,7 @@
# actually have such capabilities
# on the machine we are running this.
if os.path.isfile(syslog_address):
- syslog = handlers.SysLogHandler(syslog_address)
+ syslog = logging.handlers.SysLogHandler(syslog_address)
if self.verbose:
syslog.setLevel(logging.DEBUG)
else:
diff --git a/wally/ssh_utils.py b/wally/ssh_utils.py
index f4de3b6..d3d6547 100644
--- a/wally/ssh_utils.py
+++ b/wally/ssh_utils.py
@@ -28,6 +28,9 @@
@classmethod
def put(cls, localfile, remfile):
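+        # ensure the destination directory exists before the local copy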
+ dirname = os.path.dirname(remfile)
+ if not os.path.exists(dirname):
+ os.makedirs(dirname)
shutil.copyfile(localfile, remfile)
@classmethod
@@ -105,6 +108,16 @@
time.sleep(1)
+def save_to_remote(sftp, path, content):
+ with sftp.open(path, "wb") as fd:
+ fd.write(content)
+
+
+def read_from_remote(sftp, path):
+ with sftp.open(path, "rb") as fd:
+ return fd.read()
+
+
def normalize_dirpath(dirpath):
while dirpath.endswith("/"):
dirpath = dirpath[:-1]
@@ -119,7 +132,7 @@
if intermediate:
try:
sftp.mkdir(remotepath, mode=mode)
- except IOError:
+ except (IOError, OSError):
upper_dir = remotepath.rsplit("/", 1)[0]
if upper_dir == '' or upper_dir == '/':
@@ -265,6 +278,9 @@
# ip_host:port:path_to_key_file
# ip_host::path_to_key_file
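+    # allow and strip an optional ssh:// scheme prefix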
+ if uri.startswith("ssh://"):
+ uri = uri[len("ssh://"):]
+
res = ConnCreds()
res.port = "22"
res.key_file = None
diff --git a/wally/suits/io/agent.py b/wally/suits/io/agent.py
index b11f20e..25910b4 100644
--- a/wally/suits/io/agent.py
+++ b/wally/suits/io/agent.py
@@ -1,9 +1,11 @@
+import os
import sys
import time
import json
import copy
import select
import pprint
+import os.path
import argparse
import traceback
import subprocess
@@ -516,7 +518,7 @@
help="Start execution at START_AT_UTC")
parser.add_argument("--json", action="store_true", default=False,
help="Json output format")
- parser.add_argument("--output", default='-', metavar="FILE_PATH",
+ parser.add_argument("-o", "--output", default='-', metavar="FILE_PATH",
help="Store results to FILE_PATH")
parser.add_argument("--estimate", action="store_true", default=False,
help="Only estimate task execution time")
@@ -533,6 +535,9 @@
default=[],
help="Provide set of pairs PARAM=VAL to" +
"format into job description")
+ parser.add_argument("-p", "--pid-file", metavar="FILE_TO_STORE_PID",
+ default=None, help="Store pid to FILE_TO_STORE_PID " +
+ "and remove this file on exit")
parser.add_argument("jobfile")
return parser.parse_args(argv)
@@ -550,65 +555,78 @@
else:
out_fd = open(argv_obj.output, "w")
- params = {}
- for param_val in argv_obj.params:
- assert '=' in param_val
- name, val = param_val.split("=", 1)
- params[name] = val
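+    # record our pid so a supervisor can poll for start and completion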
+ if argv_obj.pid_file is not None:
+ with open(argv_obj.pid_file, "w") as fd:
+ fd.write(str(os.getpid()))
- slice_params = {
- 'runcycle': argv_obj.runcycle,
- }
+ try:
+ params = {}
+ for param_val in argv_obj.params:
+ assert '=' in param_val
+ name, val = param_val.split("=", 1)
+ params[name] = val
- sliced_it = parse_and_slice_all_in_1(job_cfg, params, **slice_params)
+ slice_params = {
+ 'runcycle': argv_obj.runcycle,
+ }
- if argv_obj.estimate:
- it = map(calculate_execution_time, sliced_it)
- print sec_to_str(sum(it))
- return 0
+ sliced_it = parse_and_slice_all_in_1(job_cfg, params, **slice_params)
- if argv_obj.num_tests or argv_obj.compile:
- if argv_obj.compile:
- for test_slice in sliced_it:
- out_fd.write(fio_config_to_str(test_slice))
- out_fd.write("\n#" + "-" * 70 + "\n\n")
+ if argv_obj.estimate:
+ it = map(calculate_execution_time, sliced_it)
+ print sec_to_str(sum(it))
+ return 0
- if argv_obj.num_tests:
- print len(list(sliced_it))
+ if argv_obj.num_tests or argv_obj.compile:
+ if argv_obj.compile:
+ for test_slice in sliced_it:
+ out_fd.write(fio_config_to_str(test_slice))
+ out_fd.write("\n#" + "-" * 70 + "\n\n")
- return 0
+ if argv_obj.num_tests:
+ print len(list(sliced_it))
- if argv_obj.start_at is not None:
- ctime = time.time()
- if argv_obj.start_at >= ctime:
- time.sleep(ctime - argv_obj.start_at)
+ return 0
- def raw_res_func(test_num, data):
- pref = "========= RAW_RESULTS({0}) =========\n".format(test_num)
- out_fd.write(pref)
- out_fd.write(json.dumps(data))
- out_fd.write("\n========= END OF RAW_RESULTS =========\n")
- out_fd.flush()
+ if argv_obj.start_at is not None:
+ ctime = time.time()
+ if argv_obj.start_at >= ctime:
+                time.sleep(argv_obj.start_at - ctime)
- rrfunc = raw_res_func if argv_obj.show_raw_results else None
+ def raw_res_func(test_num, data):
+ pref = "========= RAW_RESULTS({0}) =========\n".format(test_num)
+ out_fd.write(pref)
+ out_fd.write(json.dumps(data))
+ out_fd.write("\n========= END OF RAW_RESULTS =========\n")
+ out_fd.flush()
- stime = time.time()
- job_res, num_tests, ok = run_benchmark(argv_obj.type, sliced_it, rrfunc)
- etime = time.time()
+ rrfunc = raw_res_func if argv_obj.show_raw_results else None
- res = {'__meta__': {'raw_cfg': job_cfg, 'params': params}, 'res': job_res}
+ stime = time.time()
+ job_res, num_tests, ok = run_benchmark(argv_obj.type,
+ sliced_it, rrfunc)
+ etime = time.time()
- oformat = 'json' if argv_obj.json else 'eval'
- out_fd.write("\nRun {0} tests in {1} seconds\n".format(num_tests,
- int(etime - stime)))
- out_fd.write("========= RESULTS(format={0}) =========\n".format(oformat))
- if argv_obj.json:
- out_fd.write(json.dumps(res))
- else:
- out_fd.write(pprint.pformat(res) + "\n")
- out_fd.write("\n========= END OF RESULTS =========\n")
+ res = {'__meta__': {'raw_cfg': job_cfg, 'params': params},
+ 'res': job_res}
- return 0 if ok else 1
+ oformat = 'json' if argv_obj.json else 'eval'
+ msg = "\nRun {0} tests in {1} seconds\n"
+ out_fd.write(msg.format(num_tests, int(etime - stime)))
+
+ msg = "========= RESULTS(format={0}) =========\n"
+ out_fd.write(msg.format(oformat))
+ if argv_obj.json:
+ out_fd.write(json.dumps(res))
+ else:
+ out_fd.write(pprint.pformat(res) + "\n")
+ out_fd.write("\n========= END OF RESULTS =========\n")
+
+ return 0 if ok else 1
+ finally:
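+        # remove the pid file so a polling supervisor sees the run end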
+ if argv_obj.pid_file is not None:
+ if os.path.exists(argv_obj.pid_file):
+ os.unlink(argv_obj.pid_file)
def fake_main(x):
diff --git a/wally/suits/io/ceph.cfg b/wally/suits/io/ceph.cfg
index cc8e411..1352b1e 100644
--- a/wally/suits/io/ceph.cfg
+++ b/wally/suits/io/ceph.cfg
@@ -9,7 +9,7 @@
NUM_ROUNDS=7
NUMJOBS={% 1, 5, 10, 15, 20, 30, 40, 80, 120 %}
-size=5G
+size=30G
ramp_time=30
runtime=60
diff --git a/wally/suits/itest.py b/wally/suits/itest.py
index 0f8afd4..5166fdd 100644
--- a/wally/suits/itest.py
+++ b/wally/suits/itest.py
@@ -1,12 +1,17 @@
import abc
import time
+import random
import os.path
import logging
import datetime
-from wally.ssh_utils import copy_paths, run_over_ssh, delete_file
from wally.utils import ssize_to_b, open_for_append_or_create, sec_to_str
+from wally.ssh_utils import (copy_paths, run_over_ssh,
+ save_to_remote, ssh_mkdir,
+ # delete_file,
+ connect, read_from_remote, Local)
+
from . import postgres
from .io import agent as io_agent
from .io import formatter as io_formatter
@@ -17,19 +22,20 @@
class IPerfTest(object):
- def __init__(self, on_result_cb, log_directory=None, node=None):
+ def __init__(self, on_result_cb, test_uuid, node, log_directory=None):
self.on_result_cb = on_result_cb
self.log_directory = log_directory
self.node = node
+ self.test_uuid = test_uuid
- def pre_run(self, conn):
+ def pre_run(self):
pass
- def cleanup(self, conn):
+ def cleanup(self):
pass
@abc.abstractmethod
- def run(self, conn, barrier):
+ def run(self, barrier):
pass
@classmethod
@@ -37,12 +43,16 @@
msg = "{0}.format_for_console".format(cls.__name__)
raise NotImplementedError(msg)
+ def run_over_ssh(self, cmd, **kwargs):
+ return run_over_ssh(self.node.connection, cmd,
+ node=self.node.get_conn_id(), **kwargs)
+
class TwoScriptTest(IPerfTest):
remote_tmp_dir = '/tmp'
- def __init__(self, opts, on_result_cb, log_directory=None, node=None):
- IPerfTest.__init__(self, on_result_cb, log_directory, node=node)
+ def __init__(self, opts, *dt, **mp):
+ IPerfTest.__init__(self, *dt, **mp)
self.opts = opts
if 'run_script' in self.opts:
@@ -52,22 +62,23 @@
def get_remote_for_script(self, script):
return os.path.join(self.tmp_dir, script.rpartition('/')[2])
- def copy_script(self, conn, src):
+ def copy_script(self, src):
remote_path = self.get_remote_for_script(src)
- copy_paths(conn, {src: remote_path})
+ copy_paths(self.node.connection, {src: remote_path})
return remote_path
- def pre_run(self, conn):
- remote_script = self.copy_script(conn, self.pre_run_script)
+ def pre_run(self):
+        remote_script = self.copy_script(self.pre_run_script)
cmd = remote_script
- run_over_ssh(conn, cmd, node=self.node)
+ self.run_over_ssh(cmd)
- def run(self, conn, barrier):
- remote_script = self.copy_script(conn, self.run_script)
+ def run(self, barrier):
+        remote_script = self.copy_script(self.run_script)
cmd_opts = ' '.join(["%s %s" % (key, val) for key, val
in self.opts.items()])
cmd = remote_script + ' ' + cmd_opts
- out_err = run_over_ssh(conn, cmd, node=self.node)
+ out_err = self.run_over_ssh(cmd)
self.on_result(out_err, cmd)
def parse_results(self, out):
@@ -91,11 +102,13 @@
class IOPerfTest(IPerfTest):
- io_py_remote = "/tmp/disk_test_agent.py"
+ io_py_remote_templ = "/tmp/test/{0}/io/disk_test_agent.py"
+ log_fl_templ = "/tmp/test/{0}/io/disk_test_agent_log.txt"
+ pid_file_templ = "/tmp/test/{0}/io/disk_test_agent_pid_file"
+ task_file_templ = "/tmp/test/{0}/io/io_task.cfg"
- def __init__(self, test_options, on_result_cb,
- log_directory=None, node=None):
- IPerfTest.__init__(self, on_result_cb, log_directory, node=node)
+ def __init__(self, test_options, *dt, **mp):
+ IPerfTest.__init__(self, *dt, **mp)
self.options = test_options
self.config_fname = test_options['cfg']
self.alive_check_interval = test_options.get('alive_check_interval')
@@ -108,6 +121,11 @@
cmd_log = os.path.join(self.log_directory, "task_compiled.cfg")
raw_res = os.path.join(self.log_directory, "raw_results.txt")
+ self.io_py_remote = self.io_py_remote_templ.format(self.test_uuid)
+ self.log_fl = self.log_fl_templ.format(self.test_uuid)
+ self.pid_file = self.pid_file_templ.format(self.test_uuid)
+ self.task_file = self.task_file_templ.format(self.test_uuid)
+
fio_command_file = open_for_append_or_create(cmd_log)
cfg_s_it = io_agent.compile_all_in_1(self.raw_cfg, self.config_params)
@@ -115,29 +133,35 @@
fio_command_file.write(splitter.join(cfg_s_it))
self.fio_raw_results_file = open_for_append_or_create(raw_res)
- def cleanup(self, conn):
- delete_file(conn, self.io_py_remote)
+ def cleanup(self):
+ # delete_file(conn, self.io_py_remote)
        # Need to remove temp files, used for testing
+ pass
- def pre_run(self, conn):
+ def pre_run(self):
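+        # create the per-run directory on the node before copying files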
+ ssh_mkdir(self.node.connection.open_sftp(),
+ os.path.dirname(self.io_py_remote),
+ intermediate=True)
try:
- run_over_ssh(conn, 'which fio', node=self.node)
+ self.run_over_ssh('which fio')
except OSError:
# TODO: install fio, if not installed
- cmd = "sudo apt-get -y install fio"
-
for i in range(3):
try:
- run_over_ssh(conn, cmd, node=self.node)
+ self.run_over_ssh("sudo apt-get -y install fio")
break
except OSError as err:
time.sleep(3)
else:
raise OSError("Can't install fio - " + err.message)
- local_fname = io_agent.__file__.rsplit('.')[0] + ".py"
- self.files_to_copy = {local_fname: self.io_py_remote}
- copy_paths(conn, self.files_to_copy)
+ local_fname = os.path.splitext(io_agent.__file__)[0] + ".py"
+
+ self.files_to_copy = {
+ local_fname: self.io_py_remote,
+ }
+
+ copy_paths(self.node.connection, self.files_to_copy)
if self.options.get('prefill_files', True):
files = {}
@@ -155,17 +179,19 @@
# take largest size
files[fname] = max(files.get(fname, 0), msz)
- # logger.warning("dd run DISABLED")
- # cmd_templ = "dd if=/dev/zero of={0} bs={1} count={2}"
+ cmd_templ = "dd oflag=direct " + \
+ "if=/dev/zero of={0} bs={1} count={2}"
- cmd_templ = "sudo dd if=/dev/zero of={0} bs={1} count={2}"
+ if self.options.get("use_sudo", True):
+ cmd_templ = "sudo " + cmd_templ
+
ssize = 0
stime = time.time()
for fname, curr_sz in files.items():
cmd = cmd_templ.format(fname, 1024 ** 2, curr_sz)
ssize += curr_sz
- run_over_ssh(conn, cmd, timeout=curr_sz, node=self.node)
+ self.run_over_ssh(cmd, timeout=curr_sz)
ddtime = time.time() - stime
if ddtime > 1E-3:
@@ -175,9 +201,13 @@
else:
logger.warning("Test files prefill disabled")
- def run(self, conn, barrier):
- cmd_templ = "sudo env python2 {0} --type {1} {2} --json -"
- # cmd_templ = "env python2 {0} --type {1} {2} --json -"
+ def run(self, barrier):
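+        # start the agent inside a detached screen session so it keeps
+        # running if the SSH connection drops (offline mode)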
+ cmd_templ = "screen -S {screen_name} -d -m " + \
+ "env python2 {0} -p {pid_file} -o {results_file} " + \
+ "--type {1} {2} --json {3}"
+
+ if self.options.get("use_sudo", True):
+ cmd_templ = "sudo " + cmd_templ
params = " ".join("{0}={1}".format(k, v)
for k, v in self.config_params.items())
@@ -185,14 +215,24 @@
if "" != params:
params = "--params " + params
- cmd = cmd_templ.format(self.io_py_remote, self.tool, params)
+ save_to_remote(self.node.connection.open_sftp(),
+ self.task_file, self.raw_cfg)
+
+ screen_name = self.test_uuid
+ cmd = cmd_templ.format(self.io_py_remote,
+ self.tool,
+ params,
+ self.task_file,
+ pid_file=self.pid_file,
+ results_file=self.log_fl,
+ screen_name=screen_name)
logger.debug("Waiting on barrier")
exec_time = io_agent.calculate_execution_time(self.configs)
exec_time_str = sec_to_str(exec_time)
try:
- timeout = int(exec_time * 1.2 + 300)
+ timeout = int(exec_time * 2 + 300)
if barrier.wait():
templ = "Test should takes about {0}." + \
" Should finish at {1}," + \
@@ -205,11 +245,67 @@
end_dt.strftime("%H:%M:%S"),
wait_till.strftime("%H:%M:%S")))
- out_err = run_over_ssh(conn, cmd,
- stdin_data=self.raw_cfg,
- timeout=timeout,
- node=self.node)
- logger.info("Done")
+ self.run_over_ssh(cmd)
+ logger.debug("Test started in screen {0}".format(screen_name))
+
+ end_of_wait_time = timeout + time.time()
+
+ # time_till_check = random.randint(30, 90)
+ time_till_check = 1
+
+ pid = None
+ no_pid_file = True
+ tcp_conn_timeout = 30
+ pid_get_timeout = 30 + time.time()
+
+ # TODO: add monitoring socket
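+            # close the control connection: the node may be unreachable
+            # while the test is running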
+ if self.node.connection is not Local:
+ self.node.connection.close()
+
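+            # periodically reconnect and probe the pid file until the run ends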
+ while end_of_wait_time > time.time():
+ conn = None
+ time.sleep(time_till_check)
+
+ try:
+ if self.node.connection is not Local:
+ conn = connect(self.node.conn_url,
+ conn_timeout=tcp_conn_timeout)
+ else:
+ conn = self.node.connection
+ except:
+                logger.exception("During connect")
+ continue
+
+ try:
+ pid = read_from_remote(conn.open_sftp(), self.pid_file)
+ no_pid_file = False
+ except (NameError, IOError):
+ no_pid_file = True
+
+ if conn is not Local:
+ conn.close()
+
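+                # the agent removes its pid file on exit: once a pid has
+                # been read, a missing file means the run has finished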
+ if no_pid_file:
+ if pid is None:
+ if time.time() > pid_get_timeout:
+ msg = "On node {0} pid file doesn't " + \
+ "appears in time"
+ logging.error(msg.format(self.node.get_conn_id()))
+ raise RuntimeError("Start timeout")
+                    else:
+                        # execution finished
+                        break
+
+ logger.debug("Done")
+
+ if self.node.connection is not Local:
+ timeout = tcp_conn_timeout * 3
+ self.node.connection = connect(self.node.conn_url,
+ conn_timeout=timeout)
+
+            # fetch the agent output from the results file
+ out_err = read_from_remote(self.node.connection.open_sftp(),
+ self.log_fl)
finally:
barrier.exit()