# blob: ad3f82e629cfee68b5093885887b01e5ecf270d3 [file] [log] [blame]
koder aka kdanilov7acd6bd2015-02-12 14:28:30 -08001import re
2import Queue
koder aka kdanilove21d7472015-02-14 19:02:04 -08003import logging
koder aka kdanilov7acd6bd2015-02-12 14:28:30 -08004import traceback
5import threading
koder aka kdanilov97644f92015-02-13 11:11:08 -08006from concurrent.futures import ThreadPoolExecutor
koder aka kdanilov7acd6bd2015-02-12 14:28:30 -08007
8from utils import ssh_connect
9
10import itest
koder aka kdanilov3f356262015-02-13 08:06:14 -080011from utils import get_barrier, log_error, wait_on_barrier
koder aka kdanilov7acd6bd2015-02-12 14:28:30 -080012
koder aka kdanilove21d7472015-02-14 19:02:04 -080013
# Module-level logger, registered under the "io-perf-tool" name.
logger = logging.getLogger("io-perf-tool")
15
# Names of the attributes every ConnCreds instance is created with.
conn_uri_attrs = ("user", "passwd", "host", "port", "path")


class ConnCreds(object):
    """Plain attribute container for ssh connection credentials.

    Each name listed in ``conn_uri_attrs`` is created as an instance
    attribute initialised to ``None``; code that builds a ``ConnCreds``
    (see ``parse_ssh_uri``) fills the values in afterwards.
    """

    def __init__(self):
        for name in conn_uri_attrs:
            setattr(self, name, None)
23
24
# Pattern strings that parse_ssh_uri tries in order.  The list is filled in
# as a side effect of executing the URIsNamespace class body below.
uri_reg_exprs = []


class URIsNamespace(object):
    """Namespace whose class body builds the ssh URI patterns at import time.

    The class is never instantiated here; it only scopes the temporary
    names used while the regular expressions are assembled and appended to
    the module-level ``uri_reg_exprs`` list.
    """

    class ReParts(object):
        # Sub-patterns for the individual URI components.  Right after the
        # class is created each ``<name>_rr`` value is rewrapped as the
        # named group ``(?P<name>...)``.
        user_rr = "[^:]*?"
        host_rr = "[^:]*?"
        port_rr = r"\d+"
        key_file_rr = "[^:@]*"
        passwd_rr = ".*?"

    # Iterate over a snapshot: the loop rebinds attributes on ReParts, and
    # the class dict must not be walked through a live view while it is
    # being written to.
    for attr_name, val in list(ReParts.__dict__.items()):
        if attr_name.endswith('_rr'):
            new_rr = "(?P<{0}>{1})".format(attr_name[:-3], val)
            setattr(ReParts, attr_name, new_rr)

    # Mapping used to substitute the ``{<name>_rr}`` placeholders below.
    re_dct = ReParts.__dict__

    # Accepted URI shapes.  Order matters: parse_ssh_uri returns the first
    # pattern that matches.
    templs = [
        "^{host_rr}$",
        "^{user_rr}@{host_rr}::{key_file_rr}$",
        "^{user_rr}@{host_rr}:{port_rr}:{key_file_rr}$",
        "^{user_rr}:{passwd_rr}@@{host_rr}$",
        "^{user_rr}:{passwd_rr}@@{host_rr}:{port_rr}$",
    ]

    for templ in templs:
        uri_reg_exprs.append(templ.format(**re_dct))
55
56
def parse_ssh_uri(uri):
    """Parse an ssh connection URI into a ``ConnCreds`` object.

    The URI is matched against each pattern in ``uri_reg_exprs`` in order
    and the named groups of the first match are copied onto the result.
    The templates that build those patterns accept these shapes:

        host
        user@host::path_to_key_file
        user@host:port:path_to_key_file
        user:passwd@@host
        user:passwd@@host:port

    NOTE: the bare ``host`` pattern accepts any colon-free string, so an
    input such as ``user@host`` is returned with the whole string as
    ``host``.

    Returns:
        ConnCreds with the matched fields set.  ``port`` is a string and
        defaults to ``"22"``; ``key_file`` defaults to ``None``; the other
        fields keep the ``None`` that ``ConnCreds`` starts them with.

    Raises:
        ValueError: if no pattern matches ``uri``.
    """
    res = ConnCreds()
    res.port = "22"
    # ``key_file`` is a group in only some patterns; give it a default so
    # the attribute exists on the result whichever pattern matched.
    res.key_file = None

    for rr in uri_reg_exprs:
        rrm = re.match(rr, uri)
        if rrm is not None:
            res.__dict__.update(rrm.groupdict())
            return res

    raise ValueError("Can't parse {0!r} as ssh uri value".format(uri))
78
79
def connect(uri):
    """Open an ssh connection described by ``uri``.

    The URI is parsed with ``parse_ssh_uri``; the port, which the parser
    yields as a string, is converted to ``int`` before the credentials are
    handed to ``ssh_connect``.

    Returns:
        Whatever ``ssh_connect`` returns for the parsed credentials.

    Raises:
        ValueError: propagated from ``parse_ssh_uri`` for an unparsable URI.
    """
    creds = parse_ssh_uri(uri)
    creds.port = int(creds.port)
    return ssh_connect(creds)
koder aka kdanilov7acd6bd2015-02-12 14:28:30 -080084
85
def conn_func(obj, barrier, latest_start_time, conn):
    """Thread body: drive one test over one connection.

    ``itest.run_test_iter(obj, conn)`` is advanced once before the barrier
    and once after it, so every worker reaches the barrier before any of
    them takes the second step (presumably "prepare" then "run" - confirm
    against ``itest``).

    Returns:
        The value produced by the second step of the test iterator.

    Any exception has its traceback printed and is then re-raised.
    """
    try:
        test_iter = itest.run_test_iter(obj, conn)
        next(test_iter)

        wait_on_barrier(barrier, latest_start_time)

        with log_error("!Run test"):
            return next(test_iter)
    except BaseException:
        # This runs in a worker thread, so print the traceback right here
        # and then let the exception continue.  The single-argument
        # parenthesised form prints identically on Python 2 and 3.
        print(traceback.format_exc())
        raise
98
99
def get_ssh_runner(uris,
                   latest_start_time=None,
                   keep_temp_files=False):
    """Connect to every URI and return a callable that runs a test on all.

    Connections are opened concurrently (up to 16 at a time) when this
    function is called.  The returned ``closure(obj)`` starts one daemon
    thread per connection running ``conn_func``, waits for all of them,
    collects everything ``obj`` reported through its result callback and
    closes the connections.

    Args:
        uris: sized collection of ssh URIs accepted by ``connect``.
        latest_start_time: forwarded unchanged to ``conn_func``.
        keep_temp_files: accepted but not used by this function.

    Returns:
        ``closure(obj)``, which returns the list of collected results.
        The closure closes the connections before returning, so it cannot
        usefully be called a second time.
    """
    logger.debug("Connecting to servers")

    with ThreadPoolExecutor(max_workers=16) as executor:
        connections = list(executor.map(connect, uris))

    result_queue = Queue.Queue()
    barrier = get_barrier(len(uris), threaded=True)

    def closure(obj):
        try:
            obj.set_result_cb(result_queue.put)
            params = (obj, barrier, latest_start_time)

            logger.debug("Start tests")
            ths = []
            for conn in connections:
                th = threading.Thread(target=conn_func,
                                      args=params + (conn,))
                th.daemon = True
                th.start()
                ths.append(th)

            for th in ths:
                th.join()

            # All workers have finished, so the queue can simply be drained.
            test_result = []
            while not result_queue.empty():
                test_result.append(result_queue.get())
        finally:
            # Close the connections even when starting or joining the
            # workers fails, so they are not left open.
            logger.debug("Done. Closing connection")
            for conn in connections:
                conn.close()

        return test_result

    return closure
139 return closure