blob: c7ba9a9fe7081f4d6ce1cddc1614f3c343b0cd1a [file] [log] [blame]
koder aka kdanilov7acd6bd2015-02-12 14:28:30 -08001import re
2import Queue
3import traceback
4import threading
koder aka kdanilov97644f92015-02-13 11:11:08 -08005from concurrent.futures import ThreadPoolExecutor
koder aka kdanilov7acd6bd2015-02-12 14:28:30 -08006
7from utils import ssh_connect
8
9import itest
koder aka kdanilov3f356262015-02-13 08:06:14 -080010from utils import get_barrier, log_error, wait_on_barrier
koder aka kdanilov7acd6bd2015-02-12 14:28:30 -080011
conn_uri_attrs = ("user", "passwd", "host", "port", "path")


class ConnCreds(object):
    """Plain attribute bag describing one connection URI.

    A fresh instance carries one attribute per name in ``conn_uri_attrs``,
    each initialised to ``None``.
    """

    def __init__(self):
        # dict.fromkeys() maps every name to None; pushing that mapping into
        # the instance dict is the bulk form of setattr(self, name, None).
        self.__dict__.update(dict.fromkeys(conn_uri_attrs))
19
20
uri_reg_exprs = []


class URIsNamespace(object):
    """Scratch namespace whose body builds the URI regular expressions.

    The class body runs once, when the class statement is executed: it wraps
    every ``*_rr`` fragment of ``ReParts`` in a named group and appends each
    expanded template from ``templs`` to the module-level ``uri_reg_exprs``.
    """

    class ReParts(object):
        # Bare fragments. The loop below rewrites each one in place as
        # "(?P<name>fragment)", where name is the attribute name without
        # its "_rr" suffix.
        user_rr = "[^:]*?"
        host_rr = "[^:]*?"
        port_rr = "\\d+"
        key_file_rr = ".*"

    # Live view of the ReParts namespace; it reflects the setattr calls
    # below, so it can be reused for template expansion afterwards.
    re_dct = vars(ReParts)

    # Iterate over a snapshot because ReParts is updated inside the loop.
    for attr_name, val in list(re_dct.items()):
        if not attr_name.endswith('_rr'):
            continue
        new_rr = "(?P<%s>%s)" % (attr_name[:-3], val)
        setattr(ReParts, attr_name, new_rr)

    templs = [
        "^{user_rr}@{host_rr}::{key_file_rr}$"
    ]

    # Templates reference fragments by attribute name ({user_rr}, ...).
    for templ in templs:
        uri_reg_exprs.append(templ.format(**re_dct))
46
47
def parse_ssh_uri(uri):
    """Parse an ssh URI string into a ``ConnCreds`` object.

    Each pattern in ``uri_reg_exprs`` is tried in order with ``re.match``;
    the named groups of the first pattern that matches are copied onto a
    new ``ConnCreds`` instance, which is returned.

    URI shapes this parser is meant to cover (only shapes that have a
    pattern registered in ``uri_reg_exprs`` actually match):

        user:passwd@ip_host:port
        user:passwd@ip_host
        user@ip_host:port
        user@ip_host
        ip_host:port
        ip_host
        user@ip_host:port:path_to_key_file
        user@ip_host::path_to_key_file
        ip_host:port:path_to_key_file
        ip_host::path_to_key_file

    Raises:
        ValueError: if no registered pattern matches ``uri``.
    """
    for pattern in uri_reg_exprs:
        match = re.match(pattern, uri)
        if match is None:
            continue

        creds = ConnCreds()
        vars(creds).update(match.groupdict())
        return creds

    raise ValueError("Can't parse {0!r} as ssh uri value".format(uri))
68
69
def connect(uri):
    """Parse ``uri`` and return the result of ``ssh_connect`` for it.

    The host, user and key file extracted by ``parse_ssh_uri`` are passed
    to ``ssh_connect`` in that order.
    """
    parsed = parse_ssh_uri(uri)
    host, user, key_file = parsed.host, parsed.user, parsed.key_file
    return ssh_connect(host, user, key_file)
73
74
def conn_func(obj, barrier, latest_start_time, conn):
    """Thread body: drive one test iterator over one connection.

    The iterator returned by ``itest.run_test_iter(obj, conn)`` is advanced
    once, then the thread waits on ``barrier`` (bounded by
    ``latest_start_time``), then the iterator is advanced a second time
    inside ``log_error("!Run test")`` and that value is returned.

    Any exception has its traceback printed before being re-raised.
    """
    try:
        steps = itest.run_test_iter(obj, conn)

        # First step runs before the barrier.
        # NOTE(review): presumably this is the per-node preparation phase;
        # confirm against itest.run_test_iter.
        next(steps)

        wait_on_barrier(barrier, latest_start_time)

        with log_error("!Run test"):
            return next(steps)
    except:
        # Deliberately bare: print the traceback for whatever was raised,
        # then let the exception propagate unchanged.
        print(traceback.format_exc())
        raise
87
88
def get_ssh_runner(uris,
                   latest_start_time=None,
                   keep_temp_files=False):
    """Connect to every URI and return a callable that runs a test on all.

    Connections are opened concurrently (up to 16 at a time) with
    ``connect``. The returned callable takes a test object, runs
    ``conn_func`` in one daemon thread per connection, waits for all of
    them, gathers everything the test reported through its result callback
    and finally closes the connections.

    Args:
        uris: sequence of ssh URI strings; its length sizes the barrier.
        latest_start_time: forwarded unchanged to ``conn_func``.
        keep_temp_files: accepted for interface compatibility; not used in
            this function.

    Returns:
        A function ``closure(obj)`` returning the list of results collected
        from ``obj``'s result callback. Because the connections are closed
        at the end of the call, it is meant to be invoked once.

    Raises:
        Whatever ``connect`` raises for any of the URIs.
    """
    with ThreadPoolExecutor(max_workers=16) as executor:
        # Executor.map() returns a one-shot iterator. Materialise it, or the
        # thread-spawning loop below would exhaust it and the closing loop
        # would then see nothing, leaking every connection.
        connections = list(executor.map(connect, uris))

    result_queue = Queue.Queue()
    barrier = get_barrier(len(uris), threaded=True)

    def closure(obj):
        ths = []
        obj.set_result_cb(result_queue.put)

        params = (obj, barrier, latest_start_time)

        for conn in connections:
            th = threading.Thread(None, conn_func, None,
                                  params + (conn,))
            th.daemon = True
            th.start()
            ths.append(th)

        for th in ths:
            th.join()

        # All workers have finished, so the queue is no longer being fed
        # and can be drained without blocking.
        test_result = []
        while not result_queue.empty():
            test_result.append(result_queue.get())

        for conn in connections:
            conn.close()

        return test_result

    return closure