koder aka kdanilov | 7acd6bd | 2015-02-12 14:28:30 -0800 | [diff] [blame] | 1 | import time |
| 2 | import threading |
| 3 | import contextlib |
koder aka kdanilov | 4643fd6 | 2015-02-10 16:20:13 -0800 | [diff] [blame] | 4 | import multiprocessing |
| 5 | |
koder aka kdanilov | 7acd6bd | 2015-02-12 14:28:30 -0800 | [diff] [blame] | 6 | import paramiko |
koder aka kdanilov | 4643fd6 | 2015-02-10 16:20:13 -0800 | [diff] [blame] | 7 | |
koder aka kdanilov | 7acd6bd | 2015-02-12 14:28:30 -0800 | [diff] [blame] | 8 | from log import log |
| 9 | |
| 10 | |
def get_barrier(count, threaded=False):
    """Create a one-shot barrier shared by ``count`` participants.

    The returned callable takes a single ``timeout`` argument (seconds, or
    ``None`` to wait without limit).  Each call decrements a shared counter;
    the call that brings the counter to zero wakes every waiter.

    The callable returns True if the counter was zero when the call
    finished, and False if the timeout expired first.

    With ``threaded=True`` the state is kept in plain ``threading``
    primitives; otherwise ``multiprocessing`` primitives are used.
    """
    if threaded:
        class val(object):
            # Class attribute used as a tiny mutable cell for the counter.
            value = count
        cond = threading.Condition()
    else:
        val = multiprocessing.Value('i', count)
        cond = multiprocessing.Condition()

    def closure(timeout):
        # Track an absolute deadline so that repeated waits (after an early
        # wake-up) never add up to more than the requested timeout.
        if timeout is None:
            deadline = None
        else:
            deadline = time.time() + timeout

        with cond:
            val.value -= 1
            if val.value == 0:
                cond.notify_all()

            # Re-check the predicate after every wake-up: a single wait()
            # may return before the counter actually reaches zero.
            while val.value != 0:
                if deadline is None:
                    cond.wait()
                    continue

                remaining = deadline - time.time()
                if remaining <= 0:
                    # Timeout already elapsed (or was non-positive).
                    break
                cond.wait(remaining)

            return val.value == 0

    return closure
koder aka kdanilov | 7acd6bd | 2015-02-12 14:28:30 -0800 | [diff] [blame] | 30 | |
| 31 | |
def ssh_connect(host, user, key_file, retry_count=10, timeout=5):
    """Open an SSH connection to ``host``, retrying on failure.

    Up to ``retry_count`` connection attempts are made, sleeping
    ``timeout`` seconds between failed attempts.  Authentication uses only
    ``key_file`` (``look_for_keys=False``).

    Returns the connected ``paramiko.SSHClient``.  The exception from the
    final failed attempt is re-raised.  If ``retry_count`` is less than 1
    no attempt is made and ``None`` is returned.
    """
    ssh = paramiko.SSHClient()
    # NOTE(review): known hosts are loaded from /dev/null and the
    # AutoAddPolicy is installed, so remote host keys are presumably never
    # verified -- confirm this is only used against disposable test nodes.
    ssh.load_host_keys('/dev/null')
    ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
    ssh.known_hosts = None

    for i in range(retry_count):
        try:
            ssh.connect(host, username=user, key_filename=key_file,
                        look_for_keys=False)
            return ssh
        except Exception:
            # Only ordinary errors are retried; KeyboardInterrupt and
            # SystemExit propagate immediately instead of being swallowed
            # by a sleep-and-retry cycle.
            if i == retry_count - 1:
                raise
            time.sleep(timeout)
| 47 | |
| 48 | |
def wait_on_barrier(barrier, latest_start_time):
    """Block on ``barrier`` until it is released or the deadline passes.

    ``barrier`` is a callable taking a timeout in seconds and returning a
    truthy value when the barrier was released; ``None`` means there is
    nothing to wait for.  ``latest_start_time`` is an absolute timestamp
    (compared against ``time.time()``) or ``None`` for no time limit.
    """
    if barrier is None:
        return

    remaining = None
    if latest_start_time is not None:
        remaining = latest_start_time - time.time()

    # Announce the wait only when there is a positive amount of time left.
    if remaining is not None and remaining > 0:
        template = "Ready and waiting on barrier. " \
                   "Will wait at most {0} seconds"
        log(template.format(int(remaining)))

    released = barrier(remaining)
    if not released:
        log("Barrier timeouted")
| 63 | |
| 64 | |
@contextlib.contextmanager
def log_error(action, types=(Exception,)):
    """Context manager that logs failures of the wrapped ``action``.

    On entry a "Starts" line is logged, unless ``action`` begins with
    ``"!"``; in that case the marker is stripped and the start line is
    skipped.

    If the body raises an ``Exception`` that is an instance of ``types``
    (and is not a ``StopIteration``), an error line is logged.  The
    exception is always re-raised, whether or not it was logged.
    """
    if action.startswith("!"):
        action = action[1:]
    else:
        log("Starts : " + action)

    try:
        yield
    except Exception as exc:
        if isinstance(exc, types) and not isinstance(exc, StopIteration):
            templ = "Error during {0} stage: {1}"
            # str(exc) instead of exc.message: the attribute is deprecated
            # and missing on newer interpreters, where reading it would
            # raise AttributeError here and hide the original error.
            log(templ.format(action, str(exc)))
        raise
| 79 | |
| 80 | |
def run_over_ssh(conn, cmd):
    """Run ``cmd`` through ``conn`` and return ``(exit_code, stdout, stderr)``.

    ``conn`` must provide ``exec_command(cmd)`` returning the
    ``(stdin, stdout, stderr)`` streams.  stdout is read first, then
    stderr, then the exit status is fetched from the stdout channel.

    TODO: should be replaced by a proper implementation based on select.
    """
    _, out_stream, err_stream = conn.exec_command(cmd)

    captured_out = out_stream.read()
    captured_err = err_stream.read()
    exit_code = out_stream.channel.recv_exit_status()

    return exit_code, captured_out, captured_err