Large commit: new code, with sensors; line count dropped, etc.
diff --git a/utils.py b/utils.py
index 059101a..4a784ec 100644
--- a/utils.py
+++ b/utils.py
@@ -1,10 +1,8 @@
import time
import socket
-import os.path
import logging
import threading
import contextlib
-import multiprocessing
logger = logging.getLogger("io-perf-tool")
@@ -22,43 +20,32 @@
return user, passwd, host
-def get_barrier(count, threaded=False):
- if threaded:
- class val(object):
- value = count
- cond = threading.Condition()
- else:
- val = multiprocessing.Value('i', count)
- cond = multiprocessing.Condition()
+class TaksFinished(Exception):
+ pass
- def closure(timeout):
- with cond:
- val.value -= 1
- if val.value == 0:
- cond.notify_all()
+
+class Barrier(object):
+ def __init__(self, count):
+ self.count = count
+ self.curr_count = 0
+ self.cond = threading.Condition()
+ self.exited = False
+
+ def wait(self, timeout=None):
+ with self.cond:
+ if self.exited:
+ raise TaksFinished()
+
+ self.curr_count += 1
+ if self.curr_count == self.count:
+ self.curr_count = 0
+ self.cond.notify_all()
else:
- cond.wait(timeout)
- return val.value == 0
+ self.cond.wait(timeout=timeout)
- return closure
-
-
-def wait_on_barrier(barrier, latest_start_time):
- if barrier is not None:
- if latest_start_time is not None:
- timeout = latest_start_time - time.time()
- else:
- timeout = None
-
- if timeout is not None and timeout > 0:
- msg = "Ready and waiting on barrier. " + \
- "Will wait at most {0} seconds"
- logger.debug(msg.format(int(timeout)))
-
- if not barrier(timeout):
- logger.debug("Barrier timeouted")
- else:
- logger.debug("Passing barrier, starting test")
+ def exit(self):
+ with self.cond:
+ self.exited = True
@contextlib.contextmanager
@@ -77,56 +64,51 @@
raise
-def run_over_ssh(conn, cmd):
+def run_over_ssh(conn, cmd, stdin_data=None, exec_timeout=60):
"should be replaces by normal implementation, with select"
-
- stdin, stdout, stderr = conn.exec_command(cmd)
- out = stdout.read()
- err = stderr.read()
- code = stdout.channel.recv_exit_status()
- return code, out, err
-
-
-def kb_to_ssize(ssize):
- size_ext = {
- 4: 'P',
- 3: 'T',
- 2: 'G',
- 1: 'M',
- 0: 'K'
- }
-
- for idx in reversed(sorted(size_ext)):
- if ssize > 1024 ** idx:
- ext = size_ext[idx]
- return "{0}{1}".format(int(ssize / 1024 ** idx), ext)
- raise ValueError("Can't convert {0} to kb".format(ssize))
-
-
-def ssize_to_kb(ssize):
+ transport = conn.get_transport()
+ session = transport.open_session()
try:
- smap = dict(k=1, K=1, M=1024, m=1024, G=1024**2, g=1024**2)
- for ext, coef in smap.items():
- if ssize.endswith(ext):
- return int(ssize[:-1]) * coef
+ session.set_combine_stderr(True)
- if int(ssize) % 1024 != 0:
- raise ValueError()
+ stime = time.time()
+ session.exec_command(cmd)
- return int(ssize) / 1024
+ if stdin_data is not None:
+ session.sendall(stdin_data)
- except (ValueError, TypeError, AttributeError):
- tmpl = "Unknow size format {0!r} (or size not multiples 1024)"
- raise ValueError(tmpl.format(ssize))
+ session.settimeout(1)
+ session.shutdown_write()
+ output = ""
+
+ while True:
+ try:
+ ndata = session.recv(1024)
+ output += ndata
+ if "" == ndata:
+ break
+ except socket.timeout:
+ pass
+ if time.time() - stime > exec_timeout:
+ return 1, output + "\nExecution timeout"
+ code = session.recv_exit_status()
+ finally:
+ session.close()
+
+ return code, output
+
+
+SMAP = dict(k=1024, m=1024 ** 2, g=1024 ** 3, t=1024 ** 4)
def ssize_to_b(ssize):
try:
- smap = dict(k=1, K=1, M=1024, m=1024, G=1024**2, g=1024**2)
- for ext, coef in smap.items():
- if ssize.endswith(ext):
- return int(ssize[:-1]) * coef * 1024
+ ssize = ssize.lower()
+ if ssize.endswith("b"):
+ ssize = ssize[:-1]
+ if ssize[-1] in SMAP:
+ return int(ssize[:-1]) * SMAP[ssize[-1]]
return int(ssize)
except (ValueError, TypeError, AttributeError):
tmpl = "Unknow size format {0!r} (or size not multiples 1024)"