blob: 3415382f957c73f73cccf2adef4c94751d89ce63 [file] [log] [blame]
koder aka kdanilov7acd6bd2015-02-12 14:28:30 -08001import re
2import Queue
3import traceback
4import threading
5
6from utils import ssh_connect
7
8import itest
koder aka kdanilov3f356262015-02-13 08:06:14 -08009from utils import get_barrier, log_error, wait_on_barrier
koder aka kdanilov7acd6bd2015-02-12 14:28:30 -080010
conn_uri_attrs = ("user", "passwd", "host", "port", "path")


class ConnCreds(object):
    """Plain attribute holder for the pieces of an SSH connection URI.

    Every name in ``conn_uri_attrs`` is created as an instance attribute
    set to ``None``.  ``key_file`` is initialised as well: it is the named
    group produced by the URI pattern registered in this module and the
    attribute ``connect()`` reads, so a freshly built instance must not
    raise ``AttributeError`` when it is accessed.
    """

    def __init__(self):
        # ``conn_uri_attrs`` itself is left untouched in case other code
        # relies on its exact contents.
        for name in conn_uri_attrs + ("key_file",):
            setattr(self, name, None)
18
19
uri_reg_exprs = []


class URIsNamespace(object):
    """Scratch namespace whose class body fills ``uri_reg_exprs``.

    The body runs once, when the class statement is executed: it wraps
    each ``*_rr`` fragment of ``ReParts`` into a named regex group and
    then expands every template in ``templs`` into a full pattern that is
    appended to the module-level ``uri_reg_exprs`` list.
    """

    class ReParts(object):
        # Bare fragments; each one is replaced below by "(?P<name>...)",
        # where ``name`` is the attribute name without the "_rr" suffix.
        user_rr = "[^:]*?"
        host_rr = "[^:]*?"
        port_rr = "\\d+"
        key_file_rr = ".*"

    # The class ``__dict__`` proxy is a live view, so it also reflects the
    # attribute rewrites performed by the loop below.
    re_dct = ReParts.__dict__

    # Iterate over a snapshot because the loop assigns to ReParts.
    for attr_name, val in list(re_dct.items()):
        if not attr_name.endswith('_rr'):
            continue
        new_rr = "(?P<{0}>{1})".format(attr_name[:-3], val)
        setattr(ReParts, attr_name, new_rr)

    # URI templates; placeholders are the ReParts attribute names.
    templs = [
        "^{user_rr}@{host_rr}::{key_file_rr}$"
    ]

    for templ in templs:
        uri_reg_exprs.append(templ.format(**re_dct))
45
46
def parse_ssh_uri(uri):
    """Turn an SSH URI string into a ``ConnCreds`` instance.

    ``uri`` is tried against each pattern in ``uri_reg_exprs`` in order;
    the named groups of the first pattern that matches are copied onto a
    fresh ``ConnCreds`` object, which is returned.

    Raises ``ValueError`` when no registered pattern matches.

    NOTE(review): the original comment listed the forms below as the URI
    shapes of interest.  Only forms that have a pattern registered in
    ``uri_reg_exprs`` are actually accepted -- confirm which of these are
    meant to be supported:

        user:passwd@ip_host:port
        user:passwd@ip_host
        user@ip_host:port
        user@ip_host
        ip_host:port
        ip_host
        user@ip_host:port:path_to_key_file
        user@ip_host::path_to_key_file
        ip_host:port:path_to_key_file
        ip_host::path_to_key_file
    """
    for pattern in uri_reg_exprs:
        match = re.match(pattern, uri)
        if match is None:
            continue

        creds = ConnCreds()
        # Named groups become attributes of the credentials object.
        vars(creds).update(match.groupdict())
        return creds

    raise ValueError("Can't parse {0!r} as ssh uri value".format(uri))
67
68
def connect(uri):
    """Parse ``uri`` and open an SSH connection for it.

    The host, user and key file extracted by ``parse_ssh_uri`` are handed
    to ``ssh_connect``; whatever that call returns is returned unchanged.
    A ``ValueError`` from ``parse_ssh_uri`` propagates to the caller.
    """
    parsed = parse_ssh_uri(uri)
    host, user, key_file = parsed.host, parsed.user, parsed.key_file
    return ssh_connect(host, user, key_file)
72
73
def conn_func(obj, barrier, latest_start_time, conn):
    """Thread body: drive one test over one connection.

    The iterator returned by ``itest.run_test_iter(obj, conn)`` is
    advanced once, then the thread waits on ``barrier`` (bounded by
    ``latest_start_time``), and finally the iterator is advanced a second
    time inside ``log_error("!Run test")``.  The value yielded by that
    second step is returned.

    Any exception is printed as a traceback and then re-raised.
    """
    try:
        steps = itest.run_test_iter(obj, conn)

        # First step runs before the barrier -- presumably per-connection
        # preparation; confirm against itest.run_test_iter.
        next(steps)

        wait_on_barrier(barrier, latest_start_time)

        with log_error("!Run test"):
            outcome = next(steps)
        return outcome
    except:
        # Deliberately bare: the error is only reported here and is
        # always propagated.
        print(traceback.format_exc())
        raise
86
87
def get_ssh_runner(uris,
                   latest_start_time=None,
                   keep_temp_files=False):
    """Connect to every URI and return a callable that runs a test on all.

    Parameters:
        uris: sized iterable of SSH URI strings accepted by ``connect``.
        latest_start_time: forwarded to ``conn_func`` for the barrier wait.
        keep_temp_files: accepted for interface compatibility; it is not
            used anywhere in this function.

    Returns:
        ``closure(obj)``.  Calling it registers the result queue's ``put``
        as ``obj``'s result callback, runs ``conn_func`` in one daemon
        thread per connection, waits for all threads, and returns the list
        of results collected from the queue.

    The connections are closed when the closure finishes (successfully or
    not), so the returned callable appears to be single-use.  If setting
    up fails part-way, the connections opened so far are closed before the
    exception propagates.
    """
    connections = []
    try:
        for uri in uris:
            connections.append(connect(uri))
        result_queue = Queue.Queue()
        barrier = get_barrier(len(uris), threaded=True)
    except BaseException:
        # Do not leak the sessions that were opened before the failure.
        for conn in connections:
            conn.close()
        raise

    def closure(obj):
        obj.set_result_cb(result_queue.put)
        params = (obj, barrier, latest_start_time)
        ths = []

        try:
            for conn in connections:
                th = threading.Thread(target=conn_func,
                                      args=params + (conn,))
                th.daemon = True
                th.start()
                ths.append(th)

            for th in ths:
                th.join()

            # All workers are done, so the queue can be drained without
            # blocking.
            test_result = []
            while not result_queue.empty():
                test_result.append(result_queue.get())
            return test_result
        finally:
            # Always release the SSH sessions, even if starting or joining
            # the workers was interrupted.
            for conn in connections:
                conn.close()

    return closure