blob: 76fc6b55588ce2e1fd6ebdf0f3c44a8e7ca092b0 [file] [log] [blame]
koder aka kdanilov7acd6bd2015-02-12 14:28:30 -08001import re
2import Queue
3import traceback
4import threading
5
6from utils import ssh_connect
7
8import itest
9from utils import get_barrier, log_error
10
11conn_uri_attrs = ("user", "passwd", "host", "port", "path")
12
13
14class ConnCreds(object):
15 def __init__(self):
16 for name in conn_uri_attrs:
17 setattr(self, name, None)
18
19
20uri_reg_exprs = []
21
22
23class URIsNamespace(object):
24 class ReParts(object):
25 user_rr = "[^:]*?"
26 host_rr = "[^:]*?"
27 port_rr = "\\d+"
28 key_file_rr = ".*"
29
30 re_dct = ReParts.__dict__
31
32 for attr_name, val in re_dct.items():
33 if attr_name.endswith('_rr'):
34 new_rr = "(?P<{0}>{1})".format(attr_name[:-3], val)
35 setattr(ReParts, attr_name, new_rr)
36
37 re_dct = ReParts.__dict__
38
39 templs = [
40 "^{user_rr}@{host_rr}::{key_file_rr}$"
41 ]
42
43 for templ in templs:
44 uri_reg_exprs.append(templ.format(**re_dct))
45
46
47def parse_ssh_uri(uri):
48 # user:passwd@ip_host:port
49 # user:passwd@ip_host
50 # user@ip_host:port
51 # user@ip_host
52 # ip_host:port
53 # ip_host
54 # user@ip_host:port:path_to_key_file
55 # user@ip_host::path_to_key_file
56 # ip_host:port:path_to_key_file
57 # ip_host::path_to_key_file
58
59 res = ConnCreds()
60
61 for rr in uri_reg_exprs:
62 rrm = re.match(rr, uri)
63 if rrm is not None:
64 res.__dict__.update(rrm.groupdict())
65 return res
66 raise ValueError("Can't parse {0!r} as ssh uri value".format(uri))
67
68
69def connect(uri):
70 creds = parse_ssh_uri(uri)
71 return ssh_connect(creds.host, creds.user, creds.key_file)
72
73
74def conn_func(obj, barrier, latest_start_time, conn):
75 try:
76 test_iter = itest.run_test_iter(obj, conn)
77 next(test_iter)
78
79 with log_error("!Run test"):
80 return next(test_iter)
81 except:
82 print traceback.format_exc()
83 raise
84
85
86def get_ssh_runner(uris,
87 latest_start_time=None,
88 keep_temp_files=False):
89
90 connections = [connect(uri) for uri in uris]
91 result_queue = Queue.Queue()
92 barrier = get_barrier(len(uris), threaded=True)
93
94 def closure(obj):
95 ths = []
96 obj.set_result_cb(result_queue.put)
97
98 params = (obj, barrier, latest_start_time)
99
100 for conn in connections:
101 th = threading.Thread(None, conn_func, None,
102 params + (conn,))
103 th.daemon = True
104 th.start()
105 ths.append(th)
106
107 for th in ths:
108 th.join()
109
110 test_result = []
111 while not result_queue.empty():
112 test_result.append(result_queue.get())
113
114 return test_result
115
116 return closure