blob: 78a8df68bd694f814f49726b9ceed61831d515b6 [file] [log] [blame]
koder aka kdanilov7acd6bd2015-02-12 14:28:30 -08001import time
koder aka kdanilove21d7472015-02-14 19:02:04 -08002import logging
koder aka kdanilov7acd6bd2015-02-12 14:28:30 -08003import threading
4import contextlib
koder aka kdanilov4643fd62015-02-10 16:20:13 -08005import multiprocessing
6
koder aka kdanilov7acd6bd2015-02-12 14:28:30 -08007import paramiko
koder aka kdanilov4643fd62015-02-10 16:20:13 -08008
koder aka kdanilove21d7472015-02-14 19:02:04 -08009
# Module-wide logger, registered under the "io-perf-tool" name; the helpers
# in this module emit their debug messages through it.
logger = logging.getLogger("io-perf-tool")
koder aka kdanilov7acd6bd2015-02-12 14:28:30 -080011
12
def get_barrier(count, threaded=False):
    """Create a one-shot barrier for ``count`` participants.

    With ``threaded=True`` the barrier is built on ``threading`` primitives,
    otherwise on ``multiprocessing`` ones.

    Returns a callable ``closure(timeout=None)``. Each participant calls it
    once: the call decrements the shared counter and blocks until the counter
    reaches zero or ``timeout`` seconds have elapsed (``None`` waits without
    a limit). The call returns True when the counter is zero on exit and
    False otherwise.
    """
    if threaded:
        # Plain class attribute used as a mutable counter shared by all
        # threads; it mirrors the ``.value`` interface of multiprocessing.Value.
        class val(object):
            value = count
        cond = threading.Condition()
    else:
        val = multiprocessing.Value('i', count)
        cond = multiprocessing.Condition()

    def closure(timeout=None):
        # Fix the deadline before taking the lock so that time spent waiting
        # for the lock counts against the caller's budget.
        deadline = None if timeout is None else time.time() + timeout

        with cond:
            val.value -= 1
            if val.value == 0:
                # Last participant arrived - release everybody who waits.
                cond.notify_all()
                return True

            # Re-check the counter after every wake-up: a wake-up alone does
            # not prove that the barrier has been opened, and an early one
            # must not be reported as a timeout while time still remains.
            while val.value != 0:
                if deadline is None:
                    cond.wait()
                    continue

                remaining = deadline - time.time()
                if remaining <= 0:
                    break
                cond.wait(remaining)

            return val.value == 0

    return closure
koder aka kdanilov7acd6bd2015-02-12 14:28:30 -080032
33
def ssh_connect(host, user, key_file, retry_count=60, timeout=1):
    """Open an SSH connection to ``host``, retrying on failure.

    ``user`` and ``key_file`` are passed to paramiko as the user name and
    private key file. Up to ``retry_count`` connection attempts are made,
    sleeping ``timeout`` seconds after each failed one (``timeout`` is the
    pause between attempts, it is not handed to ``connect``).

    Returns the connected ``paramiko.SSHClient``. The exception of the last
    failed attempt is re-raised. If ``retry_count`` is not positive no attempt
    is made and None is returned.
    """
    ssh = paramiko.SSHClient()
    # Host keys are loaded from /dev/null and unknown keys are auto-accepted,
    # so the host identity is not verified against any known_hosts file.
    ssh.load_host_keys('/dev/null')
    ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
    # NOTE(review): purpose of this attribute is not visible from this
    # module - kept as is.
    ssh.known_hosts = None

    for attempt in range(retry_count):
        try:
            ssh.connect(host, username=user, key_filename=key_file,
                        look_for_keys=False)
            return ssh
        except Exception:
            # Only ordinary errors are retried; KeyboardInterrupt and
            # SystemExit propagate immediately instead of being swallowed
            # by the retry loop.
            if attempt == retry_count - 1:
                raise
            time.sleep(timeout)
49
50
def wait_on_barrier(barrier, latest_start_time):
    """Block on ``barrier`` until it opens or ``latest_start_time`` passes.

    ``barrier`` is a callable taking a timeout in seconds (or None) and
    returning a true value when the barrier was passed; None disables the
    wait entirely. ``latest_start_time`` is compared against ``time.time()``;
    None means the barrier is called with no timeout.
    """
    if barrier is None:
        return

    timeout = None
    if latest_start_time is not None:
        timeout = latest_start_time - time.time()
        # Announce the wait only when there is actually time left.
        if timeout > 0:
            logger.debug(
                ("Ready and waiting on barrier. "
                 "Will wait at most {0} seconds").format(int(timeout)))

    passed = barrier(timeout)
    if not passed:
        logger.debug("Barrier timeouted")
koder aka kdanilov7acd6bd2015-02-12 14:28:30 -080065
66
@contextlib.contextmanager
def log_error(action, types=(Exception,)):
    """Context manager that logs the start and the failure of ``action``.

    On entry a "Starts : <action>" debug message is written, unless
    ``action`` begins with "!" - in that case the marker is stripped and the
    start message is skipped. If the body raises an exception that is an
    instance of ``types`` (and is not StopIteration), the failure is logged
    at debug level. The exception is always re-raised.
    """
    if not action.startswith("!"):
        logger.debug("Starts : " + action)
    else:
        action = action[1:]

    try:
        yield
    except Exception as exc:
        if isinstance(exc, types) and not isinstance(exc, StopIteration):
            templ = "Error during {0} stage: {1}"
            # str(exc) instead of exc.message: the ``message`` attribute is
            # deprecated and absent on many exceptions, and touching it here
            # would replace the real error with an AttributeError.
            logger.debug(templ.format(action, str(exc)))
        raise
81
82
def run_over_ssh(conn, cmd):
    """Run ``cmd`` through ``conn.exec_command`` and collect its results.

    Returns a ``(exit_code, stdout_data, stderr_data)`` tuple. Standard
    output is read to the end before standard error is touched, and the exit
    status is requested last.

    TODO: replace with a proper implementation based on select.
    """
    _, out_stream, err_stream = conn.exec_command(cmd)

    out_data = out_stream.read()
    err_data = err_stream.read()
    exit_code = out_stream.channel.recv_exit_status()

    return exit_code, out_data, err_data
koder aka kdanilov83cd7132015-02-14 21:37:14 -080091
92
def ssize_to_kb(ssize):
    """Convert a size string into a whole number of kilobytes.

    ``ssize`` is either an integer followed by a one-letter unit
    (``k``/``K``, ``m``/``M``, ``g``/``G``) or a plain integer number of
    bytes that must be a multiple of 1024.

    Returns the size in kilobytes as an integer.
    Raises ValueError if the value cannot be parsed or if a plain byte count
    is not a multiple of 1024.
    """
    # Unit suffix -> number of kilobytes in one unit.
    units = {
        'k': 1, 'K': 1,
        'm': 1024, 'M': 1024,
        'g': 1024 ** 2, 'G': 1024 ** 2,
    }

    try:
        coef = units.get(ssize[-1:])
        if coef is not None:
            return int(ssize[:-1]) * coef

        size = int(ssize)
        if size % 1024 != 0:
            raise ValueError()

        # Floor division keeps the result an integer on every Python
        # version; "/" yields a float under true division.
        return size // 1024

    except (ValueError, TypeError, AttributeError):
        tmpl = "Unknow size format {0!r} (or size not multiples 1024)"
        raise ValueError(tmpl.format(ssize))