| koder aka kdanilov | 7acd6bd | 2015-02-12 14:28:30 -0800 | [diff] [blame] | 1 | import time | 
|  | 2 | import threading | 
|  | 3 | import contextlib | 
| koder aka kdanilov | 4643fd6 | 2015-02-10 16:20:13 -0800 | [diff] [blame] | 4 | import multiprocessing | 
|  | 5 |  | 
| koder aka kdanilov | 7acd6bd | 2015-02-12 14:28:30 -0800 | [diff] [blame] | 6 | import paramiko | 
| koder aka kdanilov | 4643fd6 | 2015-02-10 16:20:13 -0800 | [diff] [blame] | 7 |  | 
| koder aka kdanilov | 7acd6bd | 2015-02-12 14:28:30 -0800 | [diff] [blame] | 8 | from log import log | 
|  | 9 |  | 
|  | 10 |  | 
def get_barrier(count, threaded=False):
    """Build a one-shot barrier shared by ``count`` participants.

    Args:
        count: number of callers that must arrive before the barrier opens.
        threaded: when true the barrier is built on ``threading``
            primitives (same-process threads); otherwise it is built on
            ``multiprocessing`` primitives.

    Returns:
        A function ``closure(timeout)``.  Each participant calls it once;
        the call blocks until all ``count`` participants have arrived or
        until ``timeout`` seconds have passed (``None`` waits without a
        limit).  It returns ``True`` if the barrier opened and ``False``
        otherwise.
    """
    if threaded:
        # A class attribute is used as a tiny mutable cell that the nested
        # function can update without ``nonlocal``.
        class val(object):
            value = count
        cond = threading.Condition()
    else:
        val = multiprocessing.Value('i', count)
        cond = multiprocessing.Condition()

    def closure(timeout):
        # Fix the deadline up front so that repeated waits never add up to
        # more than the requested timeout.
        deadline = None if timeout is None else time.time() + timeout

        with cond:
            val.value -= 1
            if val.value == 0:
                # Last participant: release everybody who is waiting.
                cond.notify_all()

            # Re-check the counter after every wakeup: a single wait() may
            # return before the barrier is actually released.  A counter
            # below zero means more callers arrived than ``count``; such a
            # caller can never be released, so it does not wait at all.
            while val.value > 0:
                if deadline is None:
                    cond.wait()
                    continue
                remaining = deadline - time.time()
                if remaining <= 0:
                    break
                cond.wait(remaining)

            return val.value == 0

    return closure
| koder aka kdanilov | 7acd6bd | 2015-02-12 14:28:30 -0800 | [diff] [blame] | 30 |  | 
|  | 31 |  | 
def ssh_connect(host, user, key_file, retry_count=10, timeout=5):
    """Open an SSH connection to ``host``, retrying on failure.

    Args:
        host: address passed to ``SSHClient.connect``.
        user: login name.
        key_file: path of the private key file used for authentication.
        retry_count: maximum number of connection attempts.
        timeout: pause, in seconds, between two failed attempts (this is
            not a connection timeout).

    Returns:
        The connected ``paramiko.SSHClient``.  If ``retry_count`` is zero
        or negative no attempt is made and ``None`` is returned.

    Raises:
        Whatever exception the last failed attempt raised.
    """
    ssh = paramiko.SSHClient()
    # Start from an empty host-key store and accept unknown host keys.
    # NOTE(review): this disables host-key verification.
    ssh.load_host_keys('/dev/null')
    ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
    ssh.known_hosts = None

    for i in range(retry_count):
        try:
            ssh.connect(host, username=user, key_filename=key_file,
                        look_for_keys=False)
            return ssh
        except Exception:
            # Only ordinary errors are retried; KeyboardInterrupt and
            # SystemExit propagate immediately instead of being swallowed
            # by the retry loop.
            if i == retry_count - 1:
                raise
            time.sleep(timeout)
|  | 47 |  | 
|  | 48 |  | 
def wait_on_barrier(barrier, latest_start_time):
    """Block on ``barrier`` until it opens or ``latest_start_time`` passes.

    Args:
        barrier: callable taking a timeout in seconds (or ``None``) and
            returning a truthy value when the barrier opened; ``None``
            disables waiting entirely.
        latest_start_time: timestamp comparable with ``time.time()`` after
            which waiting should stop, or ``None`` for no limit.
    """
    if barrier is None:
        return

    remaining = None
    if latest_start_time is not None:
        remaining = latest_start_time - time.time()

    # Announce the wait only when there is a positive amount of time left.
    if remaining is not None and remaining > 0:
        announce = ("Ready and waiting on barrier. "
                    "Will wait at most {0} seconds")
        log(announce.format(int(remaining)))

    opened = barrier(remaining)
    if not opened:
        log("Barrier timeouted")
|  | 63 |  | 
|  | 64 |  | 
@contextlib.contextmanager
def log_error(action, types=(Exception,)):
    """Context manager that logs a stage start and any error raised in it.

    Args:
        action: human-readable stage name.  A leading ``"!"`` suppresses
            the "Starts" message and is stripped from the name.
        types: exception class or tuple of classes whose occurrences are
            logged.  ``StopIteration`` is never logged.

    Every exception raised inside the ``with`` body is re-raised, whether
    or not it was logged.
    """
    if not action.startswith("!"):
        log("Starts : " + action)
    else:
        action = action[1:]

    try:
        yield
    except Exception as exc:
        if isinstance(exc, types) and not isinstance(exc, StopIteration):
            templ = "Error during {0} stage: {1}"
            # Format the exception itself: the ``message`` attribute does
            # not exist on Python 3 and reading it there would replace the
            # real error with an AttributeError.
            log(templ.format(action, exc))
        raise
|  | 79 |  | 
|  | 80 |  | 
def run_over_ssh(conn, cmd):
    """Run ``cmd`` through ``conn`` and collect its result.

    Placeholder implementation: it should be replaced by a proper one
    built on ``select``.  Standard output is read to the end before
    standard error is touched.

    Returns:
        A ``(exit_code, stdout_data, stderr_data)`` tuple.
    """
    _, out_stream, err_stream = conn.exec_command(cmd)

    out_data = out_stream.read()
    err_data = err_stream.read()
    # The exit status is taken from the channel behind the stdout stream.
    exit_code = out_stream.channel.recv_exit_status()

    return exit_code, out_data, err_data