blob: 28996ab3bd14cdd8d9fe3f59f143733afd1ac1d5 [file] [log] [blame]
koder aka kdanilov7acd6bd2015-02-12 14:28:30 -08001import re
2import Queue
koder aka kdanilove21d7472015-02-14 19:02:04 -08003import logging
koder aka kdanilov7acd6bd2015-02-12 14:28:30 -08004import traceback
5import threading
koder aka kdanilov97644f92015-02-13 11:11:08 -08006from concurrent.futures import ThreadPoolExecutor
koder aka kdanilov7acd6bd2015-02-12 14:28:30 -08007
8from utils import ssh_connect
9
10import itest
koder aka kdanilov3f356262015-02-13 08:06:14 -080011from utils import get_barrier, log_error, wait_on_barrier
koder aka kdanilov7acd6bd2015-02-12 14:28:30 -080012
koder aka kdanilove21d7472015-02-14 19:02:04 -080013
14logger = logging.getLogger("io-perf-tool")
15
koder aka kdanilov7acd6bd2015-02-12 14:28:30 -080016conn_uri_attrs = ("user", "passwd", "host", "port", "path")
17
18
19class ConnCreds(object):
20 def __init__(self):
21 for name in conn_uri_attrs:
22 setattr(self, name, None)
23
24
25uri_reg_exprs = []
26
27
28class URIsNamespace(object):
29 class ReParts(object):
30 user_rr = "[^:]*?"
31 host_rr = "[^:]*?"
32 port_rr = "\\d+"
koder aka kdanilov4ec0f712015-02-19 19:19:27 -080033 key_file_rr = "[^:@]*"
34 passwd_rr = ".*?"
koder aka kdanilov7acd6bd2015-02-12 14:28:30 -080035
36 re_dct = ReParts.__dict__
37
38 for attr_name, val in re_dct.items():
39 if attr_name.endswith('_rr'):
40 new_rr = "(?P<{0}>{1})".format(attr_name[:-3], val)
41 setattr(ReParts, attr_name, new_rr)
42
43 re_dct = ReParts.__dict__
44
45 templs = [
koder aka kdanilov4ec0f712015-02-19 19:19:27 -080046 "^{host_rr}$",
47 "^{user_rr}@{host_rr}::{key_file_rr}$",
48 "^{user_rr}@{host_rr}:{port_rr}:{key_file_rr}$",
49 "^{user_rr}:{passwd_rr}@@{host_rr}$",
50 "^{user_rr}:{passwd_rr}@@{host_rr}:{port_rr}$",
koder aka kdanilov7acd6bd2015-02-12 14:28:30 -080051 ]
52
53 for templ in templs:
54 uri_reg_exprs.append(templ.format(**re_dct))
55
56
57def parse_ssh_uri(uri):
koder aka kdanilovdda86d32015-03-16 11:20:04 +020058 # user:passwd@@ip_host:port
59 # user:passwd@@ip_host
koder aka kdanilov7acd6bd2015-02-12 14:28:30 -080060 # user@ip_host:port
61 # user@ip_host
62 # ip_host:port
63 # ip_host
64 # user@ip_host:port:path_to_key_file
65 # user@ip_host::path_to_key_file
66 # ip_host:port:path_to_key_file
67 # ip_host::path_to_key_file
68
69 res = ConnCreds()
koder aka kdanilov4ec0f712015-02-19 19:19:27 -080070 res.port = "22"
koder aka kdanilovdda86d32015-03-16 11:20:04 +020071 res.key_file = None
72 res.passwd = None
koder aka kdanilov7acd6bd2015-02-12 14:28:30 -080073
74 for rr in uri_reg_exprs:
75 rrm = re.match(rr, uri)
76 if rrm is not None:
77 res.__dict__.update(rrm.groupdict())
78 return res
79 raise ValueError("Can't parse {0!r} as ssh uri value".format(uri))
80
81
82def connect(uri):
83 creds = parse_ssh_uri(uri)
koder aka kdanilov4ec0f712015-02-19 19:19:27 -080084 creds.port = int(creds.port)
85 return ssh_connect(creds)
koder aka kdanilov7acd6bd2015-02-12 14:28:30 -080086
87
88def conn_func(obj, barrier, latest_start_time, conn):
89 try:
90 test_iter = itest.run_test_iter(obj, conn)
91 next(test_iter)
92
koder aka kdanilov3f356262015-02-13 08:06:14 -080093 wait_on_barrier(barrier, latest_start_time)
94
koder aka kdanilov7acd6bd2015-02-12 14:28:30 -080095 with log_error("!Run test"):
96 return next(test_iter)
97 except:
98 print traceback.format_exc()
99 raise
100
101
102def get_ssh_runner(uris,
103 latest_start_time=None,
104 keep_temp_files=False):
koder aka kdanilove21d7472015-02-14 19:02:04 -0800105 logger.debug("Connecting to servers")
106
koder aka kdanilov97644f92015-02-13 11:11:08 -0800107 with ThreadPoolExecutor(max_workers=16) as executor:
koder aka kdanilove21d7472015-02-14 19:02:04 -0800108 connections = list(executor.map(connect, uris))
koder aka kdanilov97644f92015-02-13 11:11:08 -0800109
koder aka kdanilov7acd6bd2015-02-12 14:28:30 -0800110 result_queue = Queue.Queue()
111 barrier = get_barrier(len(uris), threaded=True)
112
113 def closure(obj):
114 ths = []
115 obj.set_result_cb(result_queue.put)
116
117 params = (obj, barrier, latest_start_time)
118
koder aka kdanilove21d7472015-02-14 19:02:04 -0800119 logger.debug("Start tests")
koder aka kdanilov7acd6bd2015-02-12 14:28:30 -0800120 for conn in connections:
121 th = threading.Thread(None, conn_func, None,
122 params + (conn,))
123 th.daemon = True
124 th.start()
125 ths.append(th)
126
127 for th in ths:
128 th.join()
129
130 test_result = []
131 while not result_queue.empty():
132 test_result.append(result_queue.get())
133
koder aka kdanilove21d7472015-02-14 19:02:04 -0800134 logger.debug("Done. Closing connection")
koder aka kdanilov3f356262015-02-13 08:06:14 -0800135 for conn in connections:
136 conn.close()
137
koder aka kdanilov7acd6bd2015-02-12 14:28:30 -0800138 return test_result
139
140 return closure