blob: a7dda3f03d447e859461c4208a85497ec85d6f77 [file] [log] [blame]
koder aka kdanilove06762a2015-03-22 23:32:09 +02001import re
2import Queue
3import logging
4import os.path
5import traceback
6import threading
7
8from concurrent.futures import ThreadPoolExecutor
9
10import itest
11from utils import ssh_connect
12from utils import get_barrier, log_error, wait_on_barrier
13
14logger = logging.getLogger("io-perf-tool")
15conn_uri_attrs = ("user", "passwd", "host", "port", "path")
16
17
18def normalize_dirpath(dirpath):
19 while dirpath.endswith("/"):
20 dirpath = dirpath[:-1]
21 return dirpath
22
23
24def ssh_mkdir(sftp, remotepath, mode=0777, intermediate=False):
25 remotepath = normalize_dirpath(remotepath)
26 if intermediate:
27 try:
28 sftp.mkdir(remotepath, mode=mode)
29 except IOError:
30 ssh_mkdir(sftp, remotepath.rsplit("/", 1)[0], mode=mode,
31 intermediate=True)
32 return sftp.mkdir(remotepath, mode=mode)
33 else:
34 sftp.mkdir(remotepath, mode=mode)
35
36
37def ssh_copy_file(sftp, localfile, remfile, preserve_perm=True):
38 sftp.put(localfile, remfile)
39 if preserve_perm:
40 sftp.chmod(remfile, os.stat(localfile).st_mode & 0777)
41
42
43def put_dir_recursively(sftp, localpath, remotepath, preserve_perm=True):
44 "upload local directory to remote recursively"
45
46 # hack for localhost connection
47 if hasattr(sftp, "copytree"):
48 sftp.copytree(localpath, remotepath)
49 return
50
51 assert remotepath.startswith("/"), "%s must be absolute path" % remotepath
52
53 # normalize
54 localpath = normalize_dirpath(localpath)
55 remotepath = normalize_dirpath(remotepath)
56
57 try:
58 sftp.chdir(remotepath)
59 localsuffix = localpath.rsplit("/", 1)[1]
60 remotesuffix = remotepath.rsplit("/", 1)[1]
61 if localsuffix != remotesuffix:
62 remotepath = os.path.join(remotepath, localsuffix)
63 except IOError:
64 pass
65
66 for root, dirs, fls in os.walk(localpath):
67 prefix = os.path.commonprefix([localpath, root])
68 suffix = root.split(prefix, 1)[1]
69 if suffix.startswith("/"):
70 suffix = suffix[1:]
71
72 remroot = os.path.join(remotepath, suffix)
73
74 try:
75 sftp.chdir(remroot)
76 except IOError:
77 if preserve_perm:
78 mode = os.stat(root).st_mode & 0777
79 else:
80 mode = 0777
81 ssh_mkdir(sftp, remroot, mode=mode, intermediate=True)
82 sftp.chdir(remroot)
83
84 for f in fls:
85 remfile = os.path.join(remroot, f)
86 localfile = os.path.join(root, f)
87 ssh_copy_file(sftp, localfile, remfile, preserve_perm)
88
89
90def copy_paths(conn, paths):
91 sftp = conn.open_sftp()
92 try:
93 for src, dst in paths.items():
94 try:
95 if os.path.isfile(src):
96 ssh_copy_file(sftp, src, dst)
97 elif os.path.isdir(src):
98 put_dir_recursively(sftp, src, dst)
99 else:
100 templ = "Can't copy {0!r} - " + \
101 "it neither a file not a directory"
102 msg = templ.format(src)
103 raise OSError(msg)
104 except Exception as exc:
105 tmpl = "Scp {0!r} => {1!r} failed - {2!r}"
106 msg = tmpl.format(src, dst, exc)
107 raise OSError(msg)
108 finally:
109 sftp.close()
110
111
112class ConnCreds(object):
113 def __init__(self):
114 for name in conn_uri_attrs:
115 setattr(self, name, None)
116
117
118uri_reg_exprs = []
119
120
121class URIsNamespace(object):
122 class ReParts(object):
123 user_rr = "[^:]*?"
124 host_rr = "[^:]*?"
125 port_rr = "\\d+"
126 key_file_rr = "[^:@]*"
127 passwd_rr = ".*?"
128
129 re_dct = ReParts.__dict__
130
131 for attr_name, val in re_dct.items():
132 if attr_name.endswith('_rr'):
133 new_rr = "(?P<{0}>{1})".format(attr_name[:-3], val)
134 setattr(ReParts, attr_name, new_rr)
135
136 re_dct = ReParts.__dict__
137
138 templs = [
139 "^{host_rr}$",
140 "^{user_rr}@{host_rr}::{key_file_rr}$",
141 "^{user_rr}@{host_rr}:{port_rr}:{key_file_rr}$",
142 "^{user_rr}:{passwd_rr}@@{host_rr}$",
143 "^{user_rr}:{passwd_rr}@@{host_rr}:{port_rr}$",
144 ]
145
146 for templ in templs:
147 uri_reg_exprs.append(templ.format(**re_dct))
148
149
150def parse_ssh_uri(uri):
151 # user:passwd@@ip_host:port
152 # user:passwd@@ip_host
153 # user@ip_host:port
154 # user@ip_host
155 # ip_host:port
156 # ip_host
157 # user@ip_host:port:path_to_key_file
158 # user@ip_host::path_to_key_file
159 # ip_host:port:path_to_key_file
160 # ip_host::path_to_key_file
161
162 res = ConnCreds()
163 res.port = "22"
164 res.key_file = None
165 res.passwd = None
166
167 for rr in uri_reg_exprs:
168 rrm = re.match(rr, uri)
169 if rrm is not None:
170 res.__dict__.update(rrm.groupdict())
171 return res
172 raise ValueError("Can't parse {0!r} as ssh uri value".format(uri))
173
174
175def connect(uri):
176 creds = parse_ssh_uri(uri)
177 creds.port = int(creds.port)
178 return ssh_connect(creds)
179
180
181def conn_func(obj, barrier, latest_start_time, conn):
182 try:
183 test_iter = itest.run_test_iter(obj, conn)
184 next(test_iter)
185
186 wait_on_barrier(barrier, latest_start_time)
187
188 with log_error("!Run test"):
189 return next(test_iter)
190 except:
191 print traceback.format_exc()
192 raise
193
194
195def get_ssh_runner(uris,
196 latest_start_time=None,
197 keep_temp_files=False):
198 logger.debug("Connecting to servers")
199
200 with ThreadPoolExecutor(max_workers=16) as executor:
201 connections = list(executor.map(connect, uris))
202
203 result_queue = Queue.Queue()
204 barrier = get_barrier(len(uris), threaded=True)
205
206 def closure(obj):
207 ths = []
208 obj.set_result_cb(result_queue.put)
209
210 params = (obj, barrier, latest_start_time)
211
212 logger.debug("Start tests")
213 for conn in connections:
214 th = threading.Thread(None, conn_func, None,
215 params + (conn,))
216 th.daemon = True
217 th.start()
218 ths.append(th)
219
220 for th in ths:
221 th.join()
222
223 test_result = []
224 while not result_queue.empty():
225 test_result.append(result_queue.get())
226
227 logger.debug("Done. Closing connection")
228 for conn in connections:
229 conn.close()
230
231 return test_result
232
233 return closure