blob: 7c859cfb996f54ec1ee9fa252c54ccb62fca96b2 [file] [log] [blame]
koder aka kdanilove06762a2015-03-22 23:32:09 +02001import re
koder aka kdanilov3a6633e2015-03-26 18:20:00 +02002import time
koder aka kdanilove06762a2015-03-22 23:32:09 +02003import Queue
4import logging
5import os.path
koder aka kdanilov3a6633e2015-03-26 18:20:00 +02006import getpass
koder aka kdanilove06762a2015-03-22 23:32:09 +02007import threading
8
koder aka kdanilov3a6633e2015-03-26 18:20:00 +02009import socket
10import paramiko
koder aka kdanilove06762a2015-03-22 23:32:09 +020011from concurrent.futures import ThreadPoolExecutor
12
koder aka kdanilov3a6633e2015-03-26 18:20:00 +020013from utils import get_barrier
koder aka kdanilove06762a2015-03-22 23:32:09 +020014
# Module-wide logger; every function in this file logs through it.
logger = logging.getLogger("io-perf-tool")

# Attribute names that ConnCreds.__init__ creates (each initialised to None).
conn_uri_attrs = ("user", "passwd", "host", "port", "path")
17
18
def ssh_connect(creds, retry_count=60, timeout=1):
    """Open an SSH connection described by *creds*, retrying on socket errors.

    Authentication is chosen once, in this order:

    1. ``creds.passwd`` is set  -> password login (agent and key lookup off);
    2. ``creds.key_file`` is set -> that private key file;
    3. otherwise                -> ``~/.ssh/id_rsa``.

    ``creds.user`` of ``None`` means "the current local user".

    :param creds: object with ``host``, ``port``, ``user``, ``passwd`` and
        (optionally) ``key_file`` attributes.
    :param retry_count: maximum number of connection attempts.
    :param timeout: seconds to sleep between two attempts (this is a retry
        delay, it is not passed to paramiko as a connect timeout).
    :returns: a connected ``paramiko.SSHClient``; ``None`` if *retry_count*
        is smaller than 1, because no attempt is made in that case.
    :raises paramiko.PasswordRequiredException: immediately, never retried.
    :raises socket.error: when the last attempt still fails.
    """
    ssh = paramiko.SSHClient()
    # Start from an empty known-hosts set and accept unknown host keys
    # automatically (AutoAddPolicy).
    ssh.load_host_keys('/dev/null')
    ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
    ssh.known_hosts = None

    # Everything below is identical for every attempt, so build it once
    # instead of on each loop iteration.
    if creds.user is None:
        user = getpass.getuser()
    else:
        user = creds.user

    conn_args = {
        "username": user,
        "port": creds.port,
        "look_for_keys": False,
    }

    # ConnCreds.__init__ does not define key_file, so tolerate its absence.
    key_file = getattr(creds, "key_file", None)

    if creds.passwd is not None:
        conn_args["password"] = creds.passwd
        conn_args["allow_agent"] = False
    elif key_file is not None:
        conn_args["key_filename"] = key_file
    else:
        conn_args["key_filename"] = os.path.expanduser('~/.ssh/id_rsa')

    try:
        for attempt in range(retry_count):
            try:
                ssh.connect(creds.host, **conn_args)
                return ssh
            except paramiko.PasswordRequiredException:
                # An encrypted key without a password will not fix itself.
                raise
            except socket.error:
                if attempt == retry_count - 1:
                    raise
                time.sleep(timeout)
    except Exception:
        # Do not leave a half-open client behind when giving up.
        ssh.close()
        raise
62
63
def normalize_dirpath(dirpath):
    """Return *dirpath* with every trailing ``/`` removed.

    Only the tail is touched; slashes elsewhere in the path are kept.
    A path made only of slashes (e.g. ``"/"``) becomes the empty string.
    """
    return dirpath.rstrip("/")
68
69
def ssh_mkdir(sftp, remotepath, mode=0o777, intermediate=False):
    """Create *remotepath* through *sftp*.

    :param sftp: object providing ``mkdir(path, mode=...)``.
    :param remotepath: directory to create; trailing slashes are ignored.
    :param mode: permission bits passed to ``sftp.mkdir``.
    :param intermediate: when true and the first ``mkdir`` fails with
        ``IOError``, create the parent directories (recursively) and then
        retry once.
    :raises IOError: if the directory cannot be created.  With
        ``intermediate=True`` the error is raised as soon as there is no
        parent component left to try.
    """
    remotepath = normalize_dirpath(remotepath)

    if not intermediate:
        sftp.mkdir(remotepath, mode=mode)
        return

    try:
        sftp.mkdir(remotepath, mode=mode)
    except IOError:
        parent = remotepath.rsplit("/", 1)[0]
        # Without this guard a persistent failure (permission denied, path
        # already exists, ...) walks up to "" and then recurses on ""
        # forever, ending in a recursion-depth error instead of the IOError.
        if not parent or parent == remotepath:
            raise
        ssh_mkdir(sftp, parent, mode=mode, intermediate=True)
        return sftp.mkdir(remotepath, mode=mode)
81
82
def ssh_copy_file(sftp, localfile, remfile, preserve_perm=True):
    """Upload *localfile* to *remfile* via ``sftp.put``.

    When *preserve_perm* is true, the remote file is afterwards chmod-ed to
    the local file's permission bits (``st_mode & 0o777``).
    """
    sftp.put(localfile, remfile)

    if not preserve_perm:
        return

    local_perm_bits = os.stat(localfile).st_mode & 0o777
    sftp.chmod(remfile, local_perm_bits)
87
88
def put_dir_recursively(sftp, localpath, remotepath, preserve_perm=True):
    """Upload the local directory *localpath* to *remotepath* recursively.

    If *remotepath* already exists on the remote side and its last component
    differs from the last component of *localpath*, the tree is uploaded
    *into* it (``remotepath/<basename of localpath>``); otherwise
    *remotepath* itself becomes the root of the uploaded tree.

    :param sftp: SFTP-like object.  If it has a ``copytree`` attribute the
        whole job is delegated to ``sftp.copytree(localpath, remotepath)``.
    :param localpath: local directory to upload.
    :param remotepath: absolute remote destination.
    :param preserve_perm: copy local permission bits to created directories
        and uploaded files; directories are created with 0o777 otherwise.
    """
    # hack for localhost connection
    if hasattr(sftp, "copytree"):
        sftp.copytree(localpath, remotepath)
        return

    assert remotepath.startswith("/"), "%s must be absolute path" % remotepath

    localpath = normalize_dirpath(localpath)
    # normalize_dirpath turns "/" into ""; keep the remote root absolute.
    remotepath = normalize_dirpath(remotepath) or "/"

    # Only chdir can raise IOError here, so keep the try body to that call.
    try:
        sftp.chdir(remotepath)
    except IOError:
        # Destination does not exist yet - it will be created below.
        pass
    else:
        # [-1] instead of [1]: a path without any "/" (e.g. "mydir") has a
        # single component and [1] would raise IndexError.
        localsuffix = localpath.rsplit("/", 1)[-1]
        remotesuffix = remotepath.rsplit("/", 1)[-1]
        if localsuffix != remotesuffix:
            remotepath = os.path.join(remotepath, localsuffix)

    for root, _dirs, fls in os.walk(localpath):
        # os.walk yields paths that start with the directory it was given,
        # so the part after it is the path relative to the uploaded root.
        suffix = root[len(localpath):]
        if suffix.startswith("/"):
            suffix = suffix[1:]

        remroot = os.path.join(remotepath, suffix)

        try:
            sftp.chdir(remroot)
        except IOError:
            if preserve_perm:
                mode = os.stat(root).st_mode & 0o777
            else:
                mode = 0o777
            ssh_mkdir(sftp, remroot, mode=mode, intermediate=True)
            sftp.chdir(remroot)

        for f in fls:
            remfile = os.path.join(remroot, f)
            localfile = os.path.join(root, f)
            ssh_copy_file(sftp, localfile, remfile, preserve_perm)
134
135
def copy_paths(conn, paths):
    """Copy local files and directories to the remote host behind *conn*.

    :param conn: connection object providing ``open_sftp()``.
    :param paths: mapping ``{local_source: remote_destination}``.  A source
        that is a regular file is uploaded with ``ssh_copy_file``, a
        directory with ``put_dir_recursively``.
    :raises OSError: if any entry fails for any reason; the message names
        the source, the destination and the underlying exception.

    The SFTP session is always closed, also on failure.
    """
    sftp = conn.open_sftp()
    try:
        for src, dst in paths.items():
            try:
                if os.path.isfile(src):
                    ssh_copy_file(sftp, src, dst)
                elif os.path.isdir(src):
                    put_dir_recursively(sftp, src, dst)
                else:
                    templ = "Can't copy {0!r} - " + \
                            "it is neither a file nor a directory"
                    raise OSError(templ.format(src))
            except Exception as exc:
                # Every failure (including the OSError raised just above)
                # is reported as one OSError carrying the src/dst context.
                tmpl = "Scp {0!r} => {1!r} failed - {2!r}"
                raise OSError(tmpl.format(src, dst, exc))
    finally:
        sftp.close()
156
157
class ConnCreds(object):
    """Plain container for SSH connection credentials.

    Every name listed in ``conn_uri_attrs`` plus ``key_file`` is created as
    an instance attribute with the value ``None``.
    """

    def __init__(self):
        for name in conn_uri_attrs:
            setattr(self, name, None)
        # ssh_connect reads creds.key_file, but "key_file" is not part of
        # conn_uri_attrs; define it so a freshly built instance is usable.
        self.key_file = None
162
163
# Regular expressions (as strings) for every supported ssh uri form, in the
# order they are tried.  Filled in while URIsNamespace's body is executed.
uri_reg_exprs = []


class URIsNamespace(object):
    """Namespace whose class body builds ``uri_reg_exprs`` at import time.

    ``ReParts`` holds one regex fragment per uri component.  Each ``*_rr``
    attribute is rewritten in place into a named group (``user_rr`` becomes
    ``(?P<user>...)``), then every template in ``templs`` is expanded with
    those groups and appended to ``uri_reg_exprs``.
    """

    class ReParts(object):
        user_rr = "[^:]*?"
        # A host may contain neither ':' nor '@'.  Allowing '@' made
        # "user@host" match the bare-host template with host="user@host".
        host_rr = "[^:@]*?"
        port_rr = "\\d+"
        key_file_rr = "[^:@]*"
        passwd_rr = ".*?"

    # Snapshot the items first: the loop rebinds attributes of ReParts.
    for attr_name, val in list(vars(ReParts).items()):
        if attr_name.endswith('_rr'):
            new_rr = "(?P<{0}>{1})".format(attr_name[:-3], val)
            setattr(ReParts, attr_name, new_rr)

    # Plain-dict copy taken after the rewrite; used as format() keywords.
    re_dct = dict(vars(ReParts))

    templs = [
        # ip_host[:port][:path_to_key_file]
        "^{host_rr}$",
        "^{host_rr}:{port_rr}$",
        "^{host_rr}::{key_file_rr}$",
        "^{host_rr}:{port_rr}:{key_file_rr}$",
        # user@ip_host[:port][:path_to_key_file]
        "^{user_rr}@{host_rr}$",
        "^{user_rr}@{host_rr}:{port_rr}$",
        "^{user_rr}@{host_rr}::{key_file_rr}$",
        "^{user_rr}@{host_rr}:{port_rr}:{key_file_rr}$",
        # user:passwd@@ip_host[:port]
        "^{user_rr}:{passwd_rr}@@{host_rr}$",
        "^{user_rr}:{passwd_rr}@@{host_rr}:{port_rr}$",
    ]

    for templ in templs:
        uri_reg_exprs.append(templ.format(**re_dct))
194
195
def parse_ssh_uri(uri):
    """Parse an ssh uri string into a ``ConnCreds`` object.

    The patterns in ``uri_reg_exprs`` are tried in order and the named
    groups of the first one that matches are copied onto the result.
    Components the matching pattern does not capture keep these defaults:
    ``port == "22"`` (a string), ``key_file is None``, ``passwd is None``.

    Intended uri forms (the effective set is whatever ``uri_reg_exprs``
    contains)::

        user:passwd@@ip_host:port
        user:passwd@@ip_host
        user@ip_host:port
        user@ip_host
        ip_host:port
        ip_host
        user@ip_host:port:path_to_key_file
        user@ip_host::path_to_key_file
        ip_host:port:path_to_key_file
        ip_host::path_to_key_file

    :raises ValueError: if no pattern matches *uri*.
    """
    creds = ConnCreds()
    creds.port = "22"
    creds.key_file = None
    creds.passwd = None

    # Lazily evaluated, so matching stops at the first pattern that fits.
    attempts = (re.match(pattern, uri) for pattern in uri_reg_exprs)
    first_hit = next((m for m in attempts if m is not None), None)

    if first_hit is None:
        raise ValueError("Can't parse {0!r} as ssh uri value".format(uri))

    vars(creds).update(first_hit.groupdict())
    return creds
219
220
def connect(uri):
    """Parse *uri* with ``parse_ssh_uri`` and connect with ``ssh_connect``.

    The parsed port is converted to ``int`` before connecting.
    Returns whatever ``ssh_connect`` returns.
    """
    parsed = parse_ssh_uri(uri)
    port_number = int(parsed.port)
    parsed.port = port_number
    return ssh_connect(parsed)
225
226
def get_ssh_runner(uris,
                   conn_func,
                   latest_start_time=None,
                   keep_temp_files=False):
    """Connect to every uri and return a callable that runs a test on all.

    :param uris: iterable of ssh uri strings, each passed to ``connect``.
    :param conn_func: thread target, called once per connection as
        ``conn_func(obj, barrier, latest_start_time, conn)``.
    :param latest_start_time: forwarded unchanged to *conn_func*.
    :param keep_temp_files: accepted but not used by this function.
    :returns: ``closure(obj)``.  Calling it registers a result callback on
        *obj* via ``obj.set_result_cb``, runs *conn_func* in one daemon
        thread per connection, waits for all threads, closes every
        connection and returns the list of collected results.  Because the
        connections are closed, the closure is usable once.
    """
    logger.debug("Connecting to servers")

    with ThreadPoolExecutor(max_workers=16) as executor:
        futures = [executor.submit(connect, uri) for uri in uris]

    # Leaving the with-block waits for every future, so all are done here.
    try:
        connections = [future.result() for future in futures]
    except Exception:
        # One host failed: close the connections that did succeed instead
        # of leaking them, then report the failure.
        for future in futures:
            if future.exception() is None:
                future.result().close()
        raise

    result_queue = Queue.Queue()
    # len(connections) equals the number of uris and, unlike len(uris),
    # also works when uris is a one-shot iterable.
    barrier = get_barrier(len(connections), threaded=True)

    def closure(obj):
        try:
            obj.set_result_cb(result_queue.put)

            params = (obj, barrier, latest_start_time)

            logger.debug("Start tests")
            ths = []
            for conn in connections:
                th = threading.Thread(target=conn_func,
                                      args=params + (conn,))
                th.daemon = True
                th.start()
                ths.append(th)

            for th in ths:
                th.join()

            # All workers have finished, so the queue can simply be drained.
            test_result = []
            while not result_queue.empty():
                test_result.append(result_queue.get())

            return test_result
        finally:
            # Close the connections even if starting or running the test
            # raised, so they are never left open.
            logger.debug("Done. Closing connection")
            for conn in connections:
                conn.close()

    return closure