blob: d5d7acabfae204cf244fe199c829fac769aabeca [file] [log] [blame]
koder aka kdanilov7acd6bd2015-02-12 14:28:30 -08001import re
2import Queue
koder aka kdanilove21d7472015-02-14 19:02:04 -08003import logging
koder aka kdanilov7acd6bd2015-02-12 14:28:30 -08004import traceback
5import threading
koder aka kdanilov97644f92015-02-13 11:11:08 -08006from concurrent.futures import ThreadPoolExecutor
koder aka kdanilov7acd6bd2015-02-12 14:28:30 -08007
8from utils import ssh_connect
9
10import itest
koder aka kdanilov3f356262015-02-13 08:06:14 -080011from utils import get_barrier, log_error, wait_on_barrier
koder aka kdanilov7acd6bd2015-02-12 14:28:30 -080012
koder aka kdanilove21d7472015-02-14 19:02:04 -080013
# Module-wide logger shared by the functions in this file.
logger = logging.getLogger("io-perf-tool")

# Attribute names that every ConnCreds instance starts with (all None).
# NOTE(review): connect() also reads ``key_file``, which is not listed here;
# it only exists on an instance after a regex match supplies it.
conn_uri_attrs = ("user", "passwd", "host", "port", "path")


class ConnCreds(object):
    """Plain attribute bag holding the pieces of a connection uri."""

    def __init__(self):
        # Every known attribute starts out as None.
        vars(self).update(dict.fromkeys(conn_uri_attrs))
23
24
# Regular expressions (as strings) that parse_ssh_uri tries in order.
uri_reg_exprs = []


class URIsNamespace(object):
    """Namespace whose class body builds the uri regular expressions.

    The body runs once, when the class statement is executed, and appends
    one expanded regular expression per template to ``uri_reg_exprs``.
    """

    class ReParts(object):
        # Raw fragments; each ``<name>_rr`` value is rewritten below into
        # a named group ``(?P<name>...)``.
        user_rr = "[^:]*?"
        host_rr = "[^:]*?"
        port_rr = "\\d+"
        key_file_rr = ".*"

    re_dct = ReParts.__dict__

    for attr_name, val in re_dct.items():
        # Only the ``*_rr`` entries are regex fragments; everything else
        # in the class dict (``__module__`` and friends) is left alone.
        if not attr_name.endswith('_rr'):
            continue
        # Strip the ``_rr`` suffix to obtain the group name.
        new_rr = "(?P<{0}>{1})".format(attr_name[:-len('_rr')], val)
        setattr(ReParts, attr_name, new_rr)

    # Re-read the class dict so the templates see the wrapped fragments.
    re_dct = ReParts.__dict__

    # Only ``user@host::key_file`` is described here; ``port_rr`` is wrapped
    # above but not referenced by any template.
    templs = [
        "^{user_rr}@{host_rr}::{key_file_rr}$"
    ]

    for templ in templs:
        uri_reg_exprs.append(templ.format(**re_dct))
50
51
def parse_ssh_uri(uri):
    """Parse an ssh connection string into a ConnCreds object.

    Every expression in ``uri_reg_exprs`` is tried in order with
    ``re.match``; the named groups of the first one that matches are copied
    onto a fresh ConnCreds instance, which is returned.

    Raises ValueError if no expression matches ``uri``.
    """
    # Formats listed by the original author:
    #   user:passwd@ip_host:port
    #   user:passwd@ip_host
    #   user@ip_host:port
    #   user@ip_host
    #   ip_host:port
    #   ip_host
    #   user@ip_host:port:path_to_key_file
    #   user@ip_host::path_to_key_file
    #   ip_host:port:path_to_key_file
    #   ip_host::path_to_key_file
    # NOTE(review): the template list visible in this file only covers
    # ``user@ip_host::path_to_key_file`` - confirm which of the formats
    # above are really supported.
    creds = ConnCreds()

    for pattern in uri_reg_exprs:
        found = re.match(pattern, uri)
        if found is None:
            continue
        vars(creds).update(found.groupdict())
        return creds

    raise ValueError("Can't parse {0!r} as ssh uri value".format(uri))
72
73
def connect(uri):
    """Parse ``uri`` and return whatever ``ssh_connect`` gives back for it.

    Only the host, user and key_file parts of the parsed uri are passed on.
    Raises ValueError (from parse_ssh_uri) when ``uri`` cannot be parsed.
    """
    parsed = parse_ssh_uri(uri)
    host, user, key_file = parsed.host, parsed.user, parsed.key_file
    return ssh_connect(host, user, key_file)
77
78
def conn_func(obj, barrier, latest_start_time, conn):
    """Drive one test iterator over ``conn``; used as a thread target.

    The iterator from ``itest.run_test_iter`` is advanced once, then the
    function waits on ``barrier``, then the iterator is advanced a second
    time and that value is returned.
    NOTE(review): presumably the first step is preparation and the second
    the actual run, so all connections start together - confirm in itest.

    Any exception has its traceback printed and is then re-raised.
    """
    try:
        steps = itest.run_test_iter(obj, conn)
        next(steps)

        wait_on_barrier(barrier, latest_start_time)

        with log_error("!Run test"):
            return next(steps)
    except:
        # Print the traceback here as well as re-raising.
        print(traceback.format_exc())
        raise
91
92
def get_ssh_runner(uris,
                   latest_start_time=None,
                   keep_temp_files=False):
    """Connect to every uri and return a callable that runs a test on all.

    Connections are opened up front, in parallel, with ``connect``.  The
    returned ``closure(obj)`` starts one ``conn_func`` thread per
    connection, waits for all of them, and returns the list of values that
    ``obj`` reported through its result callback.

    uris              -- iterable of ssh uri strings (needs ``len()``)
    latest_start_time -- passed through unchanged to ``conn_func``
    keep_temp_files   -- accepted for interface compatibility; not used
                         by this function

    The connections are closed at the end of the ``closure`` call, even if
    it fails, so the returned runner is effectively single-use.
    """
    logger.debug("Connecting to servers")

    with ThreadPoolExecutor(max_workers=16) as executor:
        connections = list(executor.map(connect, uris))

    result_queue = Queue.Queue()
    barrier = get_barrier(len(uris), threaded=True)

    def closure(obj):
        ths = []
        params = (obj, barrier, latest_start_time)

        try:
            obj.set_result_cb(result_queue.put)

            logger.debug("Start tests")
            for conn in connections:
                th = threading.Thread(None, conn_func, None,
                                      params + (conn,))
                # Daemon threads do not keep the process alive on exit.
                th.daemon = True
                th.start()
                ths.append(th)

            for th in ths:
                th.join()

            # All workers are finished, so draining until empty is safe.
            test_result = []
            while not result_queue.empty():
                test_result.append(result_queue.get())
        finally:
            # Always release the ssh connections, including when starting
            # or joining the threads raised (e.g. KeyboardInterrupt).
            logger.debug("Done. Closing connection")
            for conn in connections:
                conn.close()

        return test_result

    return closure