# blob: 78a8df68bd694f814f49726b9ceed61831d515b6 [file] [log] [blame]
import time
import logging
import threading
import contextlib
import multiprocessing
import paramiko
# Module-level logger; the helpers in this file emit their debug messages through it.
logger = logging.getLogger("io-perf-tool")
def get_barrier(count, threaded=False):
    """Create a countdown barrier for ``count`` participants.

    Args:
        count: initial value of the shared countdown.
        threaded: when true the countdown and condition come from
            ``threading``; otherwise from ``multiprocessing``.

    Returns:
        A callable ``f(timeout)``. Each call decrements the countdown.
        The call that brings it to zero wakes every waiter; any other
        call waits on the condition for at most ``timeout`` seconds
        (``None`` means no limit). The callable returns ``True`` when the
        countdown is zero at the moment the call finishes, else ``False``.
    """
    if not threaded:
        remaining = multiprocessing.Value('i', count)
        ready = multiprocessing.Condition()
    else:
        # A class attribute serves as a mutable cell with the same
        # ``.value`` interface as ``multiprocessing.Value``.
        class remaining(object):
            value = count
        ready = threading.Condition()

    def arrive(timeout):
        with ready:
            remaining.value -= 1
            if remaining.value != 0:
                ready.wait(timeout)
            else:
                ready.notify_all()
            return remaining.value == 0

    return arrive
def ssh_connect(host, user, key_file, retry_count=60, timeout=1):
    """Open an SSH connection to ``host``, retrying on failure.

    Args:
        host: address passed to ``SSHClient.connect``.
        user: login name (``username=``).
        key_file: private key path (``key_filename=``).
        retry_count: maximum number of connection attempts.
        timeout: seconds to sleep between failed attempts.

    Returns:
        The connected ``paramiko.SSHClient``. If ``retry_count`` is not
        positive no attempt is made and ``None`` is returned.

    Raises:
        Whatever the last ``connect`` attempt raised, once all attempts
        have failed.
    """
    ssh = paramiko.SSHClient()
    # Start from an empty host-key set and accept unknown host keys.
    ssh.load_host_keys('/dev/null')
    ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
    # NOTE(review): presumably meant to disable known-hosts handling;
    # confirm that paramiko actually reads this attribute.
    ssh.known_hosts = None

    for attempt in range(retry_count):
        try:
            ssh.connect(host, username=user, key_filename=key_file,
                        look_for_keys=False)
        except Exception:
            # Only ordinary errors are retried: KeyboardInterrupt and
            # SystemExit must abort the retry loop immediately.
            if attempt == retry_count - 1:
                raise
            time.sleep(timeout)
        else:
            return ssh
def wait_on_barrier(barrier, latest_start_time):
    """Wait on ``barrier``, bounded by ``latest_start_time`` if given.

    Args:
        barrier: callable taking a timeout in seconds (or ``None``) and
            returning a truthy value when the barrier was released;
            ``None`` makes this function a no-op.
        latest_start_time: absolute ``time.time()`` deadline, or ``None``
            for no deadline.
    """
    if barrier is None:
        return

    timeout = None
    if latest_start_time is not None:
        timeout = latest_start_time - time.time()

    if timeout is not None and timeout > 0:
        msg = "Ready and waiting on barrier. " + \
              "Will wait at most {0} seconds"
        logger.debug(msg.format(int(timeout)))

    # NOTE(review): the original indentation was lost; this assumes the
    # barrier is invoked regardless of the logging branch above - confirm.
    released = barrier(timeout)
    if not released:
        logger.debug("Barrier timeouted")
@contextlib.contextmanager
def log_error(action, types=(Exception,)):
    """Context manager that logs the start and failure of ``action``.

    Args:
        action: stage name used in log messages. A leading ``"!"``
            suppresses the "Starts" message and is stripped from the name.
        types: exception classes whose occurrence is logged. Exceptions
            of other types, and ``StopIteration``, are not logged.

    Raises:
        Any ``Exception`` raised inside the ``with`` body is re-raised
        unchanged after optional logging.
    """
    if not action.startswith("!"):
        logger.debug("Starts : " + action)
    else:
        action = action[1:]

    try:
        yield
    except Exception as exc:
        if isinstance(exc, types) and not isinstance(exc, StopIteration):
            # ``exc.message`` does not exist on Python 3 exceptions; using
            # it unconditionally raised AttributeError here and masked the
            # real error. Fall back to ``str(exc)`` when it is missing/empty.
            details = getattr(exc, "message", "") or str(exc)
            templ = "Error during {0} stage: {1}"
            logger.debug(templ.format(action, details))
        raise
def run_over_ssh(conn, cmd):
    """Run ``cmd`` via ``conn.exec_command`` and collect its results.

    Stdout is read to the end first, then stderr, and only then is the
    exit status requested. Should be replaced by a proper implementation
    based on ``select``.

    Returns:
        A ``(exit_code, stdout_data, stderr_data)`` tuple.
    """
    _stdin, out_stream, err_stream = conn.exec_command(cmd)
    output = out_stream.read()
    errors = err_stream.read()
    return out_stream.channel.recv_exit_status(), output, errors
def ssize_to_kb(ssize):
    """Convert a size string to an integer number of kilobytes.

    Accepted forms: ``"<int>k"``/``"<int>K"`` (kilobytes), ``"<int>m"``/
    ``"<int>M"`` (megabytes), ``"<int>g"``/``"<int>G"`` (gigabytes), or a
    plain integer string giving bytes, which must be a multiple of 1024.

    Returns:
        The size in kilobytes as an ``int``.

    Raises:
        ValueError: if ``ssize`` is not a string in one of the accepted
            forms, or is a byte count not divisible by 1024.
    """
    try:
        smap = dict(k=1, K=1, M=1024, m=1024, G=1024**2, g=1024**2)
        # Slicing (rather than indexing) keeps "" on the plain-int path,
        # where int("") raises ValueError below.
        coef = smap.get(ssize[-1:])
        if coef is not None:
            return int(ssize[:-1]) * coef

        size_bytes = int(ssize)
        if size_bytes % 1024 != 0:
            raise ValueError()

        # Floor division keeps the result an int on Python 3 as well,
        # matching the suffix branches above.
        return size_bytes // 1024
    except (ValueError, TypeError, AttributeError):
        tmpl = "Unknow size format {0!r} (or size not multiples 1024)"
        raise ValueError(tmpl.format(ssize))