blob: 0a38ad52c7c2a9a1a36c4e5b15feec40ac2163b3 [file] [log] [blame]
koder aka kdanilov7acd6bd2015-02-12 14:28:30 -08001import time
2import threading
3import contextlib
koder aka kdanilov4643fd62015-02-10 16:20:13 -08004import multiprocessing
5
koder aka kdanilov7acd6bd2015-02-12 14:28:30 -08006import paramiko
koder aka kdanilov4643fd62015-02-10 16:20:13 -08007
koder aka kdanilov7acd6bd2015-02-12 14:28:30 -08008from log import log
9
10
11def get_barrier(count, threaded=False):
12 if threaded:
13 class val(object):
14 value = count
15 cond = threading.Condition()
16 else:
17 val = multiprocessing.Value('i', count)
18 cond = multiprocessing.Condition()
koder aka kdanilov4643fd62015-02-10 16:20:13 -080019
20 def closure(timeout):
21 with cond:
22 val.value -= 1
23 if val.value == 0:
24 cond.notify_all()
25 else:
26 cond.wait(timeout)
27 return val.value == 0
28
29 return closure
koder aka kdanilov7acd6bd2015-02-12 14:28:30 -080030
31
32def ssh_connect(host, user, key_file, retry_count=10, timeout=5):
33 ssh = paramiko.SSHClient()
34 ssh.load_host_keys('/dev/null')
35 ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
36 ssh.known_hosts = None
37
38 for i in range(retry_count):
39 try:
40 ssh.connect(host, username=user, key_filename=key_file,
41 look_for_keys=False)
42 return ssh
43 except:
44 if i == retry_count - 1:
45 raise
46 time.sleep(timeout)
47
48
49def wait_on_barrier(barrier, latest_start_time):
50 if barrier is not None:
51 if latest_start_time is not None:
52 timeout = latest_start_time - time.time()
53 else:
54 timeout = None
55
56 if timeout is not None and timeout > 0:
57 msg = "Ready and waiting on barrier. " + \
58 "Will wait at most {0} seconds"
59 log(msg.format(int(timeout)))
60
61 if not barrier(timeout):
62 log("Barrier timeouted")
63
64
65@contextlib.contextmanager
66def log_error(action, types=(Exception,)):
67 if not action.startswith("!"):
68 log("Starts : " + action)
69 else:
70 action = action[1:]
71
72 try:
73 yield
74 except Exception as exc:
75 if isinstance(exc, types) and not isinstance(exc, StopIteration):
76 templ = "Error during {0} stage: {1}"
77 log(templ.format(action, exc.message))
78 raise
79
80
81def run_over_ssh(conn, cmd):
82 "should be replaces by normal implementation, with select"
83
84 stdin, stdout, stderr = conn.exec_command(cmd)
85 out = stdout.read()
86 err = stderr.read()
87 code = stdout.channel.recv_exit_status()
88 return code, out, err