large refactoring of new code
diff --git a/ b/
new file mode 100644
index 0000000..a7dda3f
--- /dev/null
+++ b/
@@ -0,0 +1,233 @@
+import re
+import Queue
+import logging
+import os.path
+import traceback
+import threading
+from concurrent.futures import ThreadPoolExecutor
+import itest
+from utils import ssh_connect
+from utils import get_barrier, log_error, wait_on_barrier
+logger = logging.getLogger("io-perf-tool")
+conn_uri_attrs = ("user", "passwd", "host", "port", "path")
+def normalize_dirpath(dirpath):
+ while dirpath.endswith("/"):
+ dirpath = dirpath[:-1]
+ return dirpath
+def ssh_mkdir(sftp, remotepath, mode=0777, intermediate=False):
+ remotepath = normalize_dirpath(remotepath)
+ if intermediate:
+ try:
+ sftp.mkdir(remotepath, mode=mode)
+ except IOError:
+ ssh_mkdir(sftp, remotepath.rsplit("/", 1)[0], mode=mode,
+ intermediate=True)
+ return sftp.mkdir(remotepath, mode=mode)
+ else:
+ sftp.mkdir(remotepath, mode=mode)
+def ssh_copy_file(sftp, localfile, remfile, preserve_perm=True):
+ sftp.put(localfile, remfile)
+ if preserve_perm:
+ sftp.chmod(remfile, os.stat(localfile).st_mode & 0777)
+def put_dir_recursively(sftp, localpath, remotepath, preserve_perm=True):
+ "upload local directory to remote recursively"
+ # hack for localhost connection
+ if hasattr(sftp, "copytree"):
+ sftp.copytree(localpath, remotepath)
+ return
+ assert remotepath.startswith("/"), "%s must be absolute path" % remotepath
+ # normalize
+ localpath = normalize_dirpath(localpath)
+ remotepath = normalize_dirpath(remotepath)
+ try:
+ sftp.chdir(remotepath)
+ localsuffix = localpath.rsplit("/", 1)[1]
+ remotesuffix = remotepath.rsplit("/", 1)[1]
+ if localsuffix != remotesuffix:
+ remotepath = os.path.join(remotepath, localsuffix)
+ except IOError:
+ pass
+ for root, dirs, fls in os.walk(localpath):
+ prefix = os.path.commonprefix([localpath, root])
+ suffix = root.split(prefix, 1)[1]
+ if suffix.startswith("/"):
+ suffix = suffix[1:]
+ remroot = os.path.join(remotepath, suffix)
+ try:
+ sftp.chdir(remroot)
+ except IOError:
+ if preserve_perm:
+ mode = os.stat(root).st_mode & 0777
+ else:
+ mode = 0777
+ ssh_mkdir(sftp, remroot, mode=mode, intermediate=True)
+ sftp.chdir(remroot)
+ for f in fls:
+ remfile = os.path.join(remroot, f)
+ localfile = os.path.join(root, f)
+ ssh_copy_file(sftp, localfile, remfile, preserve_perm)
+def copy_paths(conn, paths):
+ sftp = conn.open_sftp()
+ try:
+ for src, dst in paths.items():
+ try:
+ if os.path.isfile(src):
+ ssh_copy_file(sftp, src, dst)
+ elif os.path.isdir(src):
+ put_dir_recursively(sftp, src, dst)
+ else:
+ templ = "Can't copy {0!r} - " + \
+ "it neither a file not a directory"
+ msg = templ.format(src)
+ raise OSError(msg)
+ except Exception as exc:
+ tmpl = "Scp {0!r} => {1!r} failed - {2!r}"
+ msg = tmpl.format(src, dst, exc)
+ raise OSError(msg)
+ finally:
+ sftp.close()
+class ConnCreds(object):
+ def __init__(self):
+ for name in conn_uri_attrs:
+ setattr(self, name, None)
+uri_reg_exprs = []
+class URIsNamespace(object):
+ class ReParts(object):
+ user_rr = "[^:]*?"
+ host_rr = "[^:]*?"
+ port_rr = "\\d+"
+ key_file_rr = "[^:@]*"
+ passwd_rr = ".*?"
+ re_dct = ReParts.__dict__
+ for attr_name, val in re_dct.items():
+ if attr_name.endswith('_rr'):
+ new_rr = "(?P<{0}>{1})".format(attr_name[:-3], val)
+ setattr(ReParts, attr_name, new_rr)
+ re_dct = ReParts.__dict__
+ templs = [
+ "^{host_rr}$",
+ "^{user_rr}@{host_rr}::{key_file_rr}$",
+ "^{user_rr}@{host_rr}:{port_rr}:{key_file_rr}$",
+ "^{user_rr}:{passwd_rr}@@{host_rr}$",
+ "^{user_rr}:{passwd_rr}@@{host_rr}:{port_rr}$",
+ ]
+ for templ in templs:
+ uri_reg_exprs.append(templ.format(**re_dct))
+def parse_ssh_uri(uri):
+ # user:passwd@@ip_host:port
+ # user:passwd@@ip_host
+ # user@ip_host:port
+ # user@ip_host
+ # ip_host:port
+ # ip_host
+ # user@ip_host:port:path_to_key_file
+ # user@ip_host::path_to_key_file
+ # ip_host:port:path_to_key_file
+ # ip_host::path_to_key_file
+ res = ConnCreds()
+ res.port = "22"
+ res.key_file = None
+ res.passwd = None
+ for rr in uri_reg_exprs:
+ rrm = re.match(rr, uri)
+ if rrm is not None:
+ res.__dict__.update(rrm.groupdict())
+ return res
+ raise ValueError("Can't parse {0!r} as ssh uri value".format(uri))
+def connect(uri):
+ creds = parse_ssh_uri(uri)
+ creds.port = int(creds.port)
+ return ssh_connect(creds)
+def conn_func(obj, barrier, latest_start_time, conn):
+ try:
+ test_iter = itest.run_test_iter(obj, conn)
+ next(test_iter)
+ wait_on_barrier(barrier, latest_start_time)
+ with log_error("!Run test"):
+ return next(test_iter)
+ except:
+ print traceback.format_exc()
+ raise
+def get_ssh_runner(uris,
+ latest_start_time=None,
+ keep_temp_files=False):
+ logger.debug("Connecting to servers")
+ with ThreadPoolExecutor(max_workers=16) as executor:
+ connections = list(, uris))
+ result_queue = Queue.Queue()
+ barrier = get_barrier(len(uris), threaded=True)
+ def closure(obj):
+ ths = []
+ obj.set_result_cb(result_queue.put)
+ params = (obj, barrier, latest_start_time)
+ logger.debug("Start tests")
+ for conn in connections:
+ th = threading.Thread(None, conn_func, None,
+ params + (conn,))
+ th.daemon = True
+ th.start()
+ ths.append(th)
+ for th in ths:
+ th.join()
+ test_result = []
+ while not result_queue.empty():
+ test_result.append(result_queue.get())
+ logger.debug("Done. Closing connection")
+ for conn in connections:
+ conn.close()
+ return test_result
+ return closure