tempo commit
diff --git a/wally/suits/itest.py b/wally/suits/itest.py
index 1cbf88c..7564722 100644
--- a/wally/suits/itest.py
+++ b/wally/suits/itest.py
@@ -1,104 +1,56 @@
 import abc
+import time
+import logging
 import os.path
 import functools
 
+from concurrent.futures import ThreadPoolExecutor
 
+from wally.utils import Barrier, StopTestError
+from wally.statistic import data_property
 from wally.ssh_utils import run_over_ssh, copy_paths
 
 
-def cached_prop(func):
-    @property
-    @functools.wraps(func)
-    def closure(self):
-        val = getattr(self, "_" + func.__name__)
-        if val is NoData:
-            val = func(self)
-            setattr(self, "_" + func.__name__, val)
-        return val
-    return closure
+logger = logging.getLogger("wally")
 
 
-class NoData(object):
-    pass
+class TestConfig(object):
+    """
+    this class describes the test input configuration
 
-
-class VMThData(object):
-    "store set of values for VM_COUNT * TH_COUNT"
-
-
-class IOTestResult(object):
-    def __init__(self):
-        self.run_config = None
-        self.suite_config = None
-        self.run_interval = None
-
-        self.bw = None
-        self.lat = None
-        self.iops = None
-        self.slat = None
-        self.clat = None
-
-        self.fio_section = None
-
-        self._lat_log = NoData
-        self._iops_log = NoData
-        self._bw_log = NoData
-
-        self._sensors_data = NoData
-        self._raw_resuls = NoData
-
-    def to_jsonable(self):
-        pass
-
-    @property
-    def thread_count(self):
-        pass
-
-    @property
-    def sync_mode(self):
-        pass
-
-    @property
-    def abbrev_name(self):
-        pass
-
-    @property
-    def full_name(self):
-        pass
-
-    @cached_prop
-    def lat_log(self):
-        pass
-
-    @cached_prop
-    def iops_log(self):
-        pass
-
-    @cached_prop
-    def bw_log(self):
-        pass
-
-    @cached_prop
-    def sensors_data(self):
-        pass
-
-    @cached_prop
-    def raw_resuls(self):
-        pass
+    test_type:str - test type name
+    params:{str:Any} - parameters from yaml file for this test
+    test_uuid:str - UUID to be used to create filenames and Co
+    log_directory:str - local directory to store results
+    nodes:[Node] - nodes to run tests on
+    remote_dir:str - directory on nodes to be used for local files
+    """
+    def __init__(self, test_type, params, test_uuid, nodes,
+                 log_directory, remote_dir):
+        self.test_type = test_type
+        self.params = params
+        self.test_uuid = test_uuid
+        self.log_directory = log_directory
+        self.nodes = nodes
+        self.remote_dir = remote_dir
 
 
 class TestResults(object):
-    def __init__(self, config, params, results,
-                 raw_result, run_interval, vm_count,
-                 test_name, **attrs):
+    """
+    this class describes test results
+
+    config:TestConfig - test config object
+    params:dict - parameters from yaml file for this test
+    results:{str:MeasurementMesh} - test results object
+    raw_result:Any - opaque object to store raw results
+    run_interval:(float, float) - test run time, used for sensors
+    """
+    def __init__(self, config, results, raw_result, run_interval):
         self.config = config
-        self.params = params
+        self.params = config.params
         self.results = results
         self.raw_result = raw_result
         self.run_interval = run_interval
-        self.vm_count = vm_count
-        self.test_name = test_name
-        self.__dict__.update(attrs)
 
     def __str__(self):
         res = "{0}({1}):\n    results:\n".format(
@@ -124,100 +76,209 @@
         pass
 
 
-class IPerfTest(object):
-    def __init__(self, options, is_primary, on_result_cb, test_uuid, node,
-                 total_nodes_count,
-                 log_directory=None,
-                 coordination_queue=None,
-                 remote_dir="/tmp/wally"):
-        self.options = options
-        self.on_result_cb = on_result_cb
-        self.log_directory = log_directory
-        self.node = node
-        self.test_uuid = test_uuid
-        self.coordination_queue = coordination_queue
-        self.remote_dir = remote_dir
-        self.is_primary = is_primary
+class MeasurementMatrix(object):
+    """
+    data:[[MeasurementResult]] - VM_COUNT x TH_COUNT matrix of MeasurementResult
+    """
+    def __init__(self, data):
+        self.data = data
+
+    def per_vm(self):
+        return self.data
+
+    def per_th(self):
+        return sum(self.data, [])
+
+
+class MeasurementResults(object):
+    def stat(self):
+        return data_property(self.data)
+
+    def __str__(self):
+        return 'TS([' + ", ".join(map(str, self.data)) + '])'
+
+
+class SimpleVals(MeasurementResults):
+    """
+    data:[float] - list of values
+    """
+    def __init__(self, data):
+        self.data = data
+
+
+class TimeSeriesValue(MeasurementResults):
+    """
+    values:[(float, float, float)] - list of (start_time, length, average_value_for_interval)
+    """
+    def __init__(self, data):
+        assert len(data) > 0
+        data = [(0, 0)] + data
+
+        self.values = []
+        for (cstart, cval), (nstart, nval) in zip(data[:-1], data[1:]):
+            self.values.append((cstart, nstart - cstart, nval))
+
+    @property
+    def values(self):
+        return [val[2] for val in self.data]
+
+    def skip(self, seconds):
+        nres = []
+        for start, ln, val in enumerate(self.data):
+            if start + ln < seconds:
+                continue
+            elif start > seconds:
+                nres.append([start + ln - seconds, val])
+            else:
+                nres.append([0, val])
+        return self.__class__(nres)
+
+    def derived(self, tdelta):
+        end = tdelta
+        res = [[end, 0.0]]
+        tdelta = float(tdelta)
+
+        for start, lenght, val in self.data:
+            if start < end:
+                ln = min(end, start + lenght) - start
+                res[-1][1] += val * ln / tdelta
+
+            if end <= start + lenght:
+                end += tdelta
+                res.append([end, 0.0])
+                while end < start + lenght:
+                    res[-1][1] = val
+                    res.append([end, 0.0])
+                    end += tdelta
+
+        if res[-1][1] < 1:
+            res = res[:-1]
+
+        return self.__class__(res)
+
+
+class PerfTest(object):
+    """
+    Very base class for tests
+    config:TestConfig - test configuration
+    stop_requested:bool - stop requested for this test
+    """
+    def __init__(self, config):
+        self.config = config
         self.stop_requested = False
-        self.total_nodes_count = total_nodes_count
 
     def request_stop(self):
         self.stop_requested = True
 
     def join_remote(self, path):
-        return os.path.join(self.remote_dir, path)
-
-    def coordinate(self, data):
-        if self.coordination_queue is not None:
-            self.coordination_queue.put((self.node.get_conn_id(), data))
-
-    def pre_run(self):
-        pass
-
-    def cleanup(self):
-        pass
+        return os.path.join(self.config.remote_dir, path)
 
     @classmethod
     @abc.abstractmethod
-    def load(cls, data):
+    def load(cls, path):
         pass
 
     @abc.abstractmethod
-    def run(self, barrier):
+    def run(self):
         pass
 
-    @classmethod
+    @abc.abstractmethod
     def format_for_console(cls, data):
-        msg = "{0}.format_for_console".format(cls.__name__)
-        raise NotImplementedError(msg)
-
-    def run_over_ssh(self, cmd, **kwargs):
-        return run_over_ssh(self.node.connection, cmd,
-                            node=self.node.get_conn_id(), **kwargs)
-
-    @classmethod
-    def coordination_th(cls, coord_q, barrier, num_threads):
         pass
 
 
-class TwoScriptTest(IPerfTest):
-    def __init__(self, *dt, **mp):
-        IPerfTest.__init__(self, *dt, **mp)
+def run_on_node(node):
+    def closure(*args, **kwargs):
+        return run_over_ssh(node.connection,
+                            *args,
+                            node=node.get_conn_id(),
+                            **kwargs)
+    return closure
 
-        if 'scripts_path' in self.options:
-            self.root = self.options['scripts_path']
-            self.run_script = self.options['run_script']
-            self.prerun_script = self.options['prerun_script']
+
+class ThreadedTest(PerfTest):
+    """
+    Base class for tests, which spawn separated thread for each node
+    """
+
+    def run(self):
+        barrier = Barrier(len(self.nodes))
+        th_test_func = functools.partial(self.th_test_func, barrier)
+
+        with ThreadPoolExecutor(len(self.nodes)) as pool:
+            return list(pool.map(th_test_func, self.config.nodes))
+
+    @abc.abstractmethod
+    def do_test(self, node):
+        pass
+
+    def th_test_func(self, barrier, node):
+        logger.debug("Starting {0} test on {1} node".format(self.__class__.__name__,
+                                                            node.conn_url))
+
+        logger.debug("Run preparation for {0}".format(node.get_conn_id()))
+        self.pre_run(node)
+        barrier.wait()
+        try:
+            logger.debug("Run test for {0}".format(node.get_conn_id()))
+            return self.do_test(node)
+        except StopTestError as exc:
+            pass
+        except Exception as exc:
+            msg = "In test {0} for node {1}".format(self, node.get_conn_id())
+            logger.exception(msg)
+            exc = StopTestError(msg, exc)
+
+        try:
+            self.cleanup()
+        except StopTestError as exc1:
+            if exc is None:
+                exc = exc1
+        except Exception as exc1:
+            if exc is None:
+                msg = "Duringf cleanup - in test {0} for node {1}".format(self, node)
+                logger.exception(msg)
+                exc = StopTestError(msg, exc)
+
+        if exc is not None:
+            raise exc
+
+    def pre_run(self, node):
+        pass
+
+    def cleanup(self, node):
+        pass
+
+
+class TwoScriptTest(ThreadedTest):
+    def __init__(self, *dt, **mp):
+        ThreadedTest.__init__(self, *dt, **mp)
+
+        self.prerun_script = self.config.params['prerun_script']
+        self.run_script = self.config.params['run_script']
+
+        self.prerun_tout = self.config.params.get('prerun_tout', 3600)
+        self.run_tout = self.config.params.get('run_tout', 3600)
 
     def get_remote_for_script(self, script):
-        return os.path.join(self.remote_dir, script.rpartition('/')[2])
+        return os.path.join(self.options.remote_dir,
+                            os.path.basename(script))
 
-    def pre_run(self):
-        copy_paths(self.node.connection, {self.root: self.remote_dir})
+    def pre_run(self, node):
+        copy_paths(node.connection,
+                   {
+                       self.run_script: self.get_remote_for_script(self.run_script),
+                       self.prerun_script: self.get_remote_for_script(self.prerun_script),
+                   })
+
         cmd = self.get_remote_for_script(self.pre_run_script)
-        self.run_over_ssh(cmd, timeout=2000)
+        cmd += ' ' + self.config.params.get('prerun_opts', '')
+        run_on_node(node)(cmd, timeout=self.prerun_tout)
 
-    def run(self, barrier):
-        remote_script = self.get_remote_for_script(self.run_script)
-        cmd_opts = ' '.join(["%s %s" % (key, val) for key, val
-                             in self.options.items()])
-        cmd = remote_script + ' ' + cmd_opts
-        out_err = self.run_over_ssh(cmd, timeout=6000)
-        self.on_result(out_err, cmd)
-
-    def parse_results(self, out):
-        for line in out.split("\n"):
-            key, separator, value = line.partition(":")
-            if key and value:
-                self.on_result_cb((key, float(value)))
-
-    def on_result(self, out_err, cmd):
-        try:
-            self.parse_results(out_err)
-        except Exception as exc:
-            msg_templ = "Error during postprocessing results: {0!s}. {1}"
-            raise RuntimeError(msg_templ.format(exc, out_err))
-
-    def merge_results(self, results):
-        tpcm = sum([val[1] for val in results])
-        return {"res": {"TpmC": tpcm}}
+    def run(self, node):
+        cmd = self.get_remote_for_script(self.run_script)
+        cmd += ' ' + self.config.params.get('run_opts', '')
+        t1 = time.time()
+        res = run_on_node(node)(cmd, timeout=self.run_tout)
+        t2 = time.time()
+        return TestResults(self.config, None, res, (t1, t2))