blob: af720842146151bc7b5e951827e0870e64fd8721 [file] [log] [blame]
import abc
import time
import logging
import os.path
import functools
from concurrent.futures import ThreadPoolExecutor
from wally.utils import Barrier, StopTestError
from wally.statistic import data_property
from wally.ssh_utils import run_over_ssh, copy_paths
logger = logging.getLogger("wally")
class TestConfig(object):
"""
this class describe test input configuration
test_type:str - test type name
params:{str:Any} - parameters from yaml file for this test
test_uuid:str - UUID to be used to create filenames and Co
log_directory:str - local directory to store results
nodes:[Node] - node to run tests on
remote_dir:str - directory on nodes to be used for local files
"""
def __init__(self, test_type, params, test_uuid, nodes,
log_directory, remote_dir):
self.test_type = test_type
self.params = params
self.test_uuid = test_uuid
self.log_directory = log_directory
self.nodes = nodes
self.remote_dir = remote_dir
class TestResults(object):
"""
this class describe test results
config:TestConfig - test config object
params:dict - parameters from yaml file for this test
results:{str:MeasurementMesh} - test results object
raw_result:Any - opaque object to store raw results
run_interval:(float, float) - test tun time, used for sensors
"""
def __init__(self, config, results, raw_result, run_interval):
self.config = config
self.params = config.params
self.results = results
self.raw_result = raw_result
self.run_interval = run_interval
def __str__(self):
res = "{0}({1}):\n results:\n".format(
self.__class__.__name__,
self.summary())
for name, val in self.results.items():
res += " {0}={1}\n".format(name, val)
res += " params:\n"
for name, val in self.params.items():
res += " {0}={1}\n".format(name, val)
return res
@abc.abstractmethod
def summary(self):
pass
@abc.abstractmethod
def get_yamable(self):
pass
class MeasurementMatrix(object):
"""
data:[[MeasurementResult]] - VM_COUNT x TH_COUNT matrix of MeasurementResult
"""
def __init__(self, data, connections_ids):
self.data = data
self.connections_ids = connections_ids
def per_vm(self):
return self.data
def per_th(self):
return sum(self.data, [])
class MeasurementResults(object):
def stat(self):
return data_property(self.data)
def __str__(self):
return 'TS([' + ", ".join(map(str, self.data)) + '])'
class SimpleVals(MeasurementResults):
"""
data:[float] - list of values
"""
def __init__(self, data):
self.data = data
class TimeSeriesValue(MeasurementResults):
"""
data:[(float, float, float)] - list of (start_time, lenght, average_value_for_interval)
odata: original values
"""
def __init__(self, data):
assert len(data) > 0
self.odata = data[:]
self.data = []
cstart = 0
for nstart, nval in data:
self.data.append((cstart, nstart - cstart, nval))
cstart = nstart
@property
def values(self):
return [val[2] for val in self.data]
def average_interval(self):
return float(sum([val[1] for val in self.data])) / len(self.data)
def skip(self, seconds):
nres = []
for start, ln, val in self.data:
nstart = start + ln - seconds
if nstart > 0:
nres.append([nstart, val])
return self.__class__(nres)
def derived(self, tdelta):
end = self.data[-1][0] + self.data[-1][1]
tdelta = float(tdelta)
ln = end / tdelta
if ln - int(ln) > 0:
ln += 1
res = [[tdelta * i, 0.0] for i in range(int(ln))]
for start, lenght, val in self.data:
start_idx = int(start / tdelta)
end_idx = int((start + lenght) / tdelta)
for idx in range(start_idx, end_idx + 1):
rstart = tdelta * idx
rend = tdelta * (idx + 1)
intersection_ln = min(rend, start + lenght) - max(start, rstart)
if intersection_ln > 0:
try:
res[idx][1] += val * intersection_ln / tdelta
except IndexError:
raise
return self.__class__(res)
class PerfTest(object):
"""
Very base class for tests
config:TestConfig - test configuration
stop_requested:bool - stop for test requested
"""
def __init__(self, config):
self.config = config
self.stop_requested = False
def request_stop(self):
self.stop_requested = True
def join_remote(self, path):
return os.path.join(self.config.remote_dir, path)
@classmethod
@abc.abstractmethod
def load(cls, path):
pass
@abc.abstractmethod
def run(self):
pass
@abc.abstractmethod
def format_for_console(cls, data):
pass
def run_on_node(node):
def closure(*args, **kwargs):
return run_over_ssh(node.connection,
*args,
node=node.get_conn_id(),
**kwargs)
return closure
class ThreadedTest(PerfTest):
"""
Base class for tests, which spawn separated thread for each node
"""
def run(self):
barrier = Barrier(len(self.nodes))
th_test_func = functools.partial(self.th_test_func, barrier)
with ThreadPoolExecutor(len(self.nodes)) as pool:
return list(pool.map(th_test_func, self.config.nodes))
@abc.abstractmethod
def do_test(self, node):
pass
def th_test_func(self, barrier, node):
logger.debug("Starting {0} test on {1} node".format(self.__class__.__name__,
node.conn_url))
logger.debug("Run preparation for {0}".format(node.get_conn_id()))
self.pre_run(node)
barrier.wait()
try:
logger.debug("Run test for {0}".format(node.get_conn_id()))
return self.do_test(node)
except StopTestError as exc:
pass
except Exception as exc:
msg = "In test {0} for node {1}".format(self, node.get_conn_id())
logger.exception(msg)
exc = StopTestError(msg, exc)
try:
self.cleanup()
except StopTestError as exc1:
if exc is None:
exc = exc1
except Exception as exc1:
if exc is None:
msg = "Duringf cleanup - in test {0} for node {1}".format(self, node)
logger.exception(msg)
exc = StopTestError(msg, exc)
if exc is not None:
raise exc
def pre_run(self, node):
pass
def cleanup(self, node):
pass
class TwoScriptTest(ThreadedTest):
def __init__(self, *dt, **mp):
ThreadedTest.__init__(self, *dt, **mp)
self.prerun_script = self.config.params['prerun_script']
self.run_script = self.config.params['run_script']
self.prerun_tout = self.config.params.get('prerun_tout', 3600)
self.run_tout = self.config.params.get('run_tout', 3600)
def get_remote_for_script(self, script):
return os.path.join(self.options.remote_dir,
os.path.basename(script))
def pre_run(self, node):
copy_paths(node.connection,
{
self.run_script: self.get_remote_for_script(self.run_script),
self.prerun_script: self.get_remote_for_script(self.prerun_script),
})
cmd = self.get_remote_for_script(self.pre_run_script)
cmd += ' ' + self.config.params.get('prerun_opts', '')
run_on_node(node)(cmd, timeout=self.prerun_tout)
def run(self, node):
cmd = self.get_remote_for_script(self.run_script)
cmd += ' ' + self.config.params.get('run_opts', '')
t1 = time.time()
res = run_on_node(node)(cmd, timeout=self.run_tout)
t2 = time.time()
return TestResults(self.config, None, res, (t1, t2))