very large refactoring
diff --git a/wally/__init__.py b/wally/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/wally/__init__.py
diff --git a/wally/__main__.py b/wally/__main__.py
new file mode 100644
index 0000000..97011d9
--- /dev/null
+++ b/wally/__main__.py
@@ -0,0 +1,6 @@
+import sys
+from .run_test import main
+
+
+if __name__ == '__main__':
+    sys.exit(main(sys.argv))  # sys.exit, not the site-provided exit() helper
diff --git a/wally/assumptions_check.py b/wally/assumptions_check.py
new file mode 100644
index 0000000..41ae2e0
--- /dev/null
+++ b/wally/assumptions_check.py
@@ -0,0 +1,170 @@
+import sys
+
+import texttable as TT
+
+import numpy as np
+import matplotlib.pyplot as plt
+from numpy.polynomial.chebyshev import chebfit, chebval
+
+from .io_results_loader import load_data, filter_data
+from .statistic import approximate_line, difference
+
+
+def linearity_plot(data, types, vals=None):
+    """Plot per-request IO time against block size, one series per test type.
+
+    data - loaded test results, as consumed by filter_data() selectors
+    types - short test codes such as "rwd" (rand write direct)
+    vals - block sizes (bytes) used to fit the line; None means all points
+    """
+    fields = 'blocksize_b', 'iops_mediana', 'iops_stddev'
+    # short code -> legend label: "rwd" -> "rand write direct"
+    names = {}
+    for tp1 in ('rand', 'seq'):
+        for oper in ('read', 'write'):
+            for sync in ('sync', 'direct', 'async'):
+                sq = (tp1, oper, sync)
+                names["".join(word[0] for word in sq)] = " ".join(sq)
+
+    colors = ['red', 'green', 'blue', 'cyan',
+              'magenta', 'black', 'yellow', 'burlywood']
+    markers = ['*', '^', 'x', 'o', '+', '.']
+    dots = 0
+
+    for idx, tp in enumerate(types):
+        filtered_data = filter_data('linearity_test_' + tp, fields)
+        x, y, e = [], [], []
+        ax, ay = [], []  # subset of points used to fit the line
+
+        for sz, med, dev in sorted(filtered_data(data)):
+            # true division: '//' floored the time (0 ms above 1000 iops)
+            iotime_ms = 1000. / med
+            iotime_max = 1000. / (med - dev * 3)
+
+            x.append(sz / 1024.0)
+            y.append(iotime_ms)
+            e.append(iotime_max - iotime_ms)
+            if vals is None or sz in vals:
+                ax.append(sz / 1024.0)
+                ay.append(iotime_ms)
+
+        # wrap around the palettes so extra series do not raise IndexError
+        color = colors[idx % len(colors)]
+        plt.errorbar(x, y, e, linestyle='None', label=names[tp], color=color,
+                     ecolor="black", marker=markers[idx % len(markers)])
+        plt.plot(ax, approximate_line(ax, ay, ax, True), color=color)
+        dots = len(ax) if vals is None else len(vals)
+    plt.legend(loc=2)
+    plt.title("Linearity test by %i dots" % dots)
+
+
+def linearity_table(data, types, vals):
+    """Print, per test type, how far measured IO times are from a fitted line.
+
+    The line is built by approximate_line() from the points whose block
+    size is in vals and is evaluated at every measured block size.
+    data - loaded test results, as consumed by filter_data() selectors
+    types - short test codes, appended to 'linearity_test_' to pick a test
+    vals - block sizes in bytes used to build the line
+    Each table is written to stdout; nothing is returned.
+    """
+    fields = 'blocksize_b', 'iops_mediana'
+    header = ["BlockSize, kB", "Absolute difference (ms)",
+              "Relative difference (%)"]
+
+    for tp in types:
+        filtered_data = filter_data('linearity_test_' + tp, fields)
+        x, y = [], []      # all measured points
+        ax, ay = [], []    # points used to build the line
+
+        for sz, med in sorted(filtered_data(data)):
+            # true division: '//' floored the time (0 ms above 1000 iops)
+            iotime_ms = 1000. / med
+            x.append(sz / 1024.0)
+            y.append(iotime_ms)
+            if sz in vals:
+                ax.append(sz / 1024.0)
+                ay.append(iotime_ms)
+
+        ynew = approximate_line(ax, ay, x, True)
+        dif, _, _ = difference(y, ynew)
+        table_data = [["{0:.1f}".format(size_kb), "{0:.1f}".format(d[0]),
+                       "{0:.0f}".format(d[1] * 100)]
+                      for size_kb, d in zip(x, dif)]
+
+        tab = TT.Texttable()
+        tab.set_deco(tab.VLINES)
+        # header() is a method: 'tab.header = header' overwrote it
+        tab.header(header)
+        for row in table_data:
+            tab.add_row(row)
+
+        # the table used to be built and then dropped; show it
+        print(tp)
+        print(tab.draw())
+        # to get the table as a picture instead:
+        # fig = plt.figure()
+        # pic = fig.add_subplot(111)
+        # pic.axis('off')
+        # pic.table(cellText=table_data, colLabels=header, loc='center')
+        # plt.savefig(tp + ".png")
+
+
+def th_plot(data, tt):  # iops vs. concurrency plot for the 4k runs of test tt
+    fields = 'concurence', 'iops_mediana', 'lat_mediana'
+    conc_4k = filter_data('concurrence_test_' + tt, fields, blocksize='4k')
+    filtered_data = sorted(list(conc_4k(data)))
+    # transpose the sorted rows into one sequence per requested field
+    x, iops, lat = zip(*filtered_data)
+    # lat is only referenced by the disabled latency code further down
+    _, ax1 = plt.subplots()
+
+    xnew = np.linspace(min(x), max(x), 50)
+    # plt.plot(xnew, power_smooth, 'b-', label='iops')
+    ax1.plot(x, iops, 'b*')
+    # overlay a degree-3 Chebyshev fit of the measured iops (dashed green)
+    for degree in (3,):
+        c = chebfit(x, iops, degree)
+        vals = chebval(xnew, c)
+        ax1.plot(xnew, vals, 'g--')
+    # everything below is disabled: axis labels and a latency overlay
+    # ax1.set_xlabel('thread count')
+    # ax1.set_ylabel('iops')
+
+    # ax2 = ax1.twinx()
+    # lat = [i / 1000 for i in lat]
+    # ax2.plot(x, lat, 'r*')
+
+    # tck = splrep(x, lat, s=0.0)
+    # power_smooth = splev(xnew, tck)
+    # ax2.plot(xnew, power_smooth, 'r-', label='lat')
+
+    # xp = xnew[0]
+    # yp = power_smooth[0]
+    # for _x, _y in zip(xnew[1:], power_smooth[1:]):
+    #     if _y >= 100:
+    #         xres = (_y - 100.) / (_y - yp) * (_x - xp) + xp
+    #         ax2.plot([xres, xres], [min(power_smooth), max(power_smooth)], 'g--')
+    #         break
+    #     xp = _x
+    #     yp = _y
+
+    # ax2.plot([min(x), max(x)], [20, 20], 'g--')
+    # ax2.plot([min(x), max(x)], [100, 100], 'g--')
+
+    # ax2.set_ylabel("lat ms")
+    # plt.legend(loc=2)
+
+
+def main(argv):
+    """Load the results file argv[1] and run the linearity table check."""
+    with open(argv[1]) as fd:  # close the file instead of leaking the handle
+        data = list(load_data(fd.read()))
+    linearity_table(data, ["rwd", "rws", "rrd"], [4096, 4096*1024])
+    # linearity_plot(data, ["rwd", "rws", "rrd"])#, [4096, 4096*1024])
+    # th_plot(data, 'rws')  # or 'rrs'
+    plt.show()
+
+
+if __name__ == "__main__":
+    sys.exit(main(sys.argv))  # sys.exit, not the site-provided exit() helper
diff --git a/wally/charts.py b/wally/charts.py
new file mode 100644
index 0000000..a168f9d
--- /dev/null
+++ b/wally/charts.py
@@ -0,0 +1,145 @@
+import os
+import sys
+import hashlib
+
+from GChartWrapper import Line
+from GChartWrapper import constants
+from GChartWrapper import VerticalBarGroup
+
+from config import cfg_dict
+
+
+# Make 'E' (the marker type passed to bar.marker in render_vertical_bar) an
+# accepted marker: append it once to both module-level MARKERS names.
+constants.MARKERS += 'E'
+sys.modules['GChartWrapper.GChart'].MARKERS += 'E'
+
+# hex RGB colours handed to the charts, in series order
+COLORS = ["1569C7", "81D8D0", "307D7E", "5CB3FF", "0040FF", "81DAF5"]
+
+
+def get_top_top_dir(path):
+    """Return path with everything up to its grandparent directory cut off."""
+    return path[len(os.path.dirname(os.path.dirname(path))) + 1:]
+
+
+def render_vertical_bar(title, legend, bars_data, bars_dev_top,
+                        bars_dev_bottom, file_name,
+                        width=700, height=400,
+                        scale_x=None, scale_y=None, label_x=None,
+                        label_y=None, lines=()):
+    """
+    Renders vertical bar group chart and saves it as a png image
+
+    :param title - chart title
+    :param legend - list of legend values, one per bar series.
+        Example: ['bar1', 'bar2', 'bar3']
+        The caller's list is not modified.
+    :param bars_data - list of series, one list of bar values per series;
+        the first two series of the dataset give the y axis maximum
+    :param bars_dev_top - list of series appended to the chart dataset
+        after bars_data (presumably upper error bar bounds - confirm)
+    :param bars_dev_bottom - list of series appended after bars_dev_top
+        (presumably lower error bar bounds - confirm)
+    :param file_name - image name without extension; '.png' is appended
+        and the file is put into cfg_dict['charts_img_path']
+    :param width - width of chart
+    :param height - height of chart
+    :param scale_x - labels for axis 0, skipped when empty or None
+    :param scale_y - not used
+    :param label_x - label attached to axis 2 of the 'xyy' layout
+    :param label_y - not used
+    :param lines - iterable of (data, label, axe, leg) tuples drawn over
+        the bars: data is the line series, leg its legend entry; a non-empty
+        axe adds one more axis of that type, ranged 0..max(data) and
+        labelled with label
+
+    An existing image with the same name is kept and not rendered again.
+
+    :returns image path relative to the directory holding the charts dir
+    """
+
+    legend = list(legend)  # work on a copy: legends of `lines` get appended
+    bar = VerticalBarGroup([], encoding='text')
+    bar.title(title)
+
+    dataset = bars_data + bars_dev_top + bars_dev_bottom + \
+        [l[0] for l in lines]
+
+    bar.dataset(dataset, series=len(bars_data))
+    bar.axes.type('xyy')
+    bar.axes.label(2, None, label_x)
+
+    if scale_x:
+        bar.axes.label(0, *scale_x)
+
+    max_value = (max([max(l) for l in dataset[:2]]))
+    bar.axes.range(1, 0, max_value)
+    bar.axes.style(1, 'N*s*')
+    bar.axes.style(2, '000000', '13')
+
+    bar.scale(*[0, max_value] * 3)
+
+    bar.bar('r', '.1', '1')
+    for i in range(1):
+        bar.marker('E', '000000', '%s:%s' % ((len(bars_data) + i*2), i),
+                   '', '1:10')
+    bar.color(*COLORS)
+    bar.size(width, height)
+
+    axes_type = "xyy"
+
+    scale = [0, max_value] * len(bars_dev_top + bars_dev_bottom + bars_data)
+    if lines:
+        line_n = 0
+        for data, label, axe, leg in lines:
+            bar.marker('D', COLORS[len(bars_data) + line_n],
+                       (len(bars_data + bars_dev_top + bars_dev_bottom))
+                       + line_n, 0, 3)
+            # max_val_l = max(data)
+            if axe:
+                max_val_l = max(data)
+                bar.axes.type(axes_type + axe)
+                bar.axes.range(len(axes_type), 0, max_val_l)
+                bar.axes.style(len(axes_type), 'N*s*')
+                bar.axes.label(len(axes_type) + 1, None, label)
+                bar.axes.style(len(axes_type) + 1, '000000', '13')
+                axes_type += axe
+                line_n += 1
+                scale += [0, max_val_l]
+            else:
+                scale += [0, max_value]
+            legend.append(leg)
+            # scale += [0, max_val_l]
+
+    bar.legend(*legend)
+    bar.scale(*scale)
+    img_name = file_name + ".png"
+    img_path = os.path.join(cfg_dict['charts_img_path'], img_name)
+
+    if not os.path.exists(img_path):
+        bar.save(img_path)
+
+    return get_top_top_dir(img_path)
+
+
+def render_lines(title, legend, dataset, scale_x, width=700, height=400):
+    """Render a line chart into a png named by the md5 of str(chart) and
+    return the image path relative to the directory holding the charts dir."""
+    chart = Line([], encoding="text")
+    chart.title(title)
+    chart.dataset(dataset)
+    chart.axes('xy')
+    top = max(max(series) for series in dataset)
+    chart.axes.range(1, 0, top)
+    chart.scale(0, top)
+    chart.axes.label(0, *scale_x)
+    chart.legend(*legend)
+    chart.color(*COLORS[:len(legend)])
+    chart.size(width, height)
+    # same chart -> same file name, so an existing image is not saved again
+    target = os.path.join(cfg_dict['charts_img_path'],
+                          hashlib.md5(str(chart)).hexdigest() + ".png")
+    if not os.path.exists(target):
+        chart.save(target)
+    return get_top_top_dir(target)
diff --git a/wally/config.py b/wally/config.py
new file mode 100644
index 0000000..03b7ac9
--- /dev/null
+++ b/wally/config.py
@@ -0,0 +1,129 @@
+import os
+import uuid
+import logging
+import functools
+
+import yaml
+
+try:
+    from petname import Generate as pet_generate
+except ImportError:
+    def pet_generate(x, y):  # fallback without petname: x and y are ignored
+        return str(uuid.uuid4())
+
+
+cfg_dict = {}
+
+
+def mkdirs_if_unxists(path):  # create path and missing parents unless it exists
+    if not os.path.exists(path):
+        os.makedirs(path)
+
+
+def load_config(file_name, explicit_folder=None):
+    """Merge the yaml file into cfg_dict and prepare the results directory.
+
+    explicit_folder - results dir to use; when None a fresh, uniquely named
+        one is made under internal/var_dir_root (default '/tmp') and its
+        name is stored in cfg_dict['run_uuid'] (not set for explicit folders).
+    """
+    with open(file_name) as fd:
+        # NOTE: yaml.load can build arbitrary objects; keep configs trusted
+        cfg_dict.update(yaml.load(fd.read()))
+    if explicit_folder is None:
+        # read on every call: var_dir used to be unbound on repeated loads
+        var_dir = cfg_dict.get('internal', {}).get('var_dir_root', '/tmp')
+        for _ in range(10):
+            run_uuid = pet_generate(2, "_")
+            results_dir = os.path.join(var_dir, run_uuid)
+            if not os.path.exists(results_dir):
+                break
+        else:
+            run_uuid = str(uuid.uuid4())
+            results_dir = os.path.join(var_dir, run_uuid)
+        cfg_dict['run_uuid'] = run_uuid.replace('_', '-')
+    else:
+        results_dir = explicit_folder
+
+    cfg_dict['var_dir'] = results_dir
+    mkdirs_if_unxists(cfg_dict['var_dir'])
+    in_var_dir = functools.partial(os.path.join, cfg_dict['var_dir'])
+    cfg_dict['charts_img_path'] = in_var_dir('charts')
+    mkdirs_if_unxists(cfg_dict['charts_img_path'])
+    cfg_dict['test_log_directory'] = in_var_dir('test_logs')
+    mkdirs_if_unxists(cfg_dict['test_log_directory'])
+    cfg_dict['vm_ids_fname'] = in_var_dir('os_vm_ids')
+    cfg_dict['html_report_file'] = in_var_dir('report.html')
+    cfg_dict['text_report_file'] = in_var_dir('report.txt')
+    cfg_dict['log_file'] = in_var_dir('log.txt')
+    cfg_dict['sensor_storage'] = in_var_dir('sensor_storage.txt')
+
+
+def color_me(color):  # returns a function that wraps text in an ANSI colour
+    RESET_SEQ = "\033[0m"
+    COLOR_SEQ = "\033[1;%dm"
+    # 30 + color selects the ANSI foreground colour; "1;" makes it bold
+    color_seq = COLOR_SEQ % (30 + color)
+    # msg must be a string: it is concatenated with the escape sequences
+    def closure(msg):
+        return color_seq + msg + RESET_SEQ
+    return closure
+
+
+class ColoredFormatter(logging.Formatter):
+    """Formatter that pads the level name to 8 chars and ANSI-colours it."""
+
+    BLACK, RED, GREEN, YELLOW, BLUE, MAGENTA, CYAN, WHITE = range(8)
+
+    colors = {
+        'WARNING': color_me(YELLOW),
+        'DEBUG': color_me(BLUE),
+        'CRITICAL': color_me(YELLOW),
+        'ERROR': color_me(RED)
+    }
+
+    def __init__(self, msg, use_color=True, datefmt=None):
+        logging.Formatter.__init__(self, msg, datefmt=datefmt)
+        self.use_color = use_color
+
+    def format(self, record):
+        """Format record with a padded, optionally coloured level name."""
+        orig = record.__dict__
+        # work on a copy: the record will be used by other formatters
+        record.__dict__ = orig.copy()
+        try:
+            prn_name = record.levelname.ljust(8)
+            # use_color used to be stored but never consulted
+            if self.use_color and record.levelname in self.colors:
+                prn_name = self.colors[record.levelname](prn_name)
+            record.levelname = prn_name
+            return super(ColoredFormatter, self).format(record)
+        finally:
+            record.__dict__ = orig
+
+
+def setup_loggers(def_level=logging.DEBUG, log_fname=None):
+    """Attach console (and optionally file) handlers to the 'wally' logger.
+
+    def_level - minimal level shown on the console
+    log_fname - when given, everything from DEBUG up is also written there
+    """
+    logger = logging.getLogger('wally')
+    logger.setLevel(logging.DEBUG)
+
+    sh = logging.StreamHandler()
+    sh.setLevel(def_level)
+    log_format = '%(asctime)s - %(levelname)s - %(name)s - %(message)s'
+    sh.setFormatter(ColoredFormatter(log_format, datefmt="%H:%M:%S"))
+    logger.addHandler(sh)
+
+    if log_fname is not None:
+        fh = logging.FileHandler(log_fname)
+        log_format = '%(asctime)s - %(levelname)8s - %(name)s - %(message)s'
+        fh.setFormatter(logging.Formatter(log_format, datefmt="%H:%M:%S"))
+        fh.setLevel(logging.DEBUG)
+        logger.addHandler(fh)
+
+    # "wally.fuel_api" propagates to "wally", so it needs no handlers of its
+    # own (adding them again emitted each of its messages twice)
+    logging.getLogger("wally.fuel_api").setLevel(logging.WARNING)
diff --git a/wally/discover/__init__.py b/wally/discover/__init__.py
new file mode 100644
index 0000000..b02809c
--- /dev/null
+++ b/wally/discover/__init__.py
@@ -0,0 +1,5 @@
+"this package contains node discovery code"
+from .discover import discover, undiscover
+from .node import Node
+
+__all__ = ["discover", "Node", "undiscover"]
diff --git a/wally/discover/ceph.py b/wally/discover/ceph.py
new file mode 100644
index 0000000..70a6edf
--- /dev/null
+++ b/wally/discover/ceph.py
@@ -0,0 +1,89 @@
+""" Collect data about ceph nodes"""
+import json
+import logging
+import subprocess
+
+
+from .node import Node
+from wally.ssh_utils import connect
+
+
+logger = logging.getLogger("wally.discover")  # same name as discover.py and fuel.py
+
+
+def local_execute(cmd):  # run cmd in a local shell, return stdout and stderr
+    return subprocess.check_output(cmd, stderr=subprocess.STDOUT, shell=True)
+
+
+def ssh_execute(ssh):  # build a cmd -> output function bound to connection ssh
+    def closure(cmd):
+        _, chan, _ = ssh.exec_command(cmd)  # only the middle item is read
+        return chan.read()
+    return closure
+
+
+def discover_ceph_nodes(ip):
+    """ Return a list of Node objects, one per host running ceph daemons
+        :param ip - 'local' to run the ceph commands on this host,
+                    anything else is handed to connect() and the
+                    commands run over that ssh connection
+        Roles are "ceph-osd", "ceph-mon" and "ceph-mds" """
+    if ip == 'local':
+        executor = local_execute
+    else:
+        executor = ssh_execute(connect(ip))
+
+    # queried in osd, mon, mds order
+    found = (
+        ("ceph-osd", get_osds_ips(executor, get_osds_list(executor))),
+        ("ceph-mon", get_mons_or_mds_ips(executor, "mon")),
+        ("ceph-mds", get_mons_or_mds_ips(executor, "mds")),
+    )
+
+    # connection url -> roles found on that host
+    roles_by_url = {}
+    for role, addrs in found:
+        for addr in addrs:
+            roles_by_url.setdefault("ssh://%s" % addr, []).append(role)
+
+    return [Node(url, list(roles))
+            for url, roles in roles_by_url.items()]
+
+
+def get_osds_list(executor):
+    """ Return the non-empty lines printed by 'ceph osd ls' (osd ids)"""
+    return [osd_id for osd_id in executor("ceph osd ls").split("\n") if osd_id]
+
+
+def get_mons_or_mds_ips(executor, who):
+    """ Return set of ips of ceph monitors or metadata servers
+        :param executor - callable running a shell command, returns output
+        :param who - "mon" or "mds"
+        :raises ValueError - for any other value of who """
+    if who not in ("mon", "mds"):
+        # message fixed: the two pieces used to join into "insteadof"
+        raise ValueError(("'%s' in get_mons_or_mds_ips instead " +
+                          "of mon/mds") % who)
+
+    ips = set()
+    for line in executor("ceph {0} dump".format(who)).split("\n"):
+        fields = line.split()
+        if len(fields) <= 2:
+            continue
+        # dump rows presumably look like "<rank>: <ip>:<port>/<nonce> <name>"
+        # - TODO confirm against real `ceph mon dump` / `ceph mds dump` output
+        addr, daemon_name = fields[1:3]
+        if who in daemon_name:
+            ips.add(addr.split(":")[0])
+    return ips
+
+
+def get_osds_ips(executor, osd_list):
+    """ Return set of ips (port stripped) of the given osds
+        :param executor - callable running a shell command, returns output
+        :param osd_list - list of osd names from osd ls command"""
+    ips = set()
+    for osd_id in osd_list:
+        location = json.loads(executor("ceph osd find {0}".format(osd_id)))
+        ips.add(str(location["ip"].split(":")[0]))
+    return ips
diff --git a/wally/discover/discover.py b/wally/discover/discover.py
new file mode 100644
index 0000000..8c78387
--- /dev/null
+++ b/wally/discover/discover.py
@@ -0,0 +1,64 @@
+import logging
+
+from . import ceph
+from . import fuel
+from . import openstack
+from wally.utils import parse_creds
+
+
+logger = logging.getLogger("wally.discover")
+
+
+def discover(ctx, discover, clusters_info, var_dir):
+    """Collect the nodes of every cluster type listed in discover.
+
+    ctx - object that gets .fuel_openstack_creds when "fuel" is discovered
+    clusters_info - dict of per-cluster settings keyed by cluster type
+    Returns (nodes, clean_data); clean_data is None unless "fuel" was
+    discovered, and is what undiscover() expects. Unknown cluster types
+    raise ValueError.
+    """
+    nodes_to_run = []
+    clean_data = None
+    for cluster in discover:
+        if cluster == "openstack":
+            cluster_info = clusters_info["openstack"]
+            conn = cluster_info.get('connection')
+            # must happen before conn is indexed (it used to come after)
+            if not conn:
+                logger.error("No connection provided for %s. Skipping"
+                             % cluster)
+                continue
+
+            user, passwd, tenant = parse_creds(conn['creds'])
+            auth_data = dict(auth_url=conn['auth_url'], username=user,
+                             api_key=passwd, project_id=tenant)
+            logger.debug("Discovering openstack nodes "
+                         "with connection details: %r" % conn)
+            os_nodes = openstack.discover_openstack_nodes(auth_data,
+                                                          cluster_info)
+            nodes_to_run.extend(os_nodes)
+
+        elif cluster == "fuel":
+            res = fuel.discover_fuel_nodes(clusters_info['fuel'], var_dir)
+            nodes, clean_data, openrc_dict = res
+
+            ctx.fuel_openstack_creds = {'name': openrc_dict['username'],
+                                        'passwd': openrc_dict['password'],
+                                        'tenant': openrc_dict['tenant_name'],
+                                        'auth_url': openrc_dict['os_auth_url']}
+            nodes_to_run.extend(nodes)
+
+        elif cluster == "ceph":
+            cluster_info = clusters_info["ceph"]
+            nodes_to_run.extend(ceph.discover_ceph_nodes(cluster_info))
+        else:
+            msg_templ = "Unknown cluster type in 'discover' parameter: {0!r}"
+            raise ValueError(msg_templ.format(cluster))
+
+    return nodes_to_run, clean_data
+
+
+def undiscover(clean_data):  # undo fuel port forwarding; no-op for None
+    if clean_data is not None:
+        fuel.clean_fuel_port_forwarding(clean_data)
diff --git a/wally/discover/fuel.py b/wally/discover/fuel.py
new file mode 100644
index 0000000..6e76188
--- /dev/null
+++ b/wally/discover/fuel.py
@@ -0,0 +1,136 @@
+import os
+import re
+import sys
+import socket
+import logging
+from urlparse import urlparse
+
+import yaml
+from wally.fuel_rest_api import (KeystoneAuth, get_cluster_id,
+                                 reflect_cluster, FuelInfo)
+from wally.utils import parse_creds
+from wally.ssh_utils import run_over_ssh, connect
+
+from .node import Node
+
+
+logger = logging.getLogger("wally.discover")
+BASE_PF_PORT = 33467
+
+
+def discover_fuel_nodes(fuel_data, var_dir):
+    """Find the nodes of a FUEL environment and forward ssh ports to them.
+
+    Returns (nodes, clean_data for clean_fuel_port_forwarding(), openrc).
+    """
+    # NOTE(review): discover() unpacks parse_creds() as (user, passwd, tenant)
+    username, tenant_name, password = parse_creds(fuel_data['creds'])
+    creds = {"username": username, "tenant_name": tenant_name,
+             "password": password}
+
+    conn = KeystoneAuth(fuel_data['url'], creds, headers=None)
+
+    cluster_id = get_cluster_id(conn, fuel_data['openstack_env'])
+    cluster = reflect_cluster(conn, cluster_id)
+    version = FuelInfo(conn).get_version()
+
+    fuel_nodes = list(cluster.get_nodes())
+
+    logger.debug("Found FUEL {0}".format(".".join(map(str, version))))
+
+    network = 'fuelweb_admin' if version >= [6, 0] else 'admin'
+
+    ssh_creds = fuel_data['ssh_creds']
+
+    fuel_host = urlparse(fuel_data['url']).hostname
+    fuel_ip = socket.gethostbyname(fuel_host)
+    ssh_conn = connect("{0}@@{1}".format(ssh_creds, fuel_host))
+
+    fuel_ext_iface = get_external_interface(ssh_conn, fuel_ip)
+
+    # TODO: keep ssh key in memory
+    # http://stackoverflow.com/questions/11994139/how-to-include-the-private-key-in-paramiko-after-fetching-from-string
+    fuel_key_file = os.path.join(var_dir, "fuel_master_node_id_rsa")
+    download_master_key(ssh_conn, fuel_key_file)
+
+    nodes = []
+    ports = range(BASE_PF_PORT, BASE_PF_PORT + len(fuel_nodes))
+    ips_ports = []
+
+    for fuel_node, port in zip(fuel_nodes, ports):
+        ip = fuel_node.get_ip(network)
+        forward_ssh_port(ssh_conn, fuel_ext_iface, port, ip)
+
+        conn_url = "ssh://root@{0}:{1}:{2}".format(fuel_host,
+                                                   port,
+                                                   fuel_key_file)
+        nodes.append(Node(conn_url, fuel_node['roles']))
+        ips_ports.append((ip, port))
+
+    logger.debug("Found %s fuel nodes for env %r" %
+                 (len(nodes), fuel_data['openstack_env']))
+
+    return (nodes,
+            (ssh_conn, fuel_ext_iface, ips_ports),
+            cluster.get_openrc())
+
+
+def download_master_key(conn, dest):  # copy the master's root ssh key to dest
+    sftp = conn.open_sftp()
+    try:  # close the sftp session even when the transfer fails
+        sftp.get('/root/.ssh/id_rsa', dest)
+        os.chmod(dest, 0o400)
+    finally:
+        sftp.close()
+    logger.debug("Fuel master key stored in {0}".format(dest))
+
+
+def get_external_interface(conn, ip):  # interface owning ip, else KeyError
+    data = run_over_ssh(conn, "ip a", node='fuel-master')
+    curr_iface = None
+    for line in data.split("\n"):
+        # a "<index>: <name>: <" line opens the section of a new interface
+        match1 = re.match(r"\d+:\s+(?P<name>.*?):\s\<", line)
+        if match1 is not None:
+            curr_iface = match1.group('name')
+
+        match2 = re.match(r"\s+inet\s+(?P<ip>[0-9.]+)/", line)
+        if match2 is not None:
+            if match2.group('ip') == ip:
+                assert curr_iface is not None
+                return curr_iface
+    raise KeyError("Can't found interface for ip {0}".format(ip))
+
+
+def forward_ssh_port(conn, iface, new_port, ip, clean=False):  # DNAT new_port -> ip:22
+    mode = "-D" if clean is True else "-A"  # -A adds the rule, -D deletes it
+    cmd = "iptables -t nat {mode} PREROUTING -p tcp " + \
+          "-i {iface} --dport {port} -j DNAT --to {ip}:22"
+    run_over_ssh(conn,
+                 cmd.format(iface=iface, port=new_port, ip=ip, mode=mode),
+                 node='fuel-master')
+
+
+def clean_fuel_port_forwarding(clean_data):  # delete every recorded DNAT rule
+    conn, iface, ips_ports = clean_data
+    for ip, port in ips_ports:
+        forward_ssh_port(conn, iface, port, ip, clean=True)
+
+
+def main(argv):
+    """Manual check: forward ports, wait for Enter, remove the rules.
+    NOTE(review): argv is unused, the config path is taken from sys.argv."""
+    with open(sys.argv[1]) as fd:  # close the config file after reading
+        fuel_data = yaml.load(fd.read())['clouds']['fuel']
+    nodes, to_clean, openrc = discover_fuel_nodes(fuel_data, '/tmp')
+
+    print nodes
+    print openrc
+    print "Ready to test"
+    sys.stdin.readline()
+    clean_fuel_port_forwarding(to_clean)
+    return 0
+
+
+if __name__ == "__main__":
+    sys.exit(main(sys.argv[1:]))  # pass main()'s status on to the shell
diff --git a/wally/discover/node.py b/wally/discover/node.py
new file mode 100644
index 0000000..fad4f29
--- /dev/null
+++ b/wally/discover/node.py
@@ -0,0 +1,22 @@
+import urlparse
+
+
+class Node(object):
+    """A discovered host: connection url, role names and a connection slot."""
+    def __init__(self, conn_url, roles):
+        self.roles = roles
+        self.conn_url = conn_url
+        self.connection = None
+    # hostname component of conn_url, as extracted by urlparse
+    def get_ip(self):
+        return urlparse.urlparse(self.conn_url).hostname
+
+    def __str__(self):
+        templ = "<Node: url={conn_url!r} roles={roles}" + \
+                " connected={is_connected}>"
+        return templ.format(conn_url=self.conn_url,
+                            roles=", ".join(self.roles),
+                            is_connected=self.connection is not None)
+
+    def __repr__(self):
+        return str(self)
diff --git a/wally/discover/openstack.py b/wally/discover/openstack.py
new file mode 100644
index 0000000..32b4629
--- /dev/null
+++ b/wally/discover/openstack.py
@@ -0,0 +1,70 @@
+import socket
+import logging
+
+
+from novaclient.client import Client
+
+from .node import Node
+from wally.utils import parse_creds
+
+
+logger = logging.getLogger("wally.discover")  # same name as discover.py and fuel.py
+
+
+def get_floating_ip(vm):  # first floating address of vm, None if it has none
+    addrs = vm.addresses
+    for net_name, ifaces in addrs.items():
+        for iface in ifaces:
+            if iface.get('OS-EXT-IPS:type') == "floating":
+                return iface['addr']
+
+
+def discover_vms(client, search_opts):  # Nodes for servers with a floating ip
+    user, password, key = parse_creds(search_opts.pop('auth'))
+    # NOTE(review): Node.__init__ takes only (conn_url, roles) - the keyword arguments below look unsupported
+    servers = client.servers.list(search_opts=search_opts)
+    logger.debug("Found %s openstack vms" % len(servers))
+    return [Node(get_floating_ip(server), ["test_vm"], username=user,
+                 password=password, key_path=key)
+            for server in servers if get_floating_ip(server)]
+
+
+def discover_services(client, opts):
+    """Return a Node per host running the services named in opts['service']
+    ("all", one binary name or a list of them); opts['auth'] is consumed."""
+    user, password, key = parse_creds(opts.pop('auth'))
+    if opts['service'] == "all":
+        services = client.services.list()
+    else:
+        if isinstance(opts['service'], basestring):
+            opts['service'] = [opts['service']]
+        services = []
+        for s in opts['service']:
+            services.extend(client.services.list(binary=s))
+
+    host_services_mapping = {}
+    for service in services:
+        ip = socket.gethostbyname(service.host)
+        # setdefault: indexing the empty dict raised KeyError on first host
+        host_services_mapping.setdefault(ip, []).append(service.binary)
+    logger.debug("Found %s openstack service nodes" %
+                 len(host_services_mapping))
+    # NOTE(review): Node.__init__ accepts only (conn_url, roles) - confirm
+    return [Node(host, services, username=user,
+                 password=password, key_path=key) for host, services in
+            host_services_mapping.items()]
+
+
+def discover_openstack_nodes(conn_details, conf):
+    """Discover openstack service nodes described by conf['discover']['nodes']
+    :param conn_details - dict with openstack connection details -
+        auth_url, api_key (password), username
+    """
+    client = Client(version='1.1', **conn_details)
+    nodes = []
+    if conf.get('discover'):
+        services_to_discover = conf['discover'].get('nodes')
+        if services_to_discover:
+            nodes.extend(discover_services(client, services_to_discover))
+
+    return nodes
diff --git a/wally/fuel_rest_api.py b/wally/fuel_rest_api.py
new file mode 100644
index 0000000..499510d
--- /dev/null
+++ b/wally/fuel_rest_api.py
@@ -0,0 +1,540 @@
+import re
+import json
+import time
+import logging
+import urllib2
+import urlparse
+from functools import partial, wraps
+
+import netaddr
+
+from keystoneclient.v2_0 import Client as keystoneclient
+from keystoneclient import exceptions
+
+
+logger = logging.getLogger("wally.fuel_api")
+
+
+class Urllib2HTTP(object):
+    """Minimal JSON-over-HTTP client built on urllib2.
+
+    Every name in ``allowed_methods`` is available as a method
+    (``conn.get(path)``, ``conn.put(path, params)``, ...) that forwards
+    to :meth:`do` with the verb already bound.
+    """
+
+    allowed_methods = ('get', 'put', 'post', 'delete', 'patch', 'head')
+
+    def __init__(self, root_url, headers=None):
+        """
+        :param root_url: base url; one trailing slash is dropped
+        :param headers: optional dict of headers sent with every request
+        """
+        if root_url.endswith('/'):
+            self.root_url = root_url[:-1]
+        else:
+            self.root_url = root_url
+
+        self.headers = headers if headers is not None else {}
+
+    def host(self):
+        """Return the third '/'-separated part of the root url.
+
+        For a ``scheme://host:port/...`` url this is ``host:port``.
+        """
+        return self.root_url.split('/')[2]
+
+    def do(self, method, path, params=None):
+        """Send a request and return the decoded JSON body.
+
+        :param method: lower-case HTTP verb
+        :param path: path appended to the root url
+        :param params: JSON-serializable body; must be empty for 'get'
+        :return: decoded JSON, or None when the body is empty
+        :raises IndexError: when the status code is outside 200..209
+        """
+        if path.startswith('/'):
+            url = self.root_url + path
+        else:
+            url = self.root_url + '/' + path
+
+        if method == 'get':
+            assert params == {} or params is None
+            data_json = None
+        else:
+            data_json = json.dumps(params)
+
+        logger.debug("HTTP: {} {}".format(method.upper(), url))
+
+        request = urllib2.Request(url,
+                                  data=data_json,
+                                  headers=self.headers)
+        if data_json is not None:
+            request.add_header('Content-Type', 'application/json')
+
+        # urllib2 picks GET/POST on its own; force the requested verb.
+        request.get_method = lambda: method.upper()
+        response = urllib2.urlopen(request)
+
+        # Always release the connection, also on the error path.
+        try:
+            logger.debug("HTTP Response: {}".format(response.code))
+
+            if response.code < 200 or response.code > 209:
+                raise IndexError(url)
+
+            content = response.read()
+        finally:
+            response.close()
+
+        if '' == content:
+            return None
+
+        return json.loads(content)
+
+    def __getattr__(self, name):
+        """Expose the allowed HTTP verbs as bound request methods."""
+        if name in self.allowed_methods:
+            return partial(self.do, name)
+        raise AttributeError(name)
+
+
+class KeystoneAuth(Urllib2HTTP):
+    """HTTP client that sends a keystone token with every request.
+
+    The keystone url is built from the host name of ``root_url`` and
+    port 5000.
+    """
+
+    def __init__(self, root_url, creds, headers=None):
+        super(KeystoneAuth, self).__init__(root_url, headers)
+        keystone_host = urlparse.urlparse(root_url).hostname
+        self.keystone_url = "http://{0}:5000/v2.0".format(keystone_host)
+        self.keystone = keystoneclient(auth_url=self.keystone_url, **creds)
+        self.refresh_token()
+
+    def refresh_token(self):
+        """Authenticate in keystone and store the token in the headers.
+
+        An ``AuthorizationFailure`` is only logged as a warning; the
+        headers stay untouched in that case.
+        """
+        try:
+            self.keystone.authenticate()
+            self.headers['X-Auth-Token'] = self.keystone.auth_token
+        except exceptions.AuthorizationFailure:
+            logger.warning(
+                'Cant establish connection to keystone with url %s',
+                self.keystone_url)
+
+    def do(self, method, path, params=None):
+        """Send the request; on HTTP 401 refresh the token and retry once.
+
+        Any other ``urllib2.HTTPError`` is re-raised unchanged.
+        """
+        parent = super(KeystoneAuth, self)
+        try:
+            return parent.do(method, path, params)
+        except urllib2.HTTPError as err:
+            if err.code != 401:
+                raise
+            logger.warning(
+                'Authorization failure: {0}'.format(err.read()))
+            self.refresh_token()
+            return parent.do(method, path, params)
+
+
+def get_inline_param_list(url):
+    """Yield the name of every ``{name}`` placeholder found in *url*.
+
+    Only names made of ASCII letters and underscores are recognized.
+    """
+    for found in re.finditer(r"\{([a-zA-Z_]+)\}", url):
+        yield found.group(1)
+
+
+class RestObj(object):
+    """Base class for objects backed by a REST connection.
+
+    Keyword arguments become instance attributes; the connection is kept
+    in ``__connection__``. Items can be read both as attributes and with
+    ``obj[name]``.
+    """
+
+    name = None
+    id = None
+
+    def __init__(self, conn, **kwargs):
+        self.__dict__.update(kwargs)
+        self.__connection__ = conn
+
+    def __str__(self):
+        """Return ``Class(name):`` followed by one ``key=value`` per line.
+
+        Attributes whose name starts or ends with ``__`` and the name
+        itself are left out; the rest is sorted by attribute name.
+        """
+        lines = ["{}({}):".format(self.__class__.__name__, self.name)]
+        for key, val in sorted(self.__dict__.items()):
+            hidden = key.startswith('__') or key.endswith('__')
+            if hidden or key == 'name':
+                continue
+            lines.append("    {}={!r}".format(key, val))
+        return "\n".join(lines)
+
+    def __getitem__(self, item):
+        return getattr(self, item)
+
+
+def make_call(method, url):
+    """Create a function that performs *method* on the *url* template.
+
+    The returned function takes the object owning the connection, an
+    optional ``entire_obj`` request body and keyword data. Each ``{name}``
+    placeholder of the url is taken from the keyword data (and removed
+    from it) or, when absent there, from the attribute of the object.
+    The remaining data - or ``entire_obj`` - is sent as request params.
+
+    :raises ValueError: when both ``entire_obj`` and extra data are given
+    """
+    def closure(obj, entire_obj=None, **data):
+        url_values = {}
+        for param in get_inline_param_list(url):
+            if param not in data:
+                url_values[param] = getattr(obj, param)
+            else:
+                url_values[param] = data.pop(param)
+        target = url.format(**url_values)
+
+        if entire_obj is None:
+            payload = data
+        elif data != {}:
+            raise ValueError("Both entire_obj and data provided")
+        else:
+            payload = entire_obj
+        return obj.__connection__.do(method, target, params=payload)
+    return closure
+
+
+# Shortcuts with the HTTP verb already bound: each one is called with a
+# url template and returns the request function built by make_call.
+PUT = partial(make_call, 'put')
+GET = partial(make_call, 'get')
+DELETE = partial(make_call, 'delete')
+
+
+def with_timeout(tout, message):
+    """Decorator factory: poll a function until it returns a true value.
+
+    The wrapped function is called repeatedly, at most about once per
+    second, for *tout* seconds. The wrapper returns None as soon as a call
+    yields a true value.
+
+    :raises RuntimeError: "Timeout during <message>" when time runs out
+    """
+    def decorator(func):
+        @wraps(func)
+        def poller(*args, **kwargs):
+            attempt_started = time.time()
+            deadline = attempt_started + tout
+
+            while attempt_started < deadline:
+                if func(*args, **kwargs):
+                    return None
+                # Wait out the rest of the one-second slot of this attempt.
+                remaining = attempt_started + 1 - time.time()
+                if remaining > 0:
+                    time.sleep(remaining)
+                attempt_started = time.time()
+            raise RuntimeError("Timeout during " + message)
+        return poller
+    return decorator
+
+
+# -------------------------------  ORM ----------------------------------------
+
+
+def get_fuel_info(url):
+    """Return a ``FuelInfo`` bound to a plain HTTP connection to *url*."""
+    return FuelInfo(Urllib2HTTP(url))
+
+
+class FuelInfo(RestObj):
+
+    """Class represents Fuel installation info"""
+
+    get_nodes = GET('api/nodes')
+    get_clusters = GET('api/clusters')
+    get_cluster = GET('api/clusters/{id}')
+    get_info = GET('api/releases')
+
+    @property
+    def nodes(self):
+        """All fuel nodes as a ``NodeList``"""
+        found = [Node(self.__connection__, **descr)
+                 for descr in self.get_nodes()]
+        return NodeList(found)
+
+    @property
+    def free_nodes(self):
+        """Nodes with an empty 'cluster' field as a ``NodeList``"""
+        unallocated = []
+        for descr in self.get_nodes():
+            if descr['cluster']:
+                continue
+            unallocated.append(Node(self.__connection__, **descr))
+        return NodeList(unallocated)
+
+    @property
+    def clusters(self):
+        """List of ``Cluster`` objects for every cluster in fuel"""
+        return [Cluster(self.__connection__, **descr)
+                for descr in self.get_clusters()]
+
+    def get_version(self):
+        """Return the version of the first listed release as a list of ints.
+
+        The numbers are taken from the part of the 'version' field that
+        follows the first '-'.
+
+        :raises ValueError: when no release is reported
+        """
+        for release in self.get_info():
+            numbers = release['version'].split("-")[1]
+            return map(int, numbers.split('.'))
+        raise ValueError("No version found")
+
+
+class Node(RestObj):
+    """Represents node in Fuel"""
+
+    # REST bindings; "{id}" is filled from the keyword argument of the
+    # call or from the attribute of the instance (see make_call).
+    get_info = GET('/api/nodes/{id}')
+    get_interfaces = GET('/api/nodes/{id}/interfaces')
+    update_interfaces = PUT('/api/nodes/{id}/interfaces')
+
+    def set_network_assigment(self, mapping):
+        """Assigns networks to interfaces
+
+        :param mapping: dict, interface name -> list of network names
+        :raises KeyError: when a network name is not assigned to any
+            interface of the node at the moment
+        """
+
+        curr_interfaces = self.get_interfaces()
+
+        # network name -> id, collected from the current assignment
+        network_ids = {}
+        for interface in curr_interfaces:
+            for net in interface['assigned_networks']:
+                network_ids[net['name']] = net['id']
+
+        # transform mappings
+        new_assigned_networks = {}
+
+        for dev_name, networks in mapping.items():
+            new_assigned_networks[dev_name] = []
+            for net_name in networks:
+                nnet = {'name': net_name, 'id': network_ids[net_name]}
+                new_assigned_networks[dev_name].append(nnet)
+
+        # update by ref
+        for dev_descr in curr_interfaces:
+            if dev_descr['name'] in new_assigned_networks:
+                nass = new_assigned_networks[dev_descr['name']]
+                dev_descr['assigned_networks'] = nass
+
+        # the whole updated interface list is sent back as the request body
+        self.update_interfaces(curr_interfaces, id=self.id)
+
+    def set_node_name(self, name):
+        """Update node name"""
+        self.__connection__.put('api/nodes', [{'id': self.id, 'name': name}])
+
+    def get_network_data(self):
+        """Returns the 'network_data' field of the node info (or None)"""
+        node_info = self.get_info()
+        return node_info.get('network_data')
+
+    def get_roles(self, pending=False):
+        """Get node roles
+
+        :param pending: also return the pending roles
+        :return: ``(roles, pending_roles)`` when *pending* is true,
+            otherwise just ``roles``
+        """
+        node_info = self.get_info()
+        if pending:
+            return node_info.get('roles'), node_info.get('pending_roles')
+        else:
+            return node_info.get('roles')
+
+    def get_ip(self, network='public'):
+        """Get node ip
+
+        :param network: network to pick
+        :return: the 'ip' of the interface the network is attached to, or
+            the address part of the network's own 'ip' when the interface
+            has none
+        :raises Exception: when no network with a matching interface
+            is found
+        """
+        nets = self.get_network_data()
+        for net in nets:
+            if net['name'] == network:
+                iface_name = net['dev']
+                # note: the node info is requested again here
+                for iface in self.get_info()['meta']['interfaces']:
+                    if iface['name'] == iface_name:
+                        try:
+                            return iface['ip']
+                        except KeyError:
+                            return netaddr.IPNetwork(net['ip']).ip
+        raise Exception('Network %s not found' % network)
+
+
+class NodeList(list):
+    """List of nodes that can be filtered by role through attributes.
+
+    ``nodes.controller`` returns the plain list of contained nodes whose
+    ``roles`` include ``'controller'``; the same works for every name in
+    ``allowed_roles`` (names with a dash need ``getattr``).
+    """
+    allowed_roles = ['controller', 'compute', 'cinder', 'ceph-osd', 'mongo',
+                     'zabbix-server']
+
+    def __getattr__(self, name):
+        """Return the nodes having role *name*.
+
+        :raises AttributeError: for any name that is not an allowed role
+        """
+        if name in self.allowed_roles:
+            return [node for node in self if name in node.roles]
+        # Falling through returned None, which made every misspelled or
+        # unknown attribute (and hasattr probes) look like a valid one.
+        raise AttributeError(name)
+
+
+class Cluster(RestObj):
+    """Class represents Cluster in Fuel"""
+
+    # REST bindings; "{name}" placeholders are filled from the keyword
+    # arguments of the call or from attributes of the instance
+    # (see make_call).
+    add_node_call = PUT('api/nodes')
+    start_deploy = PUT('api/clusters/{id}/changes')
+    get_status = GET('api/clusters/{id}')
+    delete = DELETE('api/clusters/{id}')
+    get_tasks_status = GET("api/tasks?cluster_id={id}")
+    get_networks = GET(
+        'api/clusters/{id}/network_configuration/{net_provider}')
+
+    get_attributes = GET(
+        'api/clusters/{id}/attributes')
+
+    set_attributes = PUT(
+        'api/clusters/{id}/attributes')
+
+    configure_networks = PUT(
+        'api/clusters/{id}/network_configuration/{net_provider}')
+
+    _get_nodes = GET('api/nodes?cluster_id={id}')
+
+    def __init__(self, *dt, **mp):
+        """Store the attributes and load the nodes of the cluster."""
+        super(Cluster, self).__init__(*dt, **mp)
+        self.nodes = NodeList([Node(self.__connection__, **node) for node in
+                               self._get_nodes()])
+        self.network_roles = {}
+
+    def check_exists(self):
+        """Check if cluster exists
+
+        :return: False when the status request answers 404, else True
+        """
+        try:
+            self.get_status()
+            return True
+        except urllib2.HTTPError as err:
+            if err.code == 404:
+                return False
+            raise
+
+    def get_openrc(self):
+        """Return the access credentials of the cluster as a dict.
+
+        Keys: username, password, tenant_name and os_auth_url; the url is
+        empty when the cluster has no controller node.
+        """
+        access = self.get_attributes()['editable']['access']
+        creds = {}
+        creds['username'] = access['user']['value']
+        creds['password'] = access['password']['value']
+        creds['tenant_name'] = access['tenant']['value']
+        if self.nodes.controller:
+            contr = self.nodes.controller[0]
+            creds['os_auth_url'] = "http://%s:5000/v2.0" \
+                % contr.get_ip(network="public")
+        else:
+            creds['os_auth_url'] = ""
+        return creds
+
+    def get_nodes(self):
+        """Yield a freshly requested ``Node`` for every cluster node."""
+        for node_descr in self._get_nodes():
+            yield Node(self.__connection__, **node_descr)
+
+    def add_node(self, node, roles, interfaces=None):
+        """Add node to cluster
+
+        :param node: Node object
+        :param roles: roles to assign
+        :param interfaces: mapping iface name to networks
+        """
+        data = {}
+        data['pending_roles'] = roles
+        data['cluster_id'] = self.id
+        data['id'] = node.id
+        data['pending_addition'] = True
+
+        logger.debug("Adding node %s to cluster..." % node.id)
+
+        self.add_node_call([data])
+        self.nodes.append(node)
+
+        if interfaces is not None:
+            networks = {}
+            for iface_name, params in interfaces.items():
+                networks[iface_name] = params['networks']
+
+            node.set_network_assigment(networks)
+
+    def wait_operational(self, timeout):
+        """Wait until cluster status operational
+
+        :param timeout: seconds to wait
+        :raises Exception: when the cluster status becomes "error"
+        :raises RuntimeError: when the timeout expires
+        """
+        def wo():
+            # One request per poll: both checks look at the same answer
+            # instead of fetching the status a second time.
+            status = self.get_status()['status']
+            if status == "error":
+                raise Exception("Cluster deploy failed")
+            return status == 'operational'
+        with_timeout(timeout, "deploy cluster")(wo)()
+
+    def deploy(self, timeout):
+        """Start deploy and wait until all tasks finished
+
+        *timeout* is applied twice: to the wait for the operational
+        status and then to the wait for the tasks.
+        """
+        logger.debug("Starting deploy...")
+        self.start_deploy()
+
+        self.wait_operational(timeout)
+
+        def all_tasks_finished_ok(obj):
+            ok = True
+            for task in obj.get_tasks_status():
+                if task['status'] == 'error':
+                    raise Exception('Task execution error')
+                elif task['status'] != 'ready':
+                    ok = False
+            return ok
+
+        wto = with_timeout(timeout, "wait deployment finished")
+        wto(all_tasks_finished_ok)(self)
+
+    def set_networks(self, net_descriptions):
+        """Update cluster networking parameters
+
+        :param net_descriptions: dict with optional 'networks' (network
+            name -> fields to update) and 'networking_parameters'
+        """
+        configuration = self.get_networks()
+        current_networks = configuration['networks']
+        parameters = configuration['networking_parameters']
+
+        if net_descriptions.get('networks'):
+            net_mapping = net_descriptions['networks']
+
+            for net in current_networks:
+                net_desc = net_mapping.get(net['name'])
+                if net_desc:
+                    net.update(net_desc)
+
+        if net_descriptions.get('networking_parameters'):
+            parameters.update(net_descriptions['networking_parameters'])
+
+        # the dicts above were updated in place inside configuration
+        self.configure_networks(**configuration)
+
+
+def reflect_cluster(conn, cluster_id):
+    """Return the ``Cluster`` with the given id and freshly loaded nodes."""
+    cluster = Cluster(conn, id=cluster_id)
+    fresh_nodes = [node for node in cluster.get_nodes()]
+    cluster.nodes = NodeList(fresh_nodes)
+    return cluster
+
+
+def get_all_nodes(conn):
+    """Yield a ``Node`` for every entry returned by 'api/nodes'."""
+    descriptions = conn.get('api/nodes')
+    for description in descriptions:
+        yield Node(conn, **description)
+
+
+def get_all_clusters(conn):
+    """Yield a ``Cluster`` for every entry returned by 'api/clusters'."""
+    descriptions = conn.get('api/clusters')
+    for description in descriptions:
+        yield Cluster(conn, **description)
+
+
+def get_cluster_id(conn, name):
+    """Return the id of the first cluster called *name*.
+
+    :raises ValueError: when no cluster has that name
+    """
+    matching_ids = (cluster.id for cluster in get_all_clusters(conn)
+                    if cluster.name == name)
+    for cluster_id in matching_ids:
+        return cluster_id
+
+    raise ValueError("Cluster {0} not found".format(name))
+
+
+# Maps an attribute name to the name of the group it is listed under.
+# NOTE(review): presumably the groups are sections of the cluster
+# 'editable' attributes - confirm; nothing in the visible code reads
+# this dict.
+sections = {
+    'sahara': 'additional_components',
+    'murano': 'additional_components',
+    'ceilometer': 'additional_components',
+    'volumes_ceph': 'storage',
+    'images_ceph': 'storage',
+    'ephemeral_ceph': 'storage',
+    'objects_ceph': 'storage',
+    'osd_pool_size': 'storage',
+    'volumes_lvm': 'storage',
+    'volumes_vmdk': 'storage',
+    'tenant': 'access',
+    'password': 'access',
+    'user': 'access',
+    'vc_password': 'vcenter',
+    'cluster': 'vcenter',
+    'host_ip': 'vcenter',
+    'vc_user': 'vcenter',
+    'use_vcenter': 'vcenter',
+}
+
+
+def create_empty_cluster(conn, cluster_desc, debug_mode=False):
+    """Create a new cluster without nodes and set its basic attributes.
+
+    :param conn: connection used to post the cluster
+    :param cluster_desc: dict with 'name' and 'release' and optional
+        'deployment_mode', 'net_provider', 'libvirt_type' (default 'kvm'),
+        'nodes' and 'storage_type'
+    :param debug_mode: not used
+    :return: the created ``Cluster``
+
+    Ceph is selected when 'storage_type' is 'ceph' or, without a
+    'storage_type', when 'nodes' has a 'ceph_osd' entry.
+    """
+    request = {
+        'nodes': [],
+        'tasks': [],
+        'name': cluster_desc['name'],
+        'release': cluster_desc['release'],
+        'mode': cluster_desc.get('deployment_mode'),
+        'net_provider': cluster_desc.get('net_provider'),
+    }
+
+    created = conn.post(path='/api/clusters', params=request)
+    cluster = Cluster(conn, **created)
+
+    attributes = cluster.get_attributes()
+    editable = attributes['editable']
+
+    editable['common']['libvirt_type']['value'] = \
+        cluster_desc.get('libvirt_type', 'kvm')
+
+    use_ceph = False
+    if 'nodes' in cluster_desc:
+        use_ceph = cluster_desc['nodes'].get('ceph_osd', None) is not None
+
+    # an explicit storage type overrides the guess made from the nodes
+    if 'storage_type' in cluster_desc:
+        use_ceph = (cluster_desc['storage_type'] == 'ceph')
+
+    if use_ceph:
+        ceph_backed = ('images_ceph', 'volumes_ceph')
+        storage_opts = ['ephemeral_ceph', 'images_ceph', 'images_vcenter',
+                        'iser', 'objects_ceph', 'volumes_ceph',
+                        'volumes_lvm', 'volumes_vmdk']
+
+        for opt_name in storage_opts:
+            opt = editable['storage'][opt_name]
+            # only checkbox options are switched: on for the two ceph
+            # backed ones, off for all others
+            if opt['type'] == 'checkbox':
+                opt['value'] = opt_name in ceph_backed
+    # else:
+    #     raise NotImplementedError("Non-ceph storages are not implemented")
+
+    cluster.set_attributes(attributes)
+
+    return cluster
diff --git a/wally/keystone.py b/wally/keystone.py
new file mode 100644
index 0000000..24d322c
--- /dev/null
+++ b/wally/keystone.py
@@ -0,0 +1,90 @@
+import json
+import urllib2
+from functools import partial
+
+from keystoneclient import exceptions
+from keystoneclient.v2_0 import Client as keystoneclient
+
+
+class Urllib2HTTP(object):
+    """Minimal JSON-over-HTTP client built on urllib2.
+
+    Every name in ``allowed_methods`` is available as a method
+    (``conn.get(path)``, ``conn.put(path, params)``, ...) that forwards
+    to :meth:`do` with the verb already bound.
+    """
+
+    allowed_methods = ('get', 'put', 'post', 'delete', 'patch', 'head')
+
+    def __init__(self, root_url, headers=None, echo=False):
+        """
+        :param root_url: base url; one trailing slash is dropped
+        :param headers: optional dict of headers sent with every request
+        :param echo: stored as ``self.echo``; not read inside this class
+        """
+        if root_url.endswith('/'):
+            self.root_url = root_url[:-1]
+        else:
+            self.root_url = root_url
+
+        self.headers = headers if headers is not None else {}
+        self.echo = echo
+
+    def do(self, method, path, params=None):
+        """Send a request and return the decoded JSON body.
+
+        :param method: lower-case HTTP verb
+        :param path: path appended to the root url
+        :param params: JSON-serializable body; must be empty for 'get'
+        :return: decoded JSON, or None when the body is empty
+        :raises IndexError: when the status code is outside 200..209
+        """
+        if path.startswith('/'):
+            url = self.root_url + path
+        else:
+            url = self.root_url + '/' + path
+
+        if method == 'get':
+            assert params == {} or params is None
+            data_json = None
+        else:
+            data_json = json.dumps(params)
+
+        request = urllib2.Request(url,
+                                  data=data_json,
+                                  headers=self.headers)
+        if data_json is not None:
+            request.add_header('Content-Type', 'application/json')
+
+        # urllib2 picks GET/POST on its own; force the requested verb.
+        request.get_method = lambda: method.upper()
+        response = urllib2.urlopen(request)
+
+        # Always release the connection, also on the error path.
+        try:
+            if response.code < 200 or response.code > 209:
+                raise IndexError(url)
+
+            content = response.read()
+        finally:
+            response.close()
+
+        if '' == content:
+            return None
+
+        return json.loads(content)
+
+    def __getattr__(self, name):
+        """Expose the allowed HTTP verbs as bound request methods."""
+        if name in self.allowed_methods:
+            return partial(self.do, name)
+        raise AttributeError(name)
+
+
+class KeystoneAuth(Urllib2HTTP):
+    """HTTP client that sends a keystone token with every request."""
+
+    def __init__(self, root_url, creds, headers=None, echo=False,
+                 admin_node_ip=None):
+        """
+        :param root_url: base url of the API
+        :param creds: keyword arguments for the keystone client
+        :param headers: optional dict of headers sent with every request
+        :param echo: passed through to ``Urllib2HTTP``
+        :param admin_node_ip: host running keystone on port 5000;
+            defaults to the host name of *root_url*
+        """
+        super(KeystoneAuth, self).__init__(root_url, headers, echo)
+        if admin_node_ip is None:
+            # Formatting None into the url gave the unusable
+            # "http://None:5000/v2.0"; use the API host instead.
+            import urlparse
+            admin_node_ip = urlparse.urlparse(root_url).hostname
+        self.keystone_url = "http://{0}:5000/v2.0".format(admin_node_ip)
+        self.keystone = keystoneclient(
+            auth_url=self.keystone_url, **creds)
+        self.refresh_token()
+
+    def refresh_token(self):
+        """Get new token from keystone and update headers
+
+        Authentication errors, ``AuthorizationFailure`` included,
+        propagate to the caller.
+        """
+        self.keystone.authenticate()
+        self.headers['X-Auth-Token'] = self.keystone.auth_token
+
+    def do(self, method, path, params=None):
+        """Do request. On HTTP 401 refresh the token and retry once"""
+        try:
+            return super(KeystoneAuth, self).do(method, path, params)
+        except urllib2.HTTPError as e:
+            if e.code == 401:
+                self.refresh_token()
+                return super(KeystoneAuth, self).do(method, path, params)
+            else:
+                raise
diff --git a/wally/meta_info.py b/wally/meta_info.py
new file mode 100644
index 0000000..127612b
--- /dev/null
+++ b/wally/meta_info.py
@@ -0,0 +1,68 @@
+from urlparse import urlparse
+from keystone import KeystoneAuth
+
+
+def total_lab_info(data):
+    """Summarize the hardware of all lab nodes.
+
+    :param data: dict whose ``'nodes'`` list holds node dicts with
+        ``memory['total']``, ``processors`` and ``disks[i]['size']``
+    :return: dict with ``nodes_count`` and ``processor_count`` as ints
+        and ``total_memory`` / ``total_disk`` as strings: the sums
+        divided by 1024 ** 3, with thousands separators (sizes are
+        presumably given in bytes - confirm against the data source)
+    """
+    nodes = data['nodes']
+
+    def to_gb(x):
+        # Floor division stays integral under true division as well;
+        # format(..., ',d') below rejects floats.
+        return x // (1024 ** 3)
+
+    total_memory = sum(node['memory']['total'] for node in nodes)
+    total_disk = sum(disk['size']
+                     for node in nodes
+                     for disk in node['disks'])
+
+    return {
+        'nodes_count': len(nodes),
+        'processor_count': sum(len(node['processors']) for node in nodes),
+        'total_memory': format(to_gb(total_memory), ',d'),
+        'total_disk': format(to_gb(total_disk), ',d'),
+    }
+
+
+def collect_lab_data(url, cred):
+    """Collect hardware info of all nodes and the fuel version.
+
+    :param url: root url of the API; its host name is also passed as the
+        keystone host
+    :param cred: credentials dict passed to ``KeystoneAuth``
+    :return: dict with 'nodes' (one dict per node holding name, memory,
+        disks, devices, interfaces and processors) and 'fuel_version'
+    """
+    api_host = urlparse(url).hostname
+    keystone = KeystoneAuth(root_url=url, creds=cred,
+                            admin_node_ip=api_host)
+    lab_info = keystone.do(method='get', path="/api/nodes")
+    fuel_version = keystone.do(method='get', path="/api/version/")
+
+    nodes = []
+    for node in lab_info:
+        meta = node['meta']
+
+        # every collection is copied, the response itself is not shared
+        descr = {'name': node['name']}
+        descr['processors'] = list(meta['cpu']['spec'])
+        descr['interfaces'] = list(meta['interfaces'])
+        descr['memory'] = meta['memory'].copy()
+        descr['disks'] = list(meta['disks'])
+        descr['devices'] = []
+
+        nodes.append(descr)
+
+    # result['name'] = 'Perf-1 Env'
+    return {'nodes': nodes, 'fuel_version': fuel_version['release']}
diff --git a/wally/pretty_yaml.py b/wally/pretty_yaml.py
new file mode 100644
index 0000000..4fc4a49
--- /dev/null
+++ b/wally/pretty_yaml.py
@@ -0,0 +1,79 @@
+# Module description and public API: ``dumps`` is the only name exported
+# by a star import of this module.
+__doc__ = "functions for make pretty yaml files"
+__all__ = ['dumps']
+
+
+def dumps_simple(val):
+    """Return the text for a scalar value.
+
+    Strings containing whitespace, a comma, a single quote or a colon are
+    returned as their ``repr``; other strings are returned unchanged.
+    True, False and None become 'true', 'false' and 'null'; everything
+    else goes through ``str``.
+    """
+    if isinstance(val, basestring):
+        needs_quotes = any(char in " \r\t\n,':" for char in val)
+        return repr(val) if needs_quotes else val
+
+    for constant, text in ((True, 'true'), (False, 'false'), (None, 'null')):
+        if val is constant:
+            return text
+
+    return str(val)
+
+
+def is_simple(val):
+    """Tell whether *val* is None, a string, a number or a bool."""
+    if val is None:
+        return True
+    return isinstance(val, (str, unicode, int, long, bool, float))
+
+
+def all_nums(vals):
+    """Tell whether every item of *vals* is an int, long or float."""
+    for val in vals:
+        if not isinstance(val, (int, float, long)):
+            return False
+    return True
+
+
+def dumpv(data, tab_sz=4, width=120, min_width=40):
+    """Serialize *data* into a list of yaml text lines.
+
+    :param data: scalar, list/tuple or dict (with scalar keys), nested
+        to any depth
+    :param tab_sz: indentation step in spaces
+    :param width: preferred maximal line width; reduced by ``tab_sz``
+        on every nesting level
+    :param min_width: lower bound for ``width``
+    :return: list of lines without line ends
+    :raises ValueError: for values of an unsupported type
+    """
+    tab = ' ' * tab_sz
+
+    if width < min_width:
+        width = min_width
+
+    if is_simple(data):
+        return [dumps_simple(data)]
+
+    res = []
+    if isinstance(data, (list, tuple)):
+        # A list of scalars is written inline when it fits the width;
+        # pure number lists get a space after each comma.
+        if all(map(is_simple, data)):
+            if all_nums(data):
+                one_line = "[{}]".format(", ".join(map(dumps_simple, data)))
+            else:
+                one_line = "[{}]".format(",".join(map(dumps_simple, data)))
+        else:
+            one_line = None
+
+        if one_line is None or len(one_line) > width:
+            pref = "-" + ' ' * (tab_sz - 1)
+
+            for val in data:
+                items = dumpv(val, tab_sz, width - tab_sz, min_width)
+                items = [pref + items[0]] + \
+                        [tab + item for item in items[1:]]
+                res.extend(items)
+        else:
+            res.append(one_line)
+    elif isinstance(data, dict):
+        assert all(map(is_simple, data.keys()))
+
+        if not data:
+            # An empty mapping produced no lines at all: as a list item
+            # that raised IndexError above, as a dict value it left a
+            # bare key behind. Write it as an explicit flow mapping.
+            return ["{}"]
+
+        for k, v in data.items():
+            key_str = dumps_simple(k) + ": "
+            val_res = dumpv(v, tab_sz, width - tab_sz, min_width)
+
+            if len(val_res) == 1 and len(key_str + val_res[0]) < width:
+                res.append(key_str + val_res[0])
+            else:
+                res.append(key_str)
+                res.extend(tab + i for i in val_res)
+    else:
+        raise ValueError("Can't pack {0!r}".format(data))
+
+    return res
+
+
+def dumps(data, tab_sz=4, width=120, min_width=40):
+    """Return *data* as yaml text: the lines of ``dumpv`` joined by LF."""
+    lines = dumpv(data, tab_sz, width, min_width)
+    return "\n".join(lines)
diff --git a/wally/report.py b/wally/report.py
new file mode 100644
index 0000000..d2f2d96
--- /dev/null
+++ b/wally/report.py
@@ -0,0 +1,236 @@
+import os
+import sys
+
+from wally import charts
+from wally.statistic import med_dev
+from wally.utils import parse_creds
+from wally.suits.io.results_loader import filter_data
+from wally.meta_info import total_lab_info, collect_lab_data
+
+
+# from collections import OrderedDict
+# from wally.suits.io import formatter
+# def pgbench_chart_data(results):
+#     """
+#     Format pgbench results for chart
+#     """
+#     data = {}
+#     charts_url = []
+
+#     formatted_res = formatters.format_pgbench_stat(results)
+#     for key, value in formatted_res.items():
+#         num_cl, num_tr = key.split(' ')
+#         data.setdefault(num_cl, {}).setdefault(build, {})
+#         data[keys[z]][build][
+#             ' '.join(keys)] = value
+
+#     for name, value in data.items():
+#         title = name
+#         legend = []
+#         dataset = []
+
+#         scale_x = []
+
+#         for build_id, build_results in value.items():
+#             vals = []
+#             OD = OrderedDict
+#             ordered_build_results = OD(sorted(build_results.items(),
+#                                        key=lambda t: t[0]))
+#             scale_x = ordered_build_results.keys()
+#             for key in scale_x:
+#                 res = build_results.get(key)
+#                 if res:
+#                     vals.append(res)
+#             if vals:
+#                 dataset.append(vals)
+#                 legend.append(build_id)
+
+#         if dataset:
+#             charts_url.append(str(charts.render_vertical_bar
+#                               (title, legend, dataset, scale_x=scale_x)))
+#     return charts_url
+
+# def build_lines_chart(results, z=0):
+#     data = {}
+#     charts_url = []
+
+#     for build, res in results:
+#         formatted_res = formatters.get_formatter(build)(res)
+#         for key, value in formatted_res.items():
+#             keys = key.split(' ')
+#             data.setdefault(key[z], {})
+#             data[key[z]].setdefault(build, {})[keys[1]] = value
+
+#     for name, value in data.items():
+#         title = name
+#         legend = []
+#         dataset = []
+#         scale_x = []
+#         for build_id, build_results in value.items():
+#             legend.append(build_id)
+
+#             OD = OrderedDict
+#             ordered_build_results = OD(sorted(build_results.items(),
+#                                        key=lambda t: ssize_to_b(t[0])))
+
+#             if not scale_x:
+#                 scale_x = ordered_build_results.keys()
+#             dataset.append(zip(*ordered_build_results.values())[0])
+
+#         chart = charts.render_lines(title, legend, dataset, scale_x)
+#         charts_url.append(str(chart))
+
+#     return charts_url
+
+# def build_vertical_bar(results, z=0):
+#     data = {}
+#     charts_url = []
+#     for build, res in results:
+#         formatted_res = formatter.get_formatter(build)(res)
+#         for key, value in formatted_res.items():
+#             keys = key.split(' ')
+#             data.setdefault(keys[z], {}).setdefault(build, {})
+#             data[keys[z]][build][
+#                 ' '.join(keys)] = value
+
+#     for name, value in data.items():
+#         title = name
+#         legend = []
+#         dataset = []
+
+#         scale_x = []
+
+#         for build_id, build_results in value.items():
+#             vals = []
+#             OD = OrderedDict
+#             ordered_build_results = OD(sorted(build_results.items(),
+#                                        key=lambda t: t[0]))
+#             scale_x = ordered_build_results.keys()
+#             for key in scale_x:
+#                 res = build_results.get(key)
+#                 if res:
+#                     vals.append(res)
+#             if vals:
+#                 dataset.append(vals)
+#                 legend.append(build_id)
+
+#         if dataset:
+#             charts_url.append(str(charts.render_vertical_bar
+#                               (title, legend, dataset, scale_x=scale_x)))
+#     return charts_url
+
+
+def render_html(charts_urls, dest, lab_description, info):
+    """Render the html report from the ``report.html`` template.
+
+    The template is read from the current directory and must contain a
+    ``%(body)s`` placeholder.
+
+    :param charts_urls: chart image urls, laid out two per table row
+    :param dest: path of the html file to write
+    :param lab_description: mapping rendered as a ``key : value`` list
+        (an empty string is accepted and yields no items)
+    :param info: iterable of dicts; each one becomes a list item showing
+        its keys and its values
+    """
+    # ``with`` closes the files even when reading or writing fails.
+    with open("report.html", 'r') as templ_fd:
+        templ = templ_fd.read()
+
+    body = "<a href='#lab_desc'>Lab description</a>" \
+           "<ol>{0}</ol>" \
+           "<div>{1}</div>" \
+           '<a name="lab_desc"></a>' \
+           "<div><ul>{2}</ul></div>"
+    li = '<li>{0} : {1}</li>'
+
+    ol = [li.format(elem.keys(), elem.values()) for elem in info]
+    ul = [li.format(key, lab_description[key]) for key in lab_description]
+
+    cells = ['<td><img src="{0}"></td>'.format(url) for url in charts_urls]
+    # Two charts per row, as many rows as needed. The former fixed 2x2
+    # template raised IndexError for fewer than four charts and dropped
+    # any chart after the fourth; four charts give the same markup.
+    if len(cells) % 2:
+        cells.append('<td></td>')
+    rows = ["<tr>{0}{1}</tr>".format(cells[pos], cells[pos + 1])
+            for pos in range(0, len(cells), 2)]
+    table = "<table>{0}</table>".format("".join(rows))
+
+    body = body.format('\n'.join(ol), table, '\n'.join(ul))
+
+    with open(dest, 'w') as dest_fd:
+        dest_fd.write(templ % {'body': body})
+
+
+def io_chart(title, concurence, latv, iops_or_bw, iops_or_bw_dev,
+             legend, fname):
+    """Render a bar chart with deviation marks and two overlay lines.
+
+    :param title: chart title
+    :param concurence: x scale values; also the divisors for the
+        "bw_per_vm" line
+    :param latv: latencies; divided by 1000 for the "lat" line
+    :param iops_or_bw: bar values
+    :param iops_or_bw_dev: deviation of every bar value
+    :param legend: legend of the single data set
+    :param fname: file name passed to the chart renderer
+    :return: ``str()`` of the object returned by the chart renderer
+    """
+    per_vm = [iops_or_bw[idx] / vm_count
+              for idx, vm_count in enumerate(concurence)]
+
+    upper = [val + iops_or_bw_dev[idx]
+             for idx, val in enumerate(iops_or_bw)]
+    lower = [val - iops_or_bw_dev[idx]
+             for idx, val in enumerate(iops_or_bw)]
+
+    lat_scaled = [lat / 1000 for lat in latv]
+    overlay_lines = [
+        (lat_scaled, "msec", "rr", "lat"),
+        (per_vm, None, None, "bw_per_vm"),
+    ]
+
+    chart = charts.render_vertical_bar(title, [legend], [iops_or_bw],
+                                       [upper], [lower], file_name=fname,
+                                       scale_x=concurence,
+                                       lines=overlay_lines)
+    return str(chart)
+
+
+def make_io_report(results, path, lab_url=None, creds=None):
+    """Render the html report for every 'io' suite found in *results*.
+
+    :param results: iterable of ``(suite_type, suite_data)`` pairs; only
+        pairs of type 'io' with data are used, the results are read from
+        ``suite_data['res']``
+    :param path: destination passed to ``render_html``
+    :param lab_url: when given, the lab description is collected from it
+    :param creds: credentials string for ``parse_creds``; used only
+        together with *lab_url*
+    """
+    lab_info = ""
+    if lab_url is not None:
+        username, password, tenant_name = parse_creds(creds)
+        creds = {'username': username,
+                 'password': password,
+                 "tenant_name": tenant_name}
+        lab_info = total_lab_info(collect_lab_data(lab_url, creds))
+
+    # (test name filter, fields to extract, chart file name)
+    name_filters = (
+        ('hdd_test_rrd4k', ('concurence', 'lat', 'iops'), 'rand_read_4k'),
+        ('hdd_test_swd1m', ('concurence', 'lat', 'bw'), 'seq_write_1m'),
+        ('hdd_test_srd1m', ('concurence', 'lat', 'bw'), 'seq_read_1m'),
+        ('hdd_test_rws4k', ('concurence', 'lat', 'bw'), 'rand_write_1m'),
+    )
+
+    for suite_type, test_suite_data in results:
+        if suite_type != 'io' or test_suite_data is None:
+            continue
+
+        suite_res = test_suite_data['res']
+        charts_url = []
+        info = []
+
+        for name_filter, fields, fname in name_filters:
+            select = filter_data(name_filter, fields)
+            rows = sorted(select(suite_res.values()))
+            if not rows:
+                continue
+
+            concurence, raw_lat, raw_vals = zip(*rows)
+            # med_dev gives (median, deviation); the latency deviation
+            # is not used
+            vals, vals_dev = zip(*map(med_dev, raw_vals))
+            latv, _ = zip(*map(med_dev, raw_lat))
+
+            charts_url.append(io_chart(name_filter, concurence, latv,
+                                       vals, vals_dev, fields[2], fname))
+            info.append(dict(zip(fields, (concurence, latv, vals))))
+
+        if charts_url:
+            render_html(charts_url, path, lab_info, info)
+
+
+def main(args):
+    """Run ``make_io_report`` with built-in sample settings.
+
+    :param args: command line; only the directory of ``args[0]`` is used
+    :return: 0
+    """
+    report_dir = os.path.dirname(args[0])
+    sample_results = [('a', 'b')]
+    make_io_report(results=sample_results,
+                   path=report_dir,
+                   lab_url='http://172.16.52.112:8000',
+                   creds='admin:admin@admin')
+    return 0
+
+
+# Script entry point: the return value of main() is the exit status.
+if __name__ == '__main__':
+    exit(main(sys.argv))
diff --git a/wally/rest_api.py b/wally/rest_api.py
new file mode 100644
index 0000000..fc9bd05
--- /dev/null
+++ b/wally/rest_api.py
@@ -0,0 +1,25 @@
+import json
+import requests
+
+
+def add_test(test_name, test_data, url):
+    """POST test_data as JSON to <url>/api/tests/<test_name>."""
+    url = url.rstrip("/") + '/api/tests/' + test_name  # url may end with "/"
+    requests.post(url=url, data=json.dumps(test_data))
+
+
+def get_test(test_name, url):
+    """Return the decoded JSON body of <url>/api/tests/<test_name>."""
+    # always append the API path; a trailing "/" used to skip it entirely
+    url = url.rstrip("/") + '/api/tests/' + test_name
+
+    result = requests.get(url=url)
+    return json.loads(result.content)
+
+
+def get_all_tests(url):
+    """Return the decoded JSON body of <url>/api/tests."""
+    # always append the API path; a trailing "/" used to skip it entirely
+    url = url.rstrip("/") + '/api/tests'
+    result = requests.get(url=url)
+    return json.loads(result.content)
diff --git a/wally/run_test.py b/wally/run_test.py
new file mode 100755
index 0000000..7899bae
--- /dev/null
+++ b/wally/run_test.py
@@ -0,0 +1,589 @@
+from __future__ import print_function
+
+import os
+import sys
+import time
+import Queue
+import pprint
+import logging
+import argparse
+import threading
+import collections
+
+import yaml
+from concurrent.futures import ThreadPoolExecutor
+
+from wally import pretty_yaml
+from wally.discover import discover, Node, undiscover
+from wally import utils, report, ssh_utils, start_vms
+from wally.suits.itest import IOPerfTest, PgBenchTest
+from wally.config import cfg_dict, load_config, setup_loggers
+from wally.sensors.api import (start_monitoring,
+                               deploy_and_start_sensors,
+                               SensorConfig)
+
+
+logger = logging.getLogger("wally")
+
+
+def format_result(res, formatter):
+    data = "\n{0}\n".format("=" * 80)
+    data += pprint.pformat(res) + "\n"
+    data += "{0}\n".format("=" * 80)
+    templ = "{0}\n\n====> {1}\n\n{2}\n\n"
+    return templ.format(data, formatter(res), "=" * 80)
+
+
+class Context(object):
+    def __init__(self):
+        self.build_meta = {}
+        self.nodes = []
+        self.clear_calls_stack = []
+        self.openstack_nodes_ids = []
+        self.sensor_cm = None
+        self.keep_vm = False
+        self.sensors_control_queue = None
+        self.sensor_listen_thread = None
+
+
+def connect_one(node):
+    try:
+        ssh_pref = "ssh://"
+        if node.conn_url.startswith(ssh_pref):
+            url = node.conn_url[len(ssh_pref):]
+            logger.debug("Try connect to " + url)
+            node.connection = ssh_utils.connect(url)
+        else:
+            raise ValueError("Unknown url type {0}".format(node.conn_url))
+    except Exception:
+        logger.exception("During connect to {0}".format(node))
+        raise
+
+
+def connect_all(nodes):
+    logger.info("Connecting to nodes")
+    with ThreadPoolExecutor(32) as pool:
+        list(pool.map(connect_one, nodes))
+    logger.info("All nodes connected successfully")
+
+
+def save_sensors_data(q, mon_q, fd):
+    logger.info("Start receiving sensors data")
+    fd.write("\n")
+
+    observed_nodes = set()
+
+    try:
+        while True:
+            val = q.get()
+            if val is None:
+                break
+
+            addr, data = val
+            if addr not in observed_nodes:
+                mon_q.put(addr)
+                observed_nodes.add(addr)
+
+            fd.write("{0!s} : {1!r}\n".format(time.time(), repr(val)))
+    except Exception:
+        logger.exception("Error in sensors thread")
+    logger.info("Sensors thread exits")
+
+
+def test_thread(test, node, barrier, res_q):
+    """Thread body: run `test` on `node`, reporting run failures via res_q."""
+    try:
+        logger.debug("Run preparation for {0}".format(node.conn_url))
+        test.pre_run(node.connection)
+        logger.debug("Run test for {0}".format(node.conn_url))
+        test.run(node.connection, barrier)
+    except Exception as exc:
+        logger.exception("In test {0} for node {1}".format(test, node))
+        res_q.put(exc)
+    try:
+        test.cleanup(node.connection)
+    except Exception:  # was a bare except; a cleanup failure is only logged
+        msg = "During cleanup - in test {0} for node {1}"
+        logger.exception(msg.format(test, node))
+
+
+def run_tests(test_block, nodes):
+    """Run each test of test_block on every 'testnode' node in parallel.
+
+    Yields (name, merged results); ValueError on a test failure/no nodes.
+    """
+    tool_type_mapper = {
+        "io": IOPerfTest,
+        "pgbench": PgBenchTest,
+    }
+
+    test_nodes = [node for node in nodes if 'testnode' in node.roles]
+    test_number_per_type = {}
+    res_q = Queue.Queue()
+
+    for name, params in test_block.items():
+        if not test_nodes:  # else `test` would be unbound at the yield below
+            raise ValueError("No 'testnode' nodes for {0} test".format(name))
+        logger.info("Starting {0} tests".format(name))
+        test_num = test_number_per_type.get(name, 0)
+        test_number_per_type[name] = test_num + 1
+        threads = []
+        barrier = utils.Barrier(len(test_nodes))
+
+        for node in test_nodes:
+            msg = "Starting {0} test on {1} node"
+            logger.debug(msg.format(name, node.conn_url))
+            dir_name = "{0}_{1}_{2}".format(name, test_num, node.get_ip())
+            dr = os.path.join(cfg_dict['test_log_directory'], dir_name)
+            if not os.path.exists(dr):
+                os.makedirs(dr)
+
+            test = tool_type_mapper[name](params, res_q.put, dr,
+                                          node=node.get_ip())
+            th = threading.Thread(None, test_thread, None,
+                                  (test, node, barrier, res_q))
+            threads.append(th)
+            th.daemon = True
+            th.start()
+
+        def gather_results(res_q, results):
+            while not res_q.empty():
+                val = res_q.get()
+                if isinstance(val, Exception):
+                    # format val itself: .message is deprecated, often ''
+                    msg = "Exception during test execution: {0}"
+                    raise ValueError(msg.format(val))
+                results.append(val)
+
+        results = []
+
+        while True:
+            for th in threads:
+                th.join(1)
+                gather_results(res_q, results)
+
+            if all(not th.is_alive() for th in threads):
+                break
+
+        gather_results(res_q, results)
+        yield name, test.merge_results(results)
+
+
+def log_nodes_statistic(_, ctx):
+    """Log the total number of nodes and how many nodes carry each role."""
+    logger.info("Found {0} nodes total".format(len(ctx.nodes)))
+    role_counts = collections.Counter(
+        role for node in ctx.nodes for role in node.roles)
+
+    # roles are reported in sorted order, one debug line per role
+    for role_name in sorted(role_counts):
+        msg = "Found {0} nodes with role {1}"
+        logger.debug(msg.format(role_counts[role_name], role_name))
+
+
+def connect_stage(cfg, ctx):
+    ctx.clear_calls_stack.append(disconnect_stage)
+    connect_all(ctx.nodes)
+
+
+def make_undiscover_stage(clean_data):
+    def undiscover_stage(cfg, ctx):
+        undiscover(clean_data)
+    return undiscover_stage
+
+
+def discover_stage(cfg, ctx):
+    if cfg.get('discover') is not None:
+        discover_objs = [i.strip() for i in cfg['discover'].strip().split(",")]
+
+        nodes, clean_data = discover(ctx, discover_objs,
+                                     cfg['clouds'], cfg['var_dir'])
+        ctx.clear_calls_stack.append(make_undiscover_stage(clean_data))
+        ctx.nodes.extend(nodes)
+
+    for url, roles in cfg.get('explicit_nodes', {}).items():
+        ctx.nodes.append(Node(url, roles.split(",")))
+
+
+def deploy_sensors_stage(cfg_dict, ctx):
+    """Start sensors on nodes of the mapped roles and wait for first data."""
+    if 'sensors' not in cfg_dict:
+        return
+
+    cfg = cfg_dict.get('sensors')
+
+    sensors_configs = []
+    monitored_nodes = []
+
+    for role, sensors_str in cfg["roles_mapping"].items():
+        sensors = [sens.strip() for sens in sensors_str.split(",")]
+        collect_cfg = dict((sensor, {}) for sensor in sensors)
+        for node in ctx.nodes:
+            if role in node.roles:
+                monitored_nodes.append(node)
+                sens_cfg = SensorConfig(node.connection,
+                                        node.get_ip(),
+                                        collect_cfg)
+                sensors_configs.append(sens_cfg)
+
+    if len(monitored_nodes) == 0:
+        logger.info("Nothing to monitor, no sensors would be installed")
+        return
+
+    ctx.receiver_uri = cfg["receiver_uri"]
+    nodes_ips = [node.get_ip() for node in monitored_nodes]
+    if '{ip}' in ctx.receiver_uri:
+        ips = set(map(utils.get_ip_for_target, nodes_ips))
+
+        if len(ips) > 1:
+            raise ValueError("Can't select external ip for sensors server")
+
+        if len(ips) == 0:
+            raise ValueError("Can't find any external ip for sensors server")
+
+        ext_ip = list(ips)[0]
+        ctx.receiver_uri = ctx.receiver_uri.format(ip=ext_ip)
+
+    ctx.clear_calls_stack.append(remove_sensors_stage)
+    ctx.sensor_cm = start_monitoring(ctx.receiver_uri, sensors_configs)
+
+    ctx.sensors_control_queue = ctx.sensor_cm.__enter__()
+
+    mon_q = Queue.Queue()
+
+    fd = open(cfg_dict['sensor_storage'], "w")
+    th = threading.Thread(None, save_sensors_data, None,
+                          (ctx.sensors_control_queue, mon_q, fd))
+    th.daemon = True
+    th.start()
+    ctx.sensor_listen_thread = th
+
+    nodes_ips_set = set(nodes_ips)
+    MAX_WAIT_FOR_SENSORS = 10
+    etime = time.time() + MAX_WAIT_FOR_SENSORS
+
+    msg = "Waiting at most {0}s till all {1} nodes starts report sensors data"
+    logger.debug(msg.format(MAX_WAIT_FOR_SENSORS, len(nodes_ips_set)))
+
+    # wait till all nodes start sending data
+    while len(nodes_ips_set) != 0:
+        # clamp: Queue.get() raises ValueError on a negative timeout
+        tleft = max(0, etime - time.time())
+        try:
+            data = mon_q.get(True, tleft)
+            ip, port = data
+        except Queue.Empty:
+            msg = "Node {0} not sending any sensor data in {1}s"
+            msg = msg.format(", ".join(nodes_ips_set), MAX_WAIT_FOR_SENSORS)
+            raise RuntimeError(msg)
+
+        if ip not in nodes_ips_set:
+            logger.warning("Receive sensors from extra node: {0}".format(ip))
+        # discard(): remove() raised KeyError for the extra node warned above
+        nodes_ips_set.discard(ip)
+
+
+def remove_sensors_stage(cfg, ctx):
+    if ctx.sensor_cm is not None:
+        ctx.sensor_cm.__exit__(None, None, None)
+
+        if ctx.sensors_control_queue is not None:
+            ctx.sensors_control_queue.put(None)
+
+        if ctx.sensor_listen_thread is not None:
+            ctx.sensor_listen_thread.join()
+
+
+def get_os_credentials(cfg, ctx, creds_type):
+    """Return openstack credentials selected by creds_type ('clouds'/'ENV')."""
+    creds = None
+    if creds_type == 'clouds':
+        if 'openstack' in cfg['clouds']:
+            os_cfg = cfg['clouds']['openstack']
+            tenant = os_cfg['OS_TENANT_NAME'].strip()
+            user = os_cfg['OS_USERNAME'].strip()
+            passwd = os_cfg['OS_PASSWORD'].strip()
+            auth_url = os_cfg['OS_AUTH_URL'].strip()
+        elif 'fuel' in cfg['clouds'] and \
+             'openstack_env' in cfg['clouds']['fuel']:
+            creds = ctx.fuel_openstack_creds
+        else:
+            # user/passwd/tenant/auth_url would be unbound further down
+            raise ValueError("No openstack creds in 'clouds' config")
+
+    elif creds_type == 'ENV':
+        user, passwd, tenant, auth_url = start_vms.ostack_get_creds()
+    elif os.path.isfile(creds_type):
+        raise NotImplementedError()
+    else:
+        msg = "Creds {0!r} isn't supported".format(creds_type)
+        raise ValueError(msg)
+
+    if creds is None:
+        creds = {'name': user,
+                 'passwd': passwd,
+                 'tenant': tenant,
+                 'auth_url': auth_url}
+
+    return creds
+
+
+def run_tests_stage(cfg, ctx):
+    ctx.results = []
+
+    if 'tests' not in cfg:
+        return
+
+    for group in cfg['tests']:
+
+        assert len(group.items()) == 1
+        key, config = group.items()[0]
+
+        if 'start_test_nodes' == key:
+            params = config['vm_params'].copy()
+            os_nodes_ids = []
+
+            os_creds_type = config['creds']
+            os_creds = get_os_credentials(cfg, ctx, os_creds_type)
+
+            start_vms.nova_connect(**os_creds)
+
+            logger.info("Preparing openstack")
+            start_vms.prepare_os_subpr(**os_creds)
+
+            new_nodes = []
+            try:
+                params['group_name'] = cfg_dict['run_uuid']
+                for new_node, node_id in start_vms.launch_vms(params):
+                    new_node.roles.append('testnode')
+                    ctx.nodes.append(new_node)
+                    os_nodes_ids.append(node_id)
+                    new_nodes.append(new_node)
+
+                store_nodes_in_log(cfg, os_nodes_ids)
+                ctx.openstack_nodes_ids = os_nodes_ids
+
+                connect_all(new_nodes)
+
+                # deploy sensors on new nodes
+                # unify this code
+                if 'sensors' in cfg:
+                    sens_cfg = []
+                    sensors_str = cfg["sensors"]["roles_mapping"]['testnode']
+                    sensors = [sens.strip() for sens in sensors_str.split(",")]
+
+                    collect_cfg = dict((sensor, {}) for sensor in sensors)
+                    for node in new_nodes:
+                        sens_cfg.append((node.connection, collect_cfg))
+
+                    uri = cfg["sensors"]["receiver_uri"]
+                    logger.debug("Installing sensors on vm's")
+                    deploy_and_start_sensors(uri, None,
+                                             connected_config=sens_cfg)
+
+                for test_group in config.get('tests', []):
+                    ctx.results.extend(run_tests(test_group, ctx.nodes))
+
+            finally:
+                if not ctx.keep_vm:
+                    shut_down_vms_stage(cfg, ctx)
+
+        else:
+            ctx.results.extend(run_tests(group, ctx.nodes))
+
+
+def shut_down_vms_stage(cfg, ctx):
+    """Remove the test VMs and delete the file that lists their ids."""
+    vm_ids_fname = cfg_dict['vm_ids_fname']
+    nodes_ids = ctx.openstack_nodes_ids
+    # Context starts with [], so fall back to the ids file when it is empty
+    if not nodes_ids and os.path.exists(vm_ids_fname):
+        with open(vm_ids_fname) as fd:
+            nodes_ids = fd.read().split()
+    if nodes_ids:
+        logger.info("Removing nodes")
+        start_vms.clear_nodes(nodes_ids)
+        logger.info("Nodes has been removed")
+    if os.path.exists(vm_ids_fname):
+        os.remove(vm_ids_fname)
+
+
+def store_nodes_in_log(cfg, nodes_ids):
+    with open(cfg['vm_ids_fname'], 'w') as fd:
+        fd.write("\n".join(nodes_ids))
+
+
+def clear_enviroment(cfg, ctx):
+    if os.path.exists(cfg_dict['vm_ids_fname']):
+        shut_down_vms_stage(cfg, ctx)
+
+
+def disconnect_stage(cfg, ctx):
+    ssh_utils.close_all_sessions()
+
+    for node in ctx.nodes:
+        if node.connection is not None:
+            node.connection.close()
+
+
+def yamable(data):
+    if isinstance(data, (tuple, list)):
+        return map(yamable, data)
+
+    if isinstance(data, unicode):
+        return str(data)
+
+    if isinstance(data, dict):
+        res = {}
+        for k, v in data.items():
+            res[yamable(k)] = yamable(v)
+        return res
+
+    return data
+
+
+def store_raw_results_stage(cfg, ctx):
+    """Append ctx.results to the raw_results.yaml file in var_dir."""
+    raw_results = os.path.join(cfg_dict['var_dir'], 'raw_results.yaml')
+
+    cont = []
+    if os.path.exists(raw_results):
+        with open(raw_results) as fd:
+            # an empty file loads as None; keep the empty list in that case
+            cont = yaml.load(fd) or cont
+
+    cont.extend(yamable(ctx.results))
+    raw_data = pretty_yaml.dumps(cont)
+    with open(raw_results, "w") as fd:
+        fd.write(raw_data)
+
+
+def console_report_stage(cfg, ctx):
+    for tp, data in ctx.results:
+        if 'io' == tp and data is not None:
+            print(IOPerfTest.format_for_console(data))
+
+
+def report_stage(cfg, ctx):
+    html_rep_fname = cfg['html_report_file']
+
+    try:
+        fuel_url = cfg['clouds']['fuel']['url']
+    except KeyError:
+        fuel_url = None
+
+    try:
+        creds = cfg['clouds']['fuel']['creds']
+    except KeyError:
+        creds = None
+
+    report.make_io_report(ctx.results, html_rep_fname, fuel_url, creds=creds)
+
+    logger.info("Html report were stored in " + html_rep_fname)
+
+    text_rep_fname = cfg_dict['text_report_file']
+    with open(text_rep_fname, "w") as fd:
+        for tp, data in ctx.results:
+            if 'io' == tp and data is not None:
+                fd.write(IOPerfTest.format_for_console(data))
+                fd.write("\n")
+                fd.flush()
+
+    logger.info("Text report were stored in " + text_rep_fname)
+
+
+def complete_log_nodes_statistic(cfg, ctx):
+    nodes = ctx.nodes
+    for node in nodes:
+        logger.debug(str(node))
+
+
+def load_data_from(var_dir):  # stage factory for post-process-only runs
+    def load_data_from_file(cfg, ctx):
+        with open(os.path.join(var_dir, 'raw_results.yaml')) as fd:
+            ctx.results = yaml.load(fd)
+    return load_data_from_file
+
+
+def parse_args(argv):
+    descr = "Disk io performance test suite"
+    parser = argparse.ArgumentParser(prog='wally', description=descr)
+
+    parser.add_argument("-l", dest='extra_logs',
+                        action='store_true', default=False,
+                        help="print some extra log info")
+    parser.add_argument("-b", '--build_description',
+                        type=str, default="Build info")
+    parser.add_argument("-i", '--build_id', type=str, default="id")
+    parser.add_argument("-t", '--build_type', type=str, default="GA")
+    parser.add_argument("-u", '--username', type=str, default="admin")
+    parser.add_argument("-p", '--post-process-only', metavar="VAR_DIR",
+                        help="Only process data from previour run")
+    parser.add_argument("-k", '--keep-vm', action='store_true',
+                        help="Don't remove test vm's", default=False)
+    parser.add_argument("config_file")
+
+    return parser.parse_args(argv[1:])
+
+
+def main(argv):
+    """Run the selected stages, then all registered cleanup stages."""
+    opts = parse_args(argv)
+    if opts.post_process_only is not None:
+        stages = [
+            load_data_from(opts.post_process_only),
+            console_report_stage,
+            report_stage
+        ]
+    else:
+        stages = [
+            discover_stage,
+            log_nodes_statistic,
+            connect_stage,
+            deploy_sensors_stage,
+            run_tests_stage,
+            store_raw_results_stage,
+            console_report_stage,
+            report_stage
+        ]
+
+    load_config(opts.config_file, opts.post_process_only)
+
+    if cfg_dict.get('logging', {}).get("extra_logs", False) or opts.extra_logs:
+        level = logging.DEBUG
+    else:
+        level = logging.WARNING
+
+    setup_loggers(level, cfg_dict['log_file'])
+    logger.info("All info would be stored into {0}".format(
+        cfg_dict['var_dir']))
+
+    ctx = Context()
+    ctx.build_meta['build_id'] = opts.build_id
+    ctx.build_meta['build_descrption'] = opts.build_description
+    ctx.build_meta['build_type'] = opts.build_type
+    ctx.build_meta['username'] = opts.username
+    ctx.keep_vm = opts.keep_vm
+
+    try:
+        for stage in stages:
+            logger.info("Start {0.__name__} stage".format(stage))
+            stage(cfg_dict, ctx)
+    except Exception as exc:
+        msg = "Exception during current stage: {0}".format(exc)
+        logger.error(msg)
+    finally:
+        # sys.exc_info() is (type, value, traceback); these names must not be
+        # rebound by the cleanup loop or the re-raise below would break
+        exc_type, exc_value, tb = sys.exc_info()
+        for stage in ctx.clear_calls_stack[::-1]:
+            try:
+                logger.info("Start {0.__name__} stage".format(stage))
+                stage(cfg_dict, ctx)
+            except Exception:
+                logger.exception("During {0.__name__} stage".format(stage))
+        if exc_type is not None:
+            raise exc_type, exc_value, tb
+
+    logger.info("All info stored into {0}".format(cfg_dict['var_dir']))
+    return 0
diff --git a/wally/sensors/__init__.py b/wally/sensors/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/wally/sensors/__init__.py
diff --git a/wally/sensors/api.py b/wally/sensors/api.py
new file mode 100644
index 0000000..f66bb36
--- /dev/null
+++ b/wally/sensors/api.py
@@ -0,0 +1,58 @@
+import Queue
+import threading
+from contextlib import contextmanager
+
+from .deploy_sensors import (deploy_and_start_sensors,
+                             stop_and_remove_sensors)
+from .protocol import create_protocol, Timeout
+
+
+__all__ = ['Empty', 'recv_main', 'start_monitoring',
+           'deploy_and_start_sensors', 'SensorConfig']
+
+
+Empty = Queue.Empty
+
+
+class SensorConfig(object):
+    def __init__(self, conn, url, sensors):
+        self.conn = conn
+        self.url = url
+        self.sensors = sensors
+
+
+def recv_main(proto, data_q, cmd_q):
+    while True:
+        try:
+            data_q.put(proto.recv(0.1))
+        except Timeout:
+            pass
+
+        try:
+            val = cmd_q.get(False)
+
+            if val is None:
+                return
+
+        except Queue.Empty:
+            pass
+
+
+@contextmanager
+def start_monitoring(uri, configs):
+    deploy_and_start_sensors(uri, configs)
+    try:
+        data_q = Queue.Queue()
+        cmd_q = Queue.Queue()
+        proto = create_protocol(uri, receiver=True)
+        th = threading.Thread(None, recv_main, None, (proto, data_q, cmd_q))
+        th.daemon = True
+        th.start()
+
+        try:
+            yield data_q
+        finally:
+            cmd_q.put(None)
+            th.join()
+    finally:
+        stop_and_remove_sensors(configs)
diff --git a/wally/sensors/cp_protocol.py b/wally/sensors/cp_protocol.py
new file mode 100644
index 0000000..4e96afe
--- /dev/null
+++ b/wally/sensors/cp_protocol.py
@@ -0,0 +1,227 @@
+#!/usr/bin/env python
+""" Protocol class """
+
+import re
+import zlib
+import json
+import logging
+import binascii
+
+
+logger = logging.getLogger("wally")
+
+
+# The protocol has 2 types of packets:
+# 1 - header, which contains the template schema of the counters
+# 2 - body, which contains only the values, in the template's order;
+#       it uses msgpack (or the provided packer) for optimization
+#
+# a packet has the format:
+# begin_data_prefixSIZE\n\rCRC\n\rDATAend_data_postfix
+# a packet part has the format:
+# SIZE\n\rDATA
+#
+# DATA is zlib-compressed
+
+
+class PacketException(Exception):
+    """ Exceptions from Packet"""
+    pass
+
+
+class Packet(object):
+    """ Class proceed packet by protocol"""
+
+    prefix = "begin_data_prefix"
+    postfix = "end_data_postfix"
+    header_prefix = "template"
+    # other fields
+    # is_begin
+    # is_end
+    # crc
+    # data
+    # data_len
+
+    def __init__(self, packer):
+        # preinit
+        self.is_begin = False
+        self.is_end = False
+        self.crc = None
+        self.data = ""
+        self.data_len = None
+        self.srv_template = None
+        self.clt_template = None
+        self.tmpl_size = 0
+        self.packer = packer
+
+    def new_packet(self, part):
+        """ New packet adding """
+        # proceed packet
+        try:
+            # get size
+            local_size_s, _, part = part.partition("\n\r")
+            local_size = int(local_size_s)
+
+            # find prefix
+            begin = part.find(self.prefix)
+            if begin != -1:
+                # divide data if something before begin and prefix
+                from_i = begin + len(self.prefix)
+                part = part[from_i:]
+                # reset flags
+                self.is_begin = True
+                self.is_end = False
+                self.data = ""
+                # get size
+                data_len_s, _, part = part.partition("\n\r")
+                self.data_len = int(data_len_s)
+                # get crc
+                crc_s, _, part = part.partition("\n\r")
+                self.crc = int(crc_s)
+
+            # bad size?
+            if local_size != self.data_len:
+                raise PacketException("Part size error")
+
+            # find postfix
+            end = part.find(self.postfix)
+            if end != -1:
+                # divide postfix
+                part = part[:end]
+                self.is_end = True
+
+            self.data += part
+            # check if it is end
+            if self.is_end:
+                self.data = zlib.decompress(self.data)
+                if self.data_len != len(self.data):
+                    raise PacketException("Total size error")
+                if binascii.crc32(self.data) != self.crc:
+                    raise PacketException("CRC error")
+
+                # check, if it is template
+                if self.data.startswith(self.header_prefix):
+                    self.srv_template = self.data
+                    # template is for internal use
+                    return None
+
+                # decode values list
+                vals = self.packer.unpack(self.data)
+                dump = self.srv_template % tuple(vals)
+                return dump
+            else:
+                return None
+
+        except PacketException as e:
+            # if something wrong - skip packet
+            logger.warning("Packet skipped: %s", e)
+            self.is_begin = False
+            self.is_end = False
+            return None
+
+        except TypeError:
+            # if something wrong - skip packet
+            logger.warning("Packet skipped: doesn't match schema")
+            self.is_begin = False
+            self.is_end = False
+            return None
+
+        except:
+            # if something at all wrong - skip packet
+            logger.warning("Packet skipped: something is wrong")
+            self.is_begin = False
+            self.is_end = False
+            return None
+
+    @staticmethod
+    def create_packet(data, part_size):
+        """ Create packet divided by parts with part_size from data
+            No compression here """
+        # prepare data
+        data_len = "%i\n\r" % len(data)
+        header = "%s%s%s\n\r" % (Packet.prefix, data_len, binascii.crc32(data))
+        compact_data = zlib.compress(data)
+        packet = "%s%s%s" % (header, compact_data, Packet.postfix)
+
+        partheader_len = len(data_len)
+
+        beg = 0
+        end = part_size - partheader_len
+
+        result = []
+        while beg < len(packet):
+            block = packet[beg:beg+end]
+            result.append(data_len + block)
+            beg += end
+
+        return result
+
+    def create_packet_v2(self, data, part_size):
+        """ Create packet divided by parts with part_size from data
+            Compressed """
+        result = []
+        # create and add to result template header
+        if self.srv_template is None:
+            perf_string = json.dumps(data)
+            self.create_answer_template(perf_string)
+            template = self.header_prefix + self.srv_template
+            header = Packet.create_packet(template, part_size)
+            result.extend(header)
+
+        vals = self.get_matching_value_list(data)
+        body = self.packer.pack(vals)
+        parts = Packet.create_packet(body, part_size)
+        result.extend(parts)
+        return result
+
+    def get_matching_value_list(self, data):
+        """ Get values in order server expect"""
+        vals = range(0, self.tmpl_size)
+
+        try:
+            for node, groups in self.clt_template.items():
+                for group, counters in groups.items():
+                    for counter, index in counters.items():
+                        if not isinstance(index, dict):
+                            vals[index] = data[node][group][counter]
+                        else:
+                            for k, i in index.items():
+                                vals[i] = data[node][group][counter][k]
+
+            return vals
+
+        except (IndexError, KeyError):
+            logger = logging.getLogger(__name__)
+            logger.error("Data don't match last schema")
+            raise PacketException("Data don't match last schema")
+
+    def create_answer_template(self, perf_string):
+        """ Create template for server to insert counter values
+            Return tuple of server and clien templates + number of replaces"""
+        replacer = re.compile(": [0-9]+\.?[0-9]*")
+        # replace all values by %s
+        finditer = replacer.finditer(perf_string)
+        # server not need know positions
+        self.srv_template = ""
+        # client need positions
+        clt_template = ""
+        beg = 0
+        k = 0
+        # this could be done better?
+        for match in finditer:
+            # define input place in server template
+            self.srv_template += perf_string[beg:match.start()]
+            self.srv_template += ": %s"
+            # define match number in client template
+            clt_template += perf_string[beg:match.start()]
+            clt_template += ": %i" % k
+
+            beg = match.end()
+            k += 1
+
+        # add tail
+        self.srv_template += perf_string[beg:]
+        clt_template += perf_string[beg:]
+
+        self.tmpl_size = k
+        self.clt_template = json.loads(clt_template)
diff --git a/wally/sensors/cp_transport.py b/wally/sensors/cp_transport.py
new file mode 100644
index 0000000..2e00e80
--- /dev/null
+++ b/wally/sensors/cp_transport.py
@@ -0,0 +1,157 @@
+#!/usr/bin/env python
+""" UDP sender class """
+
+import socket
+import urlparse
+
+from cp_protocol import Packet
+
+try:
+    from disk_perf_test_tool.logger import define_logger
+    logger = define_logger(__name__)
+except ImportError:
+    import logging
+    # stdlib logger: the old stub had only debug(), yet Sender also calls
+    # logger.error() and logger.warning()
+    logger = logging.getLogger(__name__)
+
+
+class SenderException(Exception):
+    """ Exceptions in Sender class """
+    pass
+
+
+class Timeout(Exception):
+    """ Exceptions in Sender class """
+    pass
+
+
+class Sender(object):
+    """ UDP sender class """
+
+    def __init__(self, packer, url=None, port=None, host="0.0.0.0", size=256):
+        """ Create connection object from input udp string or params"""
+
+        # test input
+        if url is None and port is None:
+            raise SenderException("Bad initialization")
+        if url is not None:
+            data = urlparse.urlparse(url)
+            # check schema
+            if data.scheme != "udp":
+                mes = "Bad protocol type: %s instead of UDP" % data.scheme
+                logger.error(mes)
+                raise SenderException("Bad protocol type")
+            # try to get port
+            try:
+                int_port = int(data.port)
+            except ValueError:
+                logger.error("Bad UDP port")
+                raise SenderException("Bad UDP port")
+            # save paths
+            self.sendto = (data.hostname, int_port)
+            self.bindto = (data.hostname, int_port)
+            # try to get size
+            try:
+                self.size = int(data.path.strip("/"))
+            except ValueError:
+                logger.error("Bad packet part size")
+                raise SenderException("Bad packet part size")
+        else:
+            # url is None - use size and port
+            self.sendto = (host, port)
+            self.bindto = ("0.0.0.0", port)
+            self.size = size
+
+        self.packer = packer
+
+        self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
+        self.binded = False
+        self.all_data = {}
+        self.send_packer = None
+
+    def bind(self):
+        """ Prepare for listening """
+        self.sock.bind(self.bindto)
+        self.sock.settimeout(0.5)
+        self.binded = True
+
+    def send(self, data):
+        """ Send data to udp socket"""
+        if self.sock.sendto(data, self.sendto) != len(data):
+            mes = "Cannot send data to %s:%s" % self.sendto
+            logger.error(mes)
+            raise SenderException("Cannot send data")
+
+    def send_by_protocol(self, data):
+        """ Send data by Packet protocol
+            data = dict"""
+        if self.send_packer is None:
+            self.send_packer = Packet(self.packer())
+        parts = self.send_packer.create_packet_v2(data, self.size)
+        for part in parts:
+            self.send(part)
+
+    def recv(self):
+        """ Receive data from udp socket"""
+        # check for binding
+        if not self.binded:
+            self.bind()
+        # try to recv
+        try:
+            data, (remote_ip, _) = self.sock.recvfrom(self.size)
+            return data, remote_ip
+        except socket.timeout:
+            raise Timeout()
+
+    def recv_by_protocol(self):
+        """ Receive data from udp socket by Packet protocol"""
+        data, remote_ip = self.recv()
+
+        if remote_ip not in self.all_data:
+            self.all_data[remote_ip] = Packet(self.packer())
+
+        return self.all_data[remote_ip].new_packet(data)
+
+    def recv_with_answer(self, stop_event=None):
+        """ Receive data from udp socket and send 'ok' back
+            Command port = local port + 1
+            Answer port = local port
+            Waiting for command is blocking """
+        # create command socket
+        command_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
+        command_port = self.bindto[1]+1
+        command_sock.bind(("0.0.0.0", command_port))
+        command_sock.settimeout(1)
+        # try to recv
+        while True:
+            try:
+                data, (remote_ip, _) = command_sock.recvfrom(self.size)
+                self.send("ok")
+                return data, remote_ip
+            except socket.timeout:
+                if stop_event is not None and stop_event.is_set():
+                    # return None if we are interrupted
+                    return None
+
+    def verified_send(self, send_host, message, max_repeat=20):
+        """ Send and verify it by answer not more then max_repeat
+            Send port = local port + 1
+            Answer port = local port
+            Return True if send is verified """
+        # create send socket
+        send_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
+        send_port = self.sendto[1]+1
+        for repeat in range(0, max_repeat):
+            send_sock.sendto(message, (send_host, send_port))
+            try:
+                data, remote_ip = self.recv()
+                if remote_ip == send_host and data == "ok":
+                    return True
+                else:
+                    logger.warning("No answer from %s, try %i",
+                                   send_host, repeat)
+            except Timeout:
+                logger.warning("No answer from %s, try %i", send_host, repeat)
+
+        return False
diff --git a/wally/sensors/daemonize.py b/wally/sensors/daemonize.py
new file mode 100644
index 0000000..a4fa157
--- /dev/null
+++ b/wally/sensors/daemonize.py
@@ -0,0 +1,226 @@
+# #!/usr/bin/python
+
+import fcntl
+import os
+import pwd
+import grp
+import sys
+import signal
+import resource
+import logging
+import atexit
+from logging import handlers
+
+
+class Daemonize(object):
+    """ Daemonize object
+    Object constructor takes the following arguments:
+    - app: contains the application name which will be sent to syslog.
+    - pid: path to the pidfile.
+    - action: your custom function which will be executed after daemonization.
+    - keep_fds: optional list of fds which should not be closed.
+    - auto_close_fds: optional parameter to not close opened fds.
+    - privileged_action: action that will be executed before
+                         drop privileges if user or
+                         group parameter is provided.
+                         If you want to transfer anything from privileged
+                         action to action, such as opened privileged file
+                         descriptor, you should return it from
+                         privileged_action function and catch it inside action
+                         function.
+    - user: drop privileges to this user if provided.
+    - group: drop privileges to this group if provided.
+    - verbose: send debug messages to logger if provided.
+    - logger: use this logger object instead of creating new one, if provided.
+    """
+    def __init__(self, app, pid, action, keep_fds=None, auto_close_fds=True,
+                 privileged_action=None, user=None, group=None, verbose=False,
+                 logger=None):
+        self.app = app
+        self.pid = pid
+        self.action = action
+        self.keep_fds = keep_fds or []
+        self.privileged_action = privileged_action or (lambda: ())
+        self.user = user
+        self.group = group
+        self.logger = logger
+        self.verbose = verbose
+        self.auto_close_fds = auto_close_fds
+
+    def sigterm(self, signum, frame):
+        """ sigterm method
+        These actions will be done after SIGTERM.
+        """
+        self.logger.warn("Caught signal %s. Stopping daemon." % signum)
+        # pidfile is removed by self.exit(), which atexit runs on sys.exit()
+        sys.exit(0)
+
+    def exit(self):
+        """ exit method
+        Cleanup pid file at exit.
+        """
+        self.logger.warn("Stopping daemon.")
+        os.remove(self.pid)
+        sys.exit(0)
+
+    def start(self):
+        """ start method
+        Main daemonization process.
+        """
+        # If pidfile already exists, read the pid from it so it can be written
+        # back if locking fails (opening with "w" truncates the file).
+        old_pid = ""
+        if os.path.isfile(self.pid):
+            with open(self.pid, "r") as old_pidfile:
+                old_pid = old_pidfile.read()
+        # Create a lockfile so that only one instance of this daemon is
+        # running at any time.
+        try:
+            lockfile = open(self.pid, "w")
+        except IOError:
+            print("Unable to create the pidfile.")
+            sys.exit(1)
+        try:
+            # Try to get an exclusive lock on the file. This will fail if
+            # another process has the file
+            # locked.
+            fcntl.flock(lockfile, fcntl.LOCK_EX | fcntl.LOCK_NB)
+        except IOError:
+            print("Unable to lock on the pidfile.")
+            # We need to overwrite the pidfile if we got here.
+            with open(self.pid, "w") as pidfile:
+                pidfile.write(old_pid)
+            sys.exit(1)
+
+        # Fork, creating a new process for the child.
+        process_id = os.fork()
+        if process_id < 0:
+            # Fork error. Exit badly.
+            sys.exit(1)
+        elif process_id != 0:
+            # This is the parent process. Exit.
+            sys.exit(0)
+        # This is the child process. Continue.
+
+        # Stop listening for signals that the parent process receives.
+        # This is done by getting a new process id.
+        # setpgrp() is an alternative to setsid().
+        # setsid puts the process in a new parent group and detaches
+        # its controlling terminal.
+        process_id = os.setsid()
+        if process_id == -1:
+            # Uh oh, there was a problem.
+            sys.exit(1)
+
+        # Add lockfile to self.keep_fds.
+        self.keep_fds.append(lockfile.fileno())
+
+        # Close all file descriptors, except the ones mentioned in
+        # self.keep_fds.
+        devnull = "/dev/null"
+        if hasattr(os, "devnull"):
+            # Python has set os.devnull on this system, use it instead as it
+            # might be different
+            # than /dev/null.
+            devnull = os.devnull
+
+        if self.auto_close_fds:
+            for fd in range(3, resource.getrlimit(resource.RLIMIT_NOFILE)[0]):
+                if fd not in self.keep_fds:
+                    try:
+                        os.close(fd)
+                    except OSError:
+                        pass
+
+        devnull_fd = os.open(devnull, os.O_RDWR)
+        os.dup2(devnull_fd, 0)
+        os.dup2(devnull_fd, 1)
+        os.dup2(devnull_fd, 2)
+
+        if self.logger is None:
+            # Initialize logging.
+            self.logger = logging.getLogger(self.app)
+            self.logger.setLevel(logging.DEBUG)
+            # Display log messages only on defined handlers.
+            self.logger.propagate = False
+
+            # Initialize syslog.
+            # It will correctly work on OS X, Linux and FreeBSD.
+            if sys.platform == "darwin":
+                syslog_address = "/var/run/syslog"
+            else:
+                syslog_address = "/dev/log"
+
+            # We will continue with syslog initialization only if
+            # actually have such capabilities
+            # on the machine we are running this.
+            if os.path.exists(syslog_address):  # a socket, not a regular file
+                syslog = handlers.SysLogHandler(syslog_address)
+                if self.verbose:
+                    syslog.setLevel(logging.DEBUG)
+                else:
+                    syslog.setLevel(logging.INFO)
+                # Try to mimic to normal syslog messages.
+                format_t = "%(asctime)s %(name)s: %(message)s"
+                formatter = logging.Formatter(format_t,
+                                              "%b %e %H:%M:%S")
+                syslog.setFormatter(formatter)
+
+                self.logger.addHandler(syslog)
+
+        # Set umask to default to safe file permissions when running
+        # as a root daemon. 027 is an
+        # octal number which we are typing as 0o27 for Python3 compatibility.
+        os.umask(0o27)
+
+        # Change to a known directory. If this isn't done, starting a daemon
+        # in a subdirectory that
+        # needs to be deleted results in "directory busy" errors.
+        os.chdir("/")
+
+        # Execute privileged action
+        privileged_action_result = self.privileged_action()
+        if not privileged_action_result:
+            privileged_action_result = []
+
+        # Change gid
+        if self.group:
+            try:
+                gid = grp.getgrnam(self.group).gr_gid
+            except KeyError:
+                self.logger.error("Group {0} not found".format(self.group))
+                sys.exit(1)
+            try:
+                os.setgid(gid)
+            except OSError:
+                self.logger.error("Unable to change gid.")
+                sys.exit(1)
+
+        # Change uid
+        if self.user:
+            try:
+                uid = pwd.getpwnam(self.user).pw_uid
+            except KeyError:
+                self.logger.error("User {0} not found.".format(self.user))
+                sys.exit(1)
+            try:
+                os.setuid(uid)
+            except OSError:
+                self.logger.error("Unable to change uid.")
+                sys.exit(1)
+
+        try:
+            lockfile.write("%s" % (os.getpid()))
+            lockfile.flush()
+        except IOError:
+            self.logger.error("Unable to write pid to the pidfile.")
+            print("Unable to write pid to the pidfile.")
+            sys.exit(1)
+
+        # Set custom action on SIGTERM.
+        signal.signal(signal.SIGTERM, self.sigterm)
+        atexit.register(self.exit)
+
+        self.logger.warn("Starting daemon.")
+
+        self.action(*privileged_action_result)
diff --git a/wally/sensors/deploy_sensors.py b/wally/sensors/deploy_sensors.py
new file mode 100644
index 0000000..249adfb
--- /dev/null
+++ b/wally/sensors/deploy_sensors.py
@@ -0,0 +1,87 @@
+import time
+import json
+import os.path
+import logging
+
+from concurrent.futures import ThreadPoolExecutor, wait
+
+from wally.ssh_utils import copy_paths, run_over_ssh
+
+logger = logging.getLogger('wally')
+
+
+def wait_all_ok(futures):
+    return all(future.result() for future in futures)
+
+
+def deploy_and_start_sensors(monitor_uri, sensor_configs,
+                             remote_path='/tmp/sensors/sensors'):
+
+    paths = {os.path.dirname(__file__): remote_path}
+    with ThreadPoolExecutor(max_workers=32) as executor:
+        futures = []
+
+        for node_sensor_config in sensor_configs:
+            futures.append(executor.submit(deploy_and_start_sensor,
+                                           paths,
+                                           node_sensor_config,
+                                           monitor_uri,
+                                           remote_path))
+
+        if not wait_all_ok(futures):
+            raise RuntimeError("Sensor deployment fails on some nodes")
+
+
+def deploy_and_start_sensor(paths, node_sensor_config,
+                            monitor_uri, remote_path):
+    """Deploy and start sensors on one node; return True on success."""
+    try:
+        copy_paths(node_sensor_config.conn, paths)
+        config_remote_path = os.path.join(remote_path, "conf.json")
+
+        sftp = node_sensor_config.conn.open_sftp()
+        try:
+            with sftp.open(config_remote_path, "w") as fd:
+                fd.write(json.dumps(node_sensor_config.sensors))
+        finally:
+            sftp.close()  # release the SFTP channel even if the upload fails
+
+        cmd_templ = 'env PYTHONPATH="{0}" python -m ' + \
+                    "sensors.main -d start -u {1} {2}"
+        cmd = cmd_templ.format(os.path.dirname(remote_path),
+                               monitor_uri,
+                               config_remote_path)
+
+        run_over_ssh(node_sensor_config.conn, cmd,
+                     node=node_sensor_config.url)
+    except Exception:
+        msg = "During deploing sensors in {0}".format(node_sensor_config.url)
+        logger.exception(msg)
+        return False
+    return True
+
+
+def stop_and_remove_sensor(conn, url, remote_path):
+    cmd = 'env PYTHONPATH="{0}" python -m sensors.main -d stop'
+
+    run_over_ssh(conn, cmd.format(remote_path), node=url)
+
+    # some magic
+    time.sleep(0.3)
+
+    conn.exec_command("rm -rf {0}".format(remote_path))
+
+    logger.debug("Sensors stopped and removed")
+
+
+def stop_and_remove_sensors(configs, remote_path='/tmp/sensors'):
+    with ThreadPoolExecutor(max_workers=32) as executor:
+        futures = []
+
+        for node_sensor_config in configs:
+            futures.append(executor.submit(stop_and_remove_sensor,
+                                           node_sensor_config.conn,
+                                           node_sensor_config.url,
+                                           remote_path))
+
+        wait(futures)
diff --git a/wally/sensors/discover.py b/wally/sensors/discover.py
new file mode 100644
index 0000000..f227043
--- /dev/null
+++ b/wally/sensors/discover.py
@@ -0,0 +1,9 @@
+all_sensors = {}
+
+
+def provides(sensor_class_name):
+    def closure(func):
+        assert sensor_class_name not in all_sensors
+        all_sensors[sensor_class_name] = func
+        return func
+    return closure
diff --git a/wally/sensors/main.py b/wally/sensors/main.py
new file mode 100644
index 0000000..3753e7c
--- /dev/null
+++ b/wally/sensors/main.py
@@ -0,0 +1,118 @@
+import os
+import sys
+import time
+import json
+import glob
+import signal
+import os.path
+import argparse
+
+from .sensors.utils import SensorInfo
+from .daemonize import Daemonize
+from .discover import all_sensors
+from .protocol import create_protocol
+
+
+# load all sensors
+from . import sensors
+sensors_dir = os.path.dirname(sensors.__file__)
+for fname in glob.glob(os.path.join(sensors_dir, "*.py")):
+    mod_name = os.path.basename(fname[:-3])
+    __import__("sensors.sensors." + mod_name)
+
+
+def get_values(required_sensors):
+    result = {}
+    for sensor_name, params in required_sensors:
+        if sensor_name in all_sensors:
+            result.update(all_sensors[sensor_name](**params))
+        else:
+            msg = "Sensor {0!r} isn't available".format(sensor_name)
+            raise ValueError(msg)
+    return time.time(), result
+
+
+def parse_args(args):
+    parser = argparse.ArgumentParser()
+    parser.add_argument('-d', '--daemon',
+                        choices=('start', 'stop', 'status'),
+                        default=None)
+
+    parser.add_argument('-u', '--url', default='stdout://')
+    parser.add_argument('-t', '--timeout', type=float, default=1)
+    parser.add_argument('-l', '--list-sensors', action='store_true')
+    parser.add_argument('sensors_config', type=argparse.FileType('r'),
+                        default=None, nargs='?')
+    return parser.parse_args(args[1:])
+
+
+def daemon_main(required_sensors, opts):
+    sender = create_protocol(opts.url)
+    prev = {}
+
+    while True:
+        gtime, data = get_values(required_sensors.items())
+        curr = {'time': SensorInfo(gtime, True)}
+        for name, val in data.items():
+            if val.is_accumulated:
+                if name in prev:
+                    curr[name] = SensorInfo(val.value - prev[name], True)
+                prev[name] = val.value
+            else:
+                curr[name] = SensorInfo(val.value, False)
+        sender.send(curr)
+        time.sleep(opts.timeout)
+
+
+def pid_running(pid):
+    return os.path.exists("/proc/" + str(pid))
+
+
+def main(argv):
+    opts = parse_args(argv)
+
+    if opts.list_sensors:
+        print "\n".join(sorted(all_sensors))
+        return 0
+
+    if opts.daemon is not None:
+        pid_file = "/tmp/sensors.pid"
+        if opts.daemon == 'start':
+            required_sensors = json.loads(opts.sensors_config.read())
+
+            def root_func():
+                daemon_main(required_sensors, opts)
+
+            daemon = Daemonize(app="perfcollect_app",
+                               pid=pid_file,
+                               action=root_func)
+            daemon.start()
+        elif opts.daemon == 'stop':
+            if os.path.isfile(pid_file):
+                pid = int(open(pid_file).read())
+                if pid_running(pid):
+                    os.kill(pid, signal.SIGTERM)
+
+                time.sleep(0.1)
+
+                if pid_running(pid):
+                    os.kill(pid, signal.SIGKILL)
+
+                if os.path.isfile(pid_file):
+                    os.unlink(pid_file)
+        elif opts.daemon == 'status':
+            if os.path.isfile(pid_file):
+                pid = int(open(pid_file).read())
+                if pid_running(pid):
+                    print "running"
+                    return
+            print "stopped"
+        else:
+            raise ValueError("Unknown daemon operation {}".format(opts.daemon))
+    else:
+        required_sensors = json.loads(opts.sensors_config.read())
+        daemon_main(required_sensors, opts)
+    return 0
+
+if __name__ == "__main__":
+    exit(main(sys.argv))
diff --git a/wally/sensors/protocol.py b/wally/sensors/protocol.py
new file mode 100644
index 0000000..c2ace01
--- /dev/null
+++ b/wally/sensors/protocol.py
@@ -0,0 +1,192 @@
+import sys
+import time
+import socket
+import select
+import cPickle as pickle
+from urlparse import urlparse
+
+from . import cp_transport
+
+
+class Timeout(Exception):
+    pass
+
+
+# ------------------------------------- Serializers --------------------------
+
+
+class ISensortResultsSerializer(object):
+    def pack(self, data):
+        pass
+
+    def unpack(self, data):
+        pass
+
+
+class PickleSerializer(ISensortResultsSerializer):
+    def pack(self, data):
+        ndata = {key: val.value for key, val in data.items()}
+        return pickle.dumps(ndata)
+
+    def unpack(self, data):
+        return pickle.loads(data)
+
+try:
+    # try to use full-function lib
+    import msgpack
+
+    class mgspackSerializer(ISensortResultsSerializer):
+        def pack(self, data):
+            return msgpack.packb(data)
+
+        def unpack(self, data):
+            return msgpack.unpackb(data)
+
+    MSGPackSerializer = mgspackSerializer
+except ImportError:
+    # use local lib, if failed import
+    import umsgpack
+
+    class umsgspackSerializer(ISensortResultsSerializer):
+        def pack(self, data):
+            return umsgpack.packb(data)
+
+        def unpack(self, data):
+            return umsgpack.unpackb(data)
+
+    MSGPackSerializer = umsgspackSerializer
+
+# ------------------------------------- Transports ---------------------------
+
+
+class ITransport(object):
+    def __init__(self, receiver):
+        pass
+
+    def send(self, data):
+        pass
+
+    def recv(self, timeout=None):
+        pass
+
+
+class StdoutTransport(ITransport):
+    MIN_COL_WIDTH = 10  # minimal width of one output column
+
+    def __init__(self, receiver, delta=True):
+        if receiver:
+            cname = self.__class__.__name__
+            raise ValueError("{0} don't allows receiving".format(cname))
+
+        self.headers = None
+        self.line_format = ""
+        self.prev = {}
+        self.delta = delta
+        self.fd = sys.stdout
+
+    def send(self, data):
+        if self.headers is None:
+            self.headers = sorted(data)
+
+            for pos, header in enumerate(self.headers):
+                self.line_format += "{%s:>%s}" % (pos,
+                                                  max(len(header) + 1,
+                                                      self.MIN_COL_WIDTH))
+            # write the header to self.fd as well (FileTransport overrides it)
+            self.fd.write(self.line_format.format(*self.headers) + "\n")
+
+        if self.delta:
+            vals = [data[header].value - self.prev.get(header, 0)
+                    for header in self.headers]
+
+            self.prev.update({header: data[header].value
+                              for header in self.headers})
+        else:
+            vals = [data[header].value for header in self.headers]
+
+        self.fd.write(self.line_format.format(*vals) + "\n")
+
+    def recv(self, timeout=None):
+        cname = self.__class__.__name__
+        raise ValueError("{0} don't allows receiving".format(cname))
+
+
+class FileTransport(StdoutTransport):
+    def __init__(self, receiver, fname, delta=True):
+        StdoutTransport.__init__(self, receiver, delta)
+        self.fd = open(fname, "w")
+
+
+class UDPTransport(ITransport):
+    def __init__(self, receiver, ip, port, packer_cls):
+        self.port = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
+        if receiver:
+            self.port.bind((ip, port))
+            self.packer_cls = packer_cls
+            self.packers = {}
+        else:
+            self.packer = packer_cls()
+            self.dst = (ip, port)
+
+    def send(self, data):
+        raw_data = self.packer.pack(data)
+        self.port.sendto(raw_data, self.dst)
+
+    def recv(self, timeout=None):
+        r, _, _ = select.select([self.port], [], [], timeout)
+        if len(r) != 0:
+            raw_data, addr = self.port.recvfrom(10000)
+            packer = self.packers.setdefault(addr, self.packer_cls())
+            return addr, packer.unpack(raw_data)
+        else:
+            raise Timeout()
+
+
+class HugeUDPTransport(ITransport, cp_transport.Sender):
+    def __init__(self, receiver, ip, port, packer_cls):
+        cp_transport.Sender.__init__(self, port=port, host=ip)
+        if receiver:
+            self.bind()
+
+    def send(self, data):
+        self.send_by_protocol(data)
+
+    def recv(self, timeout=None):
+        begin = time.time()
+
+        while True:
+
+            try:
+                # return not None, if packet is ready
+                ready = self.recv_by_protocol()
+                # if data ready - return it
+                if ready is not None:
+                    return ready
+                # if data not ready - check if it's time to die
+                if time.time() - begin >= timeout:
+                    break
+
+            except cp_transport.Timeout:
+                # no answer yet - check, if timeout end
+                if time.time() - begin >= timeout:
+                    break
+# -------------------------- Factory function --------------------------------
+
+
+def create_protocol(uri, receiver=False):
+    parsed_uri = urlparse(uri)
+    if parsed_uri.scheme == 'stdout':
+        return StdoutTransport(receiver)
+    elif parsed_uri.scheme == 'udp':
+        ip, port = parsed_uri.netloc.split(":")
+        return UDPTransport(receiver, ip=ip, port=int(port),
+                            packer_cls=PickleSerializer)
+    elif parsed_uri.scheme == 'file':
+        return FileTransport(receiver, parsed_uri.path)
+    elif parsed_uri.scheme == 'hugeudp':
+        ip, port = parsed_uri.netloc.split(":")
+        return HugeUDPTransport(receiver, ip=ip, port=int(port),
+                                packer_cls=MSGPackSerializer)
+    else:
+        templ = "Can't instantiate transport from {0!r}"
+        raise ValueError(templ.format(uri))
diff --git a/wally/sensors/receiver.py b/wally/sensors/receiver.py
new file mode 100644
index 0000000..ff0f223
--- /dev/null
+++ b/wally/sensors/receiver.py
@@ -0,0 +1,19 @@
+from .api import start_monitoring, Empty
+# from influx_exporter import connect, add_data
+
+uri = "udp://192.168.0.104:12001"
+# infldb_url = "influxdb://perf:perf@192.168.152.42:8086/perf"
+# conn = connect(infldb_url)
+
+monitor_config = {'127.0.0.1':
+                  {"block-io": {'allowed_prefixes': ['sda1', 'rbd1']},
+                   "net-io": {"allowed_prefixes": ["virbr2"]}}}
+
+with start_monitoring(uri, monitor_config) as queue:
+    while True:
+        try:
+            (ip, port), data = queue.get(True, 1)
+            print (ip, port), data
+            # add_data(conn, ip, [data])
+        except Empty:
+            pass
diff --git a/wally/sensors/sensors/__init__.py b/wally/sensors/sensors/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/wally/sensors/sensors/__init__.py
diff --git a/wally/sensors/sensors/io_sensors.py b/wally/sensors/sensors/io_sensors.py
new file mode 100644
index 0000000..c9ff340
--- /dev/null
+++ b/wally/sensors/sensors/io_sensors.py
@@ -0,0 +1,72 @@
+from ..discover import provides
+from .utils import SensorInfo, is_dev_accepted
+
+#  1 - major number
+#  2 - minor number
+#  3 - device name
+#  4 - reads completed successfully
+#  5 - reads merged
+#  6 - sectors read
+#  7 - time spent reading (ms)
+#  8 - writes completed
+#  9 - writes merged
+# 10 - sectors written
+# 11 - time spent writing (ms)
+# 12 - I/Os currently in progress
+# 13 - time spent doing I/Os (ms)
+# 14 - weighted time spent doing I/Os (ms)
+
+io_values_pos = [
+    (3, 'reads_completed', True),
+    (5, 'sectors_read', True),
+    (6, 'rtime', True),
+    (7, 'writes_completed', True),
+    (9, 'sectors_written', True),
+    (10, 'wtime', True),
+    (11, 'io_queue', False),
+    (13, 'io_time', True)
+]
+
+
+@provides("block-io")
+def io_stat(disallowed_prefixes=('ram', 'loop'), allowed_prefixes=None):
+    results = {}
+    for line in open('/proc/diskstats'):
+        vals = line.split()
+        dev_name = vals[2]
+
+        dev_ok = is_dev_accepted(dev_name,
+                                 disallowed_prefixes,
+                                 allowed_prefixes)
+
+        if dev_ok:
+            for pos, name, accum_val in io_values_pos:
+                sensor_name = "{0}.{1}".format(dev_name, name)
+                results[sensor_name] = SensorInfo(int(vals[pos]), accum_val)
+    return results
+
+
+def get_latency(stat1, stat2):
+    disks = set(i.split('.', 1)[0] for i in stat1)
+    results = {}
+
+    for disk in disks:
+        rdc = disk + '.reads_completed'
+        wrc = disk + '.writes_completed'
+        rdt = disk + '.rtime'
+        wrt = disk + '.wtime'
+        lat = 0.0
+
+        io_ops1 = stat1[rdc].value + stat1[wrc].value
+        io_ops2 = stat2[rdc].value + stat2[wrc].value
+
+        diops = io_ops2 - io_ops1
+
+        if diops != 0:
+            io1 = stat1[rdt].value + stat1[wrt].value
+            io2 = stat2[rdt].value + stat2[wrt].value
+            lat = abs(float(io1 - io2)) / diops
+
+        results[disk + '.latence'] = SensorInfo(lat, False)
+
+    return results
diff --git a/wally/sensors/sensors/net_sensors.py b/wally/sensors/sensors/net_sensors.py
new file mode 100644
index 0000000..4a4e477
--- /dev/null
+++ b/wally/sensors/sensors/net_sensors.py
@@ -0,0 +1,43 @@
+from ..discover import provides
+from .utils import SensorInfo, is_dev_accepted
+
+# Columns of a /proc/net/dev line after the "iface:" prefix (0-based):
+#  0 - bytes received
+#  1 - packets received
+#  2 - receive errors
+#  3 - receive drops
+#  4 - receive fifo errors
+#  5 - receive frame errors
+#  6 - compressed packets received
+#  7 - multicast frames received
+#  8 - bytes transmitted
+#  9 - packets transmitted
+# 10 - transmit errors
+# 11 - transmit drops
+# 12-15 - transmit fifo errors, collisions, carrier errors, compressed
+
+net_values_pos = [
+    (0, 'recv_bytes', True),
+    (1, 'recv_packets', True),
+    (8, 'send_bytes', True),
+    (9, 'send_packets', True),
+]
+
+
+@provides("net-io")
+def net_stat(disallowed_prefixes=('docker',), allowed_prefixes=None):
+    results = {}
+
+    for line in open('/proc/net/dev').readlines()[2:]:
+        dev_name, stats = line.split(":", 1)
+        dev_name = dev_name.strip()
+        vals = stats.split()
+
+        dev_ok = is_dev_accepted(dev_name,
+                                 disallowed_prefixes,
+                                 allowed_prefixes)
+        if dev_ok:
+            for pos, name, accum_val in net_values_pos:
+                sensor_name = "{0}.{1}".format(dev_name, name)
+                results[sensor_name] = SensorInfo(int(vals[pos]), accum_val)
+    return results
diff --git a/wally/sensors/sensors/pscpu_sensors.py b/wally/sensors/sensors/pscpu_sensors.py
new file mode 100644
index 0000000..cffdb71
--- /dev/null
+++ b/wally/sensors/sensors/pscpu_sensors.py
@@ -0,0 +1,37 @@
+import os
+
+from ..discover import provides
+from .utils import SensorInfo, get_pid_name, get_pid_list
+
+
+@provides("perprocess-cpu")
+def pscpu_stat(disallowed_prefixes=None, allowed_prefixes=None):
+    results = {}
+    pid_list = get_pid_list(disallowed_prefixes, allowed_prefixes)
+
+    for pid in pid_list:
+        try:
+            dev_name = get_pid_name(pid)
+
+            pid_stat1 = pid_stat(pid)
+
+            sensor_name = "{0}.{1}".format(dev_name, pid)
+            results[sensor_name] = SensorInfo(pid_stat1, True)
+        except IOError:
+            # may be, proc has already terminated, skip it
+            continue
+    return results
+
+
+def pid_stat(pid):
+    """ Return total cpu usage time from process"""
+    # read /proc/pid/stat
+    with open(os.path.join('/proc/', pid, 'stat'), 'r') as pidfile:
+        proctimes = pidfile.readline().split()
+    # get utime from /proc/<pid>/stat, 14 item
+    utime = proctimes[13]
+    # get stime from proc/<pid>/stat, 15 item
+    stime = proctimes[14]
+    # count total process used time
+    proctotal = int(utime) + int(stime)
+    return float(proctotal)
diff --git a/wally/sensors/sensors/psram_sensors.py b/wally/sensors/sensors/psram_sensors.py
new file mode 100644
index 0000000..cbd85e6
--- /dev/null
+++ b/wally/sensors/sensors/psram_sensors.py
@@ -0,0 +1,76 @@
+from ..discover import provides
+from .utils import SensorInfo, get_pid_name, get_pid_list
+
+
+# Based on ps_mem.py:
+# Licence: LGPLv2
+# Author:  P@draigBrady.com
+# Source:  http://www.pixelbeat.org/scripts/ps_mem.py
+#   http://github.com/pixelb/scripts/commits/master/scripts/ps_mem.py
+
+
+# Note shared is always a subset of rss (trs is not always)
+def get_mem_stats(pid):
+    """ Return memory data of pid in format (private, shared) """
+
+    fname = '/proc/{0}/{1}'.format(pid, "smaps")
+    lines = open(fname).readlines()
+
+    shared = 0
+    private = 0
+    pss = 0
+
+    # add 0.5KiB to every Pss value: the average error due to truncation
+    pss_adjust = 0.5
+
+    for line in lines:
+        if line.startswith("Shared"):
+            shared += int(line.split()[1])
+
+        if line.startswith("Private"):
+            private += int(line.split()[1])
+
+        if line.startswith("Pss"):
+            pss += float(line.split()[1]) + pss_adjust
+
+    # Note Shared + Private = Rss above
+    # The Rss in smaps includes video card mem etc.
+
+    if pss != 0:
+        shared = int(pss - private)  # Pss-based value replaces the Shared sum
+
+    return (private, shared)
+
+
+@provides("perprocess-ram")
+def psram_stat(disallowed_prefixes=None, allowed_prefixes=None):
+    """ Return private/shared/used memory and RAM usage percent per process """
+    results = {}
+    pid_list = get_pid_list(disallowed_prefixes, allowed_prefixes)
+    for pid in pid_list:
+        try:
+            dev_name = get_pid_name(pid)
+
+            private, shared = get_mem_stats(pid)
+            total = private + shared
+            sys_total = get_ram_size()
+            usage = float(total) / float(sys_total)
+
+            sensor_name = "{0}.{1}".format(dev_name, pid)
+
+            results[sensor_name + ".private_mem"] = SensorInfo(private, False)
+            results[sensor_name + ".shared_mem"] = SensorInfo(shared, False)
+            results[sensor_name + ".used_mem"] = SensorInfo(total, False)
+            name = sensor_name + ".mem_usage_percent"
+            results[name] = SensorInfo(usage * 100, False)
+        except IOError:
+            # permission denied or proc die
+            continue
+    return results
+
+
+def get_ram_size():
+    """ Return RAM size in Kb"""
+    with open("/proc/meminfo") as proc:
+        mem_total = proc.readline().split()
+    return mem_total[1]
diff --git a/wally/sensors/sensors/syscpu_sensors.py b/wally/sensors/sensors/syscpu_sensors.py
new file mode 100644
index 0000000..d3da02b
--- /dev/null
+++ b/wally/sensors/sensors/syscpu_sensors.py
@@ -0,0 +1,40 @@
+from ..discover import provides
+from .utils import SensorInfo, is_dev_accepted
+
+# 0 - cpu name
+# 1 - user: normal processes executing in user mode
+# 2 - nice: niced processes executing in user mode
+# 3 - system: processes executing in kernel mode
+# 4 - idle: twiddling thumbs
+# 5 - iowait: waiting for I/O to complete
+# 6 - irq: servicing interrupts
+# 7 - softirq: servicing softirqs
+
+io_values_pos = [
+    (1, 'user_processes', True),
+    (2, 'nice_processes', True),
+    (3, 'system_processes', True),
+    (4, 'idle_time', True),
+]
+
+
+@provides("system-cpu")
+def syscpu_stat(disallowed_prefixes=('intr', 'ctxt', 'btime', 'processes',
+                                 'procs_running', 'procs_blocked', 'softirq'),
+            allowed_prefixes=None):
+    results = {}
+
+    for line in open('/proc/stat'):
+        vals = line.split()
+        dev_name = vals[0]
+
+        dev_ok = is_dev_accepted(dev_name,
+                                 disallowed_prefixes,
+                                 allowed_prefixes)
+
+        if dev_ok:
+            for pos, name, accum_val in io_values_pos:
+                sensor_name = "{0}.{1}".format(dev_name, name)
+                results[sensor_name] = SensorInfo(int(vals[pos]), accum_val)
+    return results
+
diff --git a/wally/sensors/sensors/sysram_sensors.py b/wally/sensors/sensors/sysram_sensors.py
new file mode 100644
index 0000000..c78eddd
--- /dev/null
+++ b/wally/sensors/sensors/sysram_sensors.py
@@ -0,0 +1,34 @@
+from ..discover import provides
+from .utils import SensorInfo, is_dev_accepted
+
+
+# return these values, or the ones set in allowed_prefixes
+ram_fields = [
+    'MemTotal',
+    'MemFree',
+    'Buffers',
+    'Cached',
+    'SwapCached',
+    'Dirty',
+    'Writeback',
+    'SwapTotal',
+    'SwapFree'
+]
+
+
+@provides("system-ram")
+def sysram_stat(disallowed_prefixes=None, allowed_prefixes=None):
+    """Return {first token: SensorInfo(value, False)} for /proc/meminfo rows."""
+    if allowed_prefixes is None:
+        allowed_prefixes = ram_fields  # module-level default whitelist
+    results = {}
+    with open('/proc/meminfo') as meminfo_fd:  # closed even if parsing fails
+        for line in meminfo_fd:
+            vals = line.split()
+            dev_name = vals[0]  # kept verbatim as the result key
+            dev_ok = is_dev_accepted(dev_name,
+                                     disallowed_prefixes,
+                                     allowed_prefixes)
+            if dev_ok:
+                results[dev_name] = SensorInfo(int(vals[1]), False)
+    return results
diff --git a/wally/sensors/sensors/utils.py b/wally/sensors/sensors/utils.py
new file mode 100644
index 0000000..ad08676
--- /dev/null
+++ b/wally/sensors/sensors/utils.py
@@ -0,0 +1,83 @@
+import os
+
+from collections import namedtuple
+
+SensorInfo = namedtuple("SensorInfo", ['value', 'is_accumulated'])
+
+
+def is_dev_accepted(name, disallowed_prefixes, allowed_prefixes):
+    dev_ok = True
+
+    if disallowed_prefixes is not None:
+        dev_ok = all(not name.startswith(prefix)
+                     for prefix in disallowed_prefixes)
+
+    if dev_ok and allowed_prefixes is not None:
+        dev_ok = any(name.startswith(prefix)
+                     for prefix in allowed_prefixes)
+
+    return dev_ok
+
+
+def get_pid_list(disallowed_prefixes, allowed_prefixes):
+    """Return pids (numeric /proc entries, as strings) passing the filters.
+
+    Pids listed in disallowed_prefixes are always dropped.  With
+    allowed_prefixes set, a pid is kept only if the pid itself is listed or
+    its get_pid_name() result starts with one of the entries.
+    """
+    but = disallowed_prefixes if disallowed_prefixes is not None else []
+    pids = [pid for pid in os.listdir('/proc')
+            if pid.isdigit() and pid not in but]
+    if allowed_prefixes is None:
+        # nothing whitelisted - every pid that is not excluded is returned
+        return pids
+    result = []
+    for pid in pids:
+        name = get_pid_name(pid)
+        if pid in allowed_prefixes or \
+           any(name.startswith(val) for val in allowed_prefixes):
+            result.append(pid)  # leftover debug 'print name' dropped here
+    return result
+
+
+def get_pid_name(pid):
+    """Return the base name of argv[0] read from /proc/<pid>/cmdline."""
+    try:
+        with open(os.path.join('/proc/', pid, 'cmdline'), 'r') as pidfile:
+            try:
+                # argv entries are NUL-separated: cut at the first '\x00'
+                cmd = pidfile.readline().split('\x00', 1)[0].split()[0]
+                return os.path.basename(cmd)
+            except IndexError:
+                return "<NO NAME>"  # empty cmdline
+    except IOError:
+        # callers expect some string even when /proc could not be read
+        return "no_such_process"
+
+
+def delta(func, only_upd=True):
+    prev = {}
+    while True:
+        for dev_name, vals in func():
+            if dev_name not in prev:
+                prev[dev_name] = {}
+                for name, (val, _) in vals.items():
+                    prev[dev_name][name] = val
+            else:
+                dev_prev = prev[dev_name]
+                res = {}
+                for stat_name, (val, accum_val) in vals.items():
+                    if accum_val:
+                        if stat_name in dev_prev:
+                            delta = int(val) - int(dev_prev[stat_name])
+                            if not only_upd or 0 != delta:
+                                res[stat_name] = str(delta)
+                        dev_prev[stat_name] = val
+                    elif not only_upd or '0' != val:
+                        res[stat_name] = val
+
+                if only_upd and len(res) == 0:
+                    continue
+                yield dev_name, res
+        yield None, None
diff --git a/wally/sensors/storage/__init__.py b/wally/sensors/storage/__init__.py
new file mode 100644
index 0000000..d7bf6aa
--- /dev/null
+++ b/wally/sensors/storage/__init__.py
@@ -0,0 +1,104 @@
+import struct
+
+
+def pack(val, tp=True):
+    """Encode an int, {int: int} dict or str; tp=True prepends a type tag."""
+    if isinstance(val, int):
+        assert 0 <= val < 2 ** 16
+        if tp:
+            res = 'i'
+        else:
+            res = ""
+        # "!H" = big-endian unsigned 16 bit; struct has no "U" format code
+        res += struct.pack("!H", val)
+    elif isinstance(val, dict):
+        assert len(val) < 2 ** 16
+        if tp:
+            res = "d"
+        else:
+            res = ""
+
+        res += struct.pack("!H", len(val))
+        for k, v in val.items():  # the argument's items, not the dict type's
+            assert 0 <= k < 2 ** 16
+            assert 0 <= v < 2 ** 32
+            res += struct.pack("!HI", k, v)
+    elif isinstance(val, str):
+        assert len(val) < 256
+        if tp:
+            res = "s"
+        else:
+            res = ""
+        res += chr(len(val)) + val  # one length byte, then the raw string
+    else:
+        raise ValueError()
+
+    return res
+
+
+def unpack(fd, tp=None):
+    """Decode one pack()ed value from fd; tp=None reads the type tag first."""
+    if tp is None:
+        tp = fd.read(1)
+    if tp == 'i':
+        # struct.unpack always returns a tuple - take the single field
+        return struct.unpack("!H", fd.read(2))[0]
+    elif tp == 'd':
+        res = {}
+        val_len = struct.unpack("!H", fd.read(2))[0]
+        for _ in range(val_len):
+            k, v = struct.unpack("!HI", fd.read(6))
+            res[k] = v
+        return res
+    elif tp == 's':
+        val_len = ord(fd.read(1))  # pack() writes the length as one chr() byte
+        return fd.read(val_len)
+    raise ValueError()
+
+
+class LocalStorage(object):
+    NEW_DATA = 0
+    NEW_SENSOR = 1
+    NEW_SOURCE = 2
+
+    def __init__(self, fd):
+        self.fd = fd
+        self.sensor_ids = {}
+        self.sources_ids = {}
+        self.max_source_id = 0
+        self.max_sensor_id = 0
+
+    def add_data(self, source, sensor_values):
+        source_id = self.sources_ids.get(source)
+        if source_id is None:
+            source_id = self.max_source_id
+            self.sources_ids[source] = source_id
+            self.emit(self.NEW_SOURCE, source_id, source)
+            self.max_source_id += 1
+
+        new_sensor_values = {}
+
+        for name, val in sensor_values.items():
+            sensor_id = self.sensor_ids.get(name)
+            if sensor_id is None:
+                sensor_id = self.max_sensor_id
+                self.sensor_ids[name] = sensor_id
+                self.emit(self.NEW_SENSOR, sensor_id, name)
+                self.max_sensor_id += 1
+            new_sensor_values[sensor_id] = val
+
+        self.emit(self.NEW_DATA, source_id, new_sensor_values)
+
+    def emit(self, tp, v1, v2):
+        self.fd.write(chr(tp) + pack(v1, False) + pack(v2))
+
+    def readall(self):
+        tp = self.fd.read(1)
+        if ord(tp) == self.NEW_DATA:
+            pass
+        elif ord(tp) == self.NEW_SENSOR:
+            pass
+        elif ord(tp) == self.NEW_SOURCE:
+            pass
+        else:
+            raise ValueError()
diff --git a/wally/sensors/storage/grafana.py b/wally/sensors/storage/grafana.py
new file mode 100644
index 0000000..9823fac
--- /dev/null
+++ b/wally/sensors/storage/grafana.py
@@ -0,0 +1,47 @@
+import json
+
+
+query = """
+select value from "{series}"
+where $timeFilter and
+host='{host}' and device='{device}'
+order asc
+"""
+
+
+def make_dashboard_file(config):
+    """Return grafana_template.js with panels built from {host: [devices]}."""
+    series = ['writes_completed', 'sectors_written']
+    dashboards = []
+
+    for serie in series:
+        dashboard = dict(title=serie, type='graph',
+                         span=12, fill=1, linewidth=2,
+                         tooltip={'shared': True})
+
+        targets = []
+        for ip, devs in config.items():
+            for device in devs:
+                params = {
+                    'series': serie,
+                    'host': ip,
+                    'device': device
+                }
+                target = dict(
+                    target="disk io",
+                    query=query.replace("\n", " ").format(**params).strip(),
+                    interval="",
+                    alias="{0} io {1}".format(ip, device),
+                    rawQuery=True
+                )
+                targets.append(target)
+
+        dashboard['targets'] = targets
+        dashboards.append(dashboard)
+
+    # 'with' closes the template file instead of leaking the handle
+    with open("grafana_template.js") as template_fd:
+        return template_fd.read() % (json.dumps(dashboards),)
+
+
+print make_dashboard_file({'192.168.0.104': ['sda1', 'rbd1']})
diff --git a/wally/sensors/storage/grafana_template.js b/wally/sensors/storage/grafana_template.js
new file mode 100644
index 0000000..7c57924
--- /dev/null
+++ b/wally/sensors/storage/grafana_template.js
@@ -0,0 +1,46 @@
+/* global _ */
+
+/*
+ * Complex scripted dashboard
+ * This script generates a dashboard object that Grafana can load. It also takes a number of user
+ * supplied URL parameters (in the ARGS variable)
+ *
+ * Return a dashboard object, or a function
+ *
+ * For async scripts, return a function, this function must take a single callback function as argument,
+ * call this callback function with the dashboard object (look at scripted_async.js for an example)
+ */
+
+
+
+// accessible variables in this scope
+var window, document, ARGS, $, jQuery, moment, kbn;
+
+// Setup some variables
+var dashboard;
+
+// All url parameters are available via the ARGS object
+var ARGS;
+
+// Initialize a skeleton with nothing but a rows array and service object
+dashboard = {rows : []};
+
+// Set a title
+dashboard.title = 'Tests dash';
+
+// Set default time
+// time can be overridden in the url using from/to parameters, but this is
+// handled automatically in grafana core during dashboard initialization
+dashboard.time = {
+    from: "now-5m",
+    to: "now"
+};
+
+dashboard.rows.push({
+    title: 'Chart',
+    height: '300px',
+    panels: %s
+});
+
+
+return dashboard;
diff --git a/wally/sensors/storage/influx_exporter.py b/wally/sensors/storage/influx_exporter.py
new file mode 100644
index 0000000..34b3c0a
--- /dev/null
+++ b/wally/sensors/storage/influx_exporter.py
@@ -0,0 +1,31 @@
+from urlparse import urlparse
+from influxdb import InfluxDBClient
+
+
+def connect(url):
+    parsed_url = urlparse(url)
+    user_passwd, host_port = parsed_url.netloc.rsplit("@", 1)
+    user, passwd = user_passwd.split(":", 1)
+    host, port = host_port.split(":")
+    return InfluxDBClient(host, int(port), user, passwd, parsed_url.path[1:])
+
+
+def add_data(conn, hostname, data):
+    per_sensor_data = {}
+    for serie in data:
+        serie = serie.copy()
+        gtime = serie.pop('time')
+        for key, val in serie.items():
+            dev, sensor = key.split('.')
+            data = per_sensor_data.setdefault(sensor, [])
+            data.append([gtime, hostname, dev, val])
+
+    infl_data = []
+    columns = ['time', 'host', 'device', 'value']
+    for sensor_name, points in per_sensor_data.items():
+        infl_data.append(
+            {'columns': columns,
+             'name': sensor_name,
+             'points': points})
+
+    conn.write_points(infl_data)
diff --git a/wally/sensors/storage/koder.js b/wally/sensors/storage/koder.js
new file mode 100644
index 0000000..a65a454
--- /dev/null
+++ b/wally/sensors/storage/koder.js
@@ -0,0 +1,47 @@
+/* global _ */
+
+/*
+ * Complex scripted dashboard
+ * This script generates a dashboard object that Grafana can load. It also takes a number of user
+ * supplied URL parameters (in the ARGS variable)
+ *
+ * Return a dashboard object, or a function
+ *
+ * For async scripts, return a function, this function must take a single callback function as argument,
+ * call this callback function with the dashboard object (look at scripted_async.js for an example)
+ */
+
+
+
+// accessible variables in this scope
+var window, document, ARGS, $, jQuery, moment, kbn;
+
+// Setup some variables
+var dashboard;
+
+// All url parameters are available via the ARGS object
+var ARGS;
+
+// Initialize a skeleton with nothing but a rows array and service object
+dashboard = {rows : []};
+
+// Set a title
+dashboard.title = 'Tests dash';
+
+// Set default time
+// time can be overridden in the url using from/to parameters, but this is
+// handled automatically in grafana core during dashboard initialization
+dashboard.time = {
+    from: "now-5m",
+    to: "now"
+};
+
+dashboard.rows.push({
+    title: 'Chart',
+    height: '300px',
+    panels: [{"span": 12, "title": "writes_completed", "linewidth": 2, "type": "graph", "targets": [{"alias": "192.168.0.104 io sda1", "interval": "", "target": "disk io", "rawQuery": true, "query": "select value from \"writes_completed\" where $timeFilter and host='192.168.0.104' and device='sda1' order asc"}, {"alias": "192.168.0.104 io rbd1", "interval": "", "target": "disk io", "rawQuery": true, "query": "select value from \"writes_completed\" where $timeFilter and host='192.168.0.104' and device='rbd1' order asc"}], "tooltip": {"shared": true}, "fill": 1}, {"span": 12, "title": "sectors_written", "linewidth": 2, "type": "graph", "targets": [{"alias": "192.168.0.104 io sda1", "interval": "", "target": "disk io", "rawQuery": true, "query": "select value from \"sectors_written\" where $timeFilter and host='192.168.0.104' and device='sda1' order asc"}, {"alias": "192.168.0.104 io rbd1", "interval": "", "target": "disk io", "rawQuery": true, "query": "select value from \"sectors_written\" where $timeFilter and host='192.168.0.104' and device='rbd1' order asc"}], "tooltip": {"shared": true}, "fill": 1}]
+});
+
+
+return dashboard;
+
diff --git a/wally/sensors/umsgpack.py b/wally/sensors/umsgpack.py
new file mode 100644
index 0000000..0cdc83e
--- /dev/null
+++ b/wally/sensors/umsgpack.py
@@ -0,0 +1,879 @@
+# u-msgpack-python v2.0 - vsergeev at gmail
+# https://github.com/vsergeev/u-msgpack-python
+#
+# u-msgpack-python is a lightweight MessagePack serializer and deserializer
+# module, compatible with both Python 2 and 3, as well CPython and PyPy
+# implementations of Python. u-msgpack-python is fully compliant with the
+# latest MessagePack specification (github.com/msgpack/msgpack/blob/master/spec.md). In
+# particular, it supports the new binary, UTF-8 string, and application ext
+# types.
+#
+# MIT License
+#
+# Copyright (c) 2013-2014 Ivan A. Sergeev
+#
+# Permission is hereby granted, free of charge, to any person obtaining a copy
+# of this software and associated documentation files (the "Software"), to deal
+# in the Software without restriction, including without limitation the rights
+# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+# copies of the Software, and to permit persons to whom the Software is
+# furnished to do so, subject to the following conditions:
+#
+# The above copyright notice and this permission notice shall be included in
+# all copies or substantial portions of the Software.
+#
+# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+# THE SOFTWARE.
+#
+"""
+u-msgpack-python v2.0 - vsergeev at gmail
+https://github.com/vsergeev/u-msgpack-python
+
+u-msgpack-python is a lightweight MessagePack serializer and deserializer
+module, compatible with both Python 2 and 3, as well CPython and PyPy
+implementations of Python. u-msgpack-python is fully compliant with the
+latest MessagePack specification (github.com/msgpack/msgpack/blob/master/spec.md). In
+particular, it supports the new binary, UTF-8 string, and application ext
+types.
+
+License: MIT
+"""
+
+__version__ = "2.0"
+"Module version string"
+
+version = (2,0)
+"Module version tuple"
+
+import struct
+import collections
+import sys
+import io
+
+################################################################################
+### Ext Class
+################################################################################
+
+# Extension type for application-defined types and data
+class Ext:
+    """
+    The Ext class facilitates creating a serializable extension object to store
+    an application-defined type and data byte array.
+    """
+
+    def __init__(self, type, data):
+        """
+        Construct a new Ext object.
+
+        Args:
+            type: application-defined type integer from 0 to 127
+            data: application-defined data byte array
+
+        Raises:
+            TypeError:
+                Specified ext type is outside of 0 to 127 range.
+
+        Example:
+        >>> foo = umsgpack.Ext(0x05, b"\x01\x02\x03")
+        >>> umsgpack.packb({u"special stuff": foo, u"awesome": True})
+        '\x82\xa7awesome\xc3\xadspecial stuff\xc7\x03\x05\x01\x02\x03'
+        >>> bar = umsgpack.unpackb(_)
+        >>> print(bar["special stuff"])
+        Ext Object (Type: 0x05, Data: 01 02 03)
+        >>>
+        """
+        # Application ext type should be 0 <= type <= 127
+        if not isinstance(type, int) or not (type >= 0 and type <= 127):
+            raise TypeError("ext type out of range")
+        # Check data is type bytes
+        elif sys.version_info[0] == 3 and not isinstance(data, bytes):
+            raise TypeError("ext data is not type \'bytes\'")
+        elif sys.version_info[0] == 2 and not isinstance(data, str):
+            raise TypeError("ext data is not type \'str\'")
+        self.type = type
+        self.data = data
+
+    def __eq__(self, other):
+        """
+        Compare this Ext object with another for equality.
+        """
+        return (isinstance(other, self.__class__) and
+                self.type == other.type and
+                self.data == other.data)
+
+    def __ne__(self, other):
+        """
+        Compare this Ext object with another for inequality.
+        """
+        return not self.__eq__(other)
+
+    def __str__(self):
+        """
+        String representation of this Ext object.
+        """
+        s = "Ext Object (Type: 0x%02x, Data: " % self.type
+        for i in range(min(len(self.data), 8)):
+            if i > 0:
+                s += " "
+            if isinstance(self.data[i], int):
+                s += "%02x" % (self.data[i])
+            else:
+                s += "%02x" % ord(self.data[i])
+        if len(self.data) > 8:
+            s += " ..."
+        s += ")"
+        return s
+
+################################################################################
+### Exceptions
+################################################################################
+
+# Base Exception classes
+class PackException(Exception):
+    "Base class for exceptions encountered during packing."
+    pass
+class UnpackException(Exception):
+    "Base class for exceptions encountered during unpacking."
+    pass
+
+# Packing error
+class UnsupportedTypeException(PackException):
+    "Object type not supported for packing."
+    pass
+
+# Unpacking error
+class InsufficientDataException(UnpackException):
+    "Insufficient data to unpack the encoded object."
+    pass
+class InvalidStringException(UnpackException):
+    "Invalid UTF-8 string encountered during unpacking."
+    pass
+class ReservedCodeException(UnpackException):
+    "Reserved code encountered during unpacking."
+    pass
+class UnhashableKeyException(UnpackException):
+    """
+    Unhashable key encountered during map unpacking.
+    The serialized map cannot be deserialized into a Python dictionary.
+    """
+    pass
+class DuplicateKeyException(UnpackException):
+    "Duplicate key encountered during map unpacking."
+    pass
+
+# Backwards compatibility
+KeyNotPrimitiveException = UnhashableKeyException
+KeyDuplicateException = DuplicateKeyException
+
+################################################################################
+### Exported Functions and Globals
+################################################################################
+
+# Exported functions and variables, set up in __init()
+pack = None
+packb = None
+unpack = None
+unpackb = None
+dump = None
+dumps = None
+load = None
+loads = None
+
+compatibility = False
+"""
+Compatibility mode boolean.
+
+When compatibility mode is enabled, u-msgpack-python will serialize both
+unicode strings and bytes into the old "raw" msgpack type, and deserialize the
+"raw" msgpack type into bytes. This provides backwards compatibility with the
+old MessagePack specification.
+
+Example:
+>>> umsgpack.compatibility = True
+>>>
+>>> umsgpack.packb([u"some string", b"some bytes"])
+b'\x92\xabsome string\xaasome bytes'
+>>> umsgpack.unpackb(_)
+[b'some string', b'some bytes']
+>>>
+"""
+
+################################################################################
+### Packing
+################################################################################
+
+# You may notice struct.pack("B", obj) instead of the simpler chr(obj) in the
+# code below. This is to allow for seamless Python 2 and 3 compatibility, as
+# chr(obj) has a str return type instead of bytes in Python 3, and
+# struct.pack(...) has the right return type in both versions.
+
+def _pack_integer(obj, fp):
+    if obj < 0:
+        if obj >= -32:
+            fp.write(struct.pack("b", obj))
+        elif obj >= -2**(8-1):
+            fp.write(b"\xd0" + struct.pack("b", obj))
+        elif obj >= -2**(16-1):
+            fp.write(b"\xd1" + struct.pack(">h", obj))
+        elif obj >= -2**(32-1):
+            fp.write(b"\xd2" + struct.pack(">i", obj))
+        elif obj >= -2**(64-1):
+            fp.write(b"\xd3" + struct.pack(">q", obj))
+        else:
+            raise UnsupportedTypeException("huge signed int")
+    else:
+        if obj <= 127:
+            fp.write(struct.pack("B", obj))
+        elif obj <= 2**8-1:
+            fp.write(b"\xcc" + struct.pack("B", obj))
+        elif obj <= 2**16-1:
+            fp.write(b"\xcd" + struct.pack(">H", obj))
+        elif obj <= 2**32-1:
+            fp.write(b"\xce" + struct.pack(">I", obj))
+        elif obj <= 2**64-1:
+            fp.write(b"\xcf" + struct.pack(">Q", obj))
+        else:
+            raise UnsupportedTypeException("huge unsigned int")
+
+def _pack_nil(obj, fp):
+    fp.write(b"\xc0")
+
+def _pack_boolean(obj, fp):
+    fp.write(b"\xc3" if obj else b"\xc2")
+
+def _pack_float(obj, fp):
+    if _float_size == 64:
+        fp.write(b"\xcb" + struct.pack(">d", obj))
+    else:
+        fp.write(b"\xca" + struct.pack(">f", obj))
+
+def _pack_string(obj, fp):
+    obj = obj.encode('utf-8')
+    if len(obj) <= 31:
+        fp.write(struct.pack("B", 0xa0 | len(obj)) + obj)
+    elif len(obj) <= 2**8-1:
+        fp.write(b"\xd9" + struct.pack("B", len(obj)) + obj)
+    elif len(obj) <= 2**16-1:
+        fp.write(b"\xda" + struct.pack(">H", len(obj)) + obj)
+    elif len(obj) <= 2**32-1:
+        fp.write(b"\xdb" + struct.pack(">I", len(obj)) + obj)
+    else:
+        raise UnsupportedTypeException("huge string")
+
+def _pack_binary(obj, fp):
+    if len(obj) <= 2**8-1:
+        fp.write(b"\xc4" + struct.pack("B", len(obj)) + obj)
+    elif len(obj) <= 2**16-1:
+        fp.write(b"\xc5" + struct.pack(">H", len(obj)) + obj)
+    elif len(obj) <= 2**32-1:
+        fp.write(b"\xc6" + struct.pack(">I", len(obj)) + obj)
+    else:
+        raise UnsupportedTypeException("huge binary string")
+
+def _pack_oldspec_raw(obj, fp):
+    if len(obj) <= 31:
+        fp.write(struct.pack("B", 0xa0 | len(obj)) + obj)
+    elif len(obj) <= 2**16-1:
+        fp.write(b"\xda" + struct.pack(">H", len(obj)) + obj)
+    elif len(obj) <= 2**32-1:
+        fp.write(b"\xdb" + struct.pack(">I", len(obj)) + obj)
+    else:
+        raise UnsupportedTypeException("huge raw string")
+
+def _pack_ext(obj, fp):
+    if len(obj.data) == 1:
+        fp.write(b"\xd4" + struct.pack("B", obj.type & 0xff) + obj.data)
+    elif len(obj.data) == 2:
+        fp.write(b"\xd5" + struct.pack("B", obj.type & 0xff) + obj.data)
+    elif len(obj.data) == 4:
+        fp.write(b"\xd6" + struct.pack("B", obj.type & 0xff) + obj.data)
+    elif len(obj.data) == 8:
+        fp.write(b"\xd7" + struct.pack("B", obj.type & 0xff) + obj.data)
+    elif len(obj.data) == 16:
+        fp.write(b"\xd8" + struct.pack("B", obj.type & 0xff) + obj.data)
+    elif len(obj.data) <= 2**8-1:
+        fp.write(b"\xc7" + struct.pack("BB", len(obj.data), obj.type & 0xff) + obj.data)
+    elif len(obj.data) <= 2**16-1:
+        fp.write(b"\xc8" + struct.pack(">HB", len(obj.data), obj.type & 0xff) + obj.data)
+    elif len(obj.data) <= 2**32-1:
+        fp.write(b"\xc9" + struct.pack(">IB", len(obj.data), obj.type & 0xff) + obj.data)
+    else:
+        raise UnsupportedTypeException("huge ext data")
+
+def _pack_array(obj, fp):
+    if len(obj) <= 15:
+        fp.write(struct.pack("B", 0x90 | len(obj)))
+    elif len(obj) <= 2**16-1:
+        fp.write(b"\xdc" + struct.pack(">H", len(obj)))
+    elif len(obj) <= 2**32-1:
+        fp.write(b"\xdd" + struct.pack(">I", len(obj)))
+    else:
+        raise UnsupportedTypeException("huge array")
+
+    for e in obj:
+        pack(e, fp)
+
+def _pack_map(obj, fp):
+    if len(obj) <= 15:  # fixmap: entry count lives in the low nibble
+        fp.write(struct.pack("B", 0x80 | len(obj)))
+    elif len(obj) <= 2**16-1:  # map 16
+        fp.write(b"\xde" + struct.pack(">H", len(obj)))
+    elif len(obj) <= 2**32-1:  # map 32
+        fp.write(b"\xdf" + struct.pack(">I", len(obj)))
+    else:
+        raise UnsupportedTypeException("huge map")
+    # header is followed by each key and its value, serialized in turn
+    for k,v in obj.items():
+        pack(k, fp)
+        pack(v, fp)
+
+########################################
+
+# Pack for Python 2, with 'unicode' type, 'str' type, and 'long' type
+def _pack2(obj, fp):
+    """
+    Serialize a Python object into MessagePack bytes.
+
+    Args:
+        obj: a Python object
+        fp: a .write()-supporting file-like object
+
+    Returns:
+        None.
+
+    Raises:
+        UnsupportedTypeException(PackException):
+            Object type not supported for packing.
+
+    Example:
+    >>> f = open('test.bin', 'wb')
+    >>> umsgpack.pack({u"compact": True, u"schema": 0}, f)
+    >>>
+    """
+
+    global compatibility
+
+    if obj is None:
+        _pack_nil(obj, fp)
+    elif isinstance(obj, bool):
+        _pack_boolean(obj, fp)
+    elif isinstance(obj, int) or isinstance(obj, long):
+        _pack_integer(obj, fp)
+    elif isinstance(obj, float):
+        _pack_float(obj, fp)
+    elif compatibility and isinstance(obj, unicode):
+        _pack_oldspec_raw(obj.encode('utf-8'), fp)  # bytes() is ASCII-only
+    elif compatibility and isinstance(obj, bytes):
+        _pack_oldspec_raw(obj, fp)
+    elif isinstance(obj, unicode):
+        _pack_string(obj, fp)
+    elif isinstance(obj, str):
+        _pack_binary(obj, fp)
+    elif isinstance(obj, list) or isinstance(obj, tuple):
+        _pack_array(obj, fp)
+    elif isinstance(obj, dict):
+        _pack_map(obj, fp)
+    elif isinstance(obj, Ext):
+        _pack_ext(obj, fp)
+    else:
+        raise UnsupportedTypeException("unsupported type: %s" % str(type(obj)))
+
+# Pack for Python 3, with unicode 'str' type, 'bytes' type, and no 'long' type
+def _pack3(obj, fp):
+    """
+    Serialize a Python object into MessagePack bytes.
+
+    Args:
+        obj: a Python object
+        fp: a .write()-supporting file-like object
+
+    Returns:
+        None.
+
+    Raises:
+        UnsupportedTypeException(PackException):
+            Object type not supported for packing.
+
+    Example:
+    >>> f = open('test.bin', 'wb')
+    >>> umsgpack.pack({u"compact": True, u"schema": 0}, f)
+    >>>
+    """
+    global compatibility
+
+    if obj is None:
+        _pack_nil(obj, fp)
+    elif isinstance(obj, bool):
+        _pack_boolean(obj, fp)
+    elif isinstance(obj, int):
+        _pack_integer(obj, fp)
+    elif isinstance(obj, float):
+        _pack_float(obj, fp)
+    elif compatibility and isinstance(obj, str):
+        _pack_oldspec_raw(obj.encode('utf-8'), fp)
+    elif compatibility and isinstance(obj, bytes):
+        _pack_oldspec_raw(obj, fp)
+    elif isinstance(obj, str):
+        _pack_string(obj, fp)
+    elif isinstance(obj, bytes):
+        _pack_binary(obj, fp)
+    elif isinstance(obj, list) or isinstance(obj, tuple):
+        _pack_array(obj, fp)
+    elif isinstance(obj, dict):
+        _pack_map(obj, fp)
+    elif isinstance(obj, Ext):
+        _pack_ext(obj, fp)
+    else:
+        raise UnsupportedTypeException("unsupported type: %s" % str(type(obj)))
+
+def _packb2(obj):
+    """
+    Serialize a Python object into MessagePack bytes.
+
+    Args:
+        obj: a Python object
+
+    Returns:
+        A 'str' containing serialized MessagePack bytes.
+
+    Raises:
+        UnsupportedType(PackException):
+            Object type not supported for packing.
+
+    Example:
+    >>> umsgpack.packb({u"compact": True, u"schema": 0})
+    '\x82\xa7compact\xc3\xa6schema\x00'
+    >>>
+    """
+    fp = io.BytesIO()
+    _pack2(obj, fp)
+    return fp.getvalue()
+
+def _packb3(obj):
+    """
+    Serialize a Python object into MessagePack bytes.
+
+    Args:
+        obj: a Python object
+
+    Returns:
+        A 'bytes' containing serialized MessagePack bytes.
+
+    Raises:
+        UnsupportedType(PackException):
+            Object type not supported for packing.
+
+    Example:
+    >>> umsgpack.packb({u"compact": True, u"schema": 0})
+    b'\x82\xa7compact\xc3\xa6schema\x00'
+    >>>
+    """
+    fp = io.BytesIO()
+    _pack3(obj, fp)
+    return fp.getvalue()
+
+################################################################################
+### Unpacking
+################################################################################
+
+def _read_except(fp, n):
+    data = fp.read(n)
+    if len(data) < n:
+        raise InsufficientDataException()
+    return data
+
+def _unpack_integer(code, fp):
+    if (ord(code) & 0xe0) == 0xe0:
+        return struct.unpack("b", code)[0]
+    elif code == b'\xd0':
+        return struct.unpack("b", _read_except(fp, 1))[0]
+    elif code == b'\xd1':
+        return struct.unpack(">h", _read_except(fp, 2))[0]
+    elif code == b'\xd2':
+        return struct.unpack(">i", _read_except(fp, 4))[0]
+    elif code == b'\xd3':
+        return struct.unpack(">q", _read_except(fp, 8))[0]
+    elif (ord(code) & 0x80) == 0x00:
+        return struct.unpack("B", code)[0]
+    elif code == b'\xcc':
+        return struct.unpack("B", _read_except(fp, 1))[0]
+    elif code == b'\xcd':
+        return struct.unpack(">H", _read_except(fp, 2))[0]
+    elif code == b'\xce':
+        return struct.unpack(">I", _read_except(fp, 4))[0]
+    elif code == b'\xcf':
+        return struct.unpack(">Q", _read_except(fp, 8))[0]
+    raise Exception("logic error, not int: 0x%02x" % ord(code))
+
+def _unpack_reserved(code, fp):
+    if code == b'\xc1':
+        raise ReservedCodeException("encountered reserved code: 0x%02x" % ord(code))
+    raise Exception("logic error, not reserved code: 0x%02x" % ord(code))
+
+def _unpack_nil(code, fp):
+    if code == b'\xc0':
+        return None
+    raise Exception("logic error, not nil: 0x%02x" % ord(code))
+
+def _unpack_boolean(code, fp):
+    if code == b'\xc2':
+        return False
+    elif code == b'\xc3':
+        return True
+    raise Exception("logic error, not boolean: 0x%02x" % ord(code))
+
+def _unpack_float(code, fp):
+    if code == b'\xca':
+        return struct.unpack(">f", _read_except(fp, 4))[0]
+    elif code == b'\xcb':
+        return struct.unpack(">d", _read_except(fp, 8))[0]
+    raise Exception("logic error, not float: 0x%02x" % ord(code))
+
+def _unpack_string(code, fp):
+    """Decode a string value (fixstr, 0xd9, 0xda or 0xdb).
+
+    Returns the raw bytes when the module-level `compatibility` flag is
+    set, otherwise the payload decoded as UTF-8.
+
+    Raises InvalidStringException when the payload is not valid UTF-8,
+    and a plain Exception when called with a non-string type byte.
+    """
+    marker = ord(code)
+    if (marker & 0xe0) == 0xa0:
+        # fixstr: the length is stored in the low five bits of the marker
+        length = marker & 0x1f
+    elif code == b'\xd9':
+        length = struct.unpack("B", _read_except(fp, 1))[0]
+    elif code == b'\xda':
+        length = struct.unpack(">H", _read_except(fp, 2))[0]
+    elif code == b'\xdb':
+        length = struct.unpack(">I", _read_except(fp, 4))[0]
+    else:
+        raise Exception("logic error, not string: 0x%02x" % ord(code))
+
+    raw = _read_except(fp, length)
+
+    # Always return raw bytes in compatibility mode
+    if compatibility:
+        return raw
+
+    try:
+        return raw.decode('utf-8')
+    except UnicodeDecodeError:
+        raise InvalidStringException("unpacked string is not utf-8")
+
+def _unpack_binary(code, fp):
+    """Decode a bin value (0xc4, 0xc5 or 0xc6) and return its raw bytes.
+
+    The type byte selects the width of the big-endian length prefix
+    (1, 2 or 4 bytes). Raises a plain Exception for any other type byte.
+    """
+    if code == b'\xc4':
+        fmt, width = "B", 1
+    elif code == b'\xc5':
+        fmt, width = ">H", 2
+    elif code == b'\xc6':
+        fmt, width = ">I", 4
+    else:
+        raise Exception("logic error, not binary: 0x%02x" % ord(code))
+
+    length = struct.unpack(fmt, _read_except(fp, width))[0]
+    return _read_except(fp, length)
+
+def _unpack_ext(code, fp):
+    """Decode an ext value and return it wrapped in an Ext object.
+
+    Type bytes 0xd4-0xd8 imply a fixed payload length (1, 2, 4, 8, 16);
+    0xc7-0xc9 are followed by a 1, 2 or 4 byte big-endian length. After
+    the length comes one byte of ext type, then the payload.
+    Raises a plain Exception for any other type byte.
+    """
+    fixed_lengths = {b'\xd4': 1, b'\xd5': 2, b'\xd6': 4,
+                     b'\xd7': 8, b'\xd8': 16}
+    length_prefixes = {b'\xc7': ("B", 1), b'\xc8': (">H", 2),
+                       b'\xc9': (">I", 4)}
+
+    if code in fixed_lengths:
+        length = fixed_lengths[code]
+    elif code in length_prefixes:
+        fmt, width = length_prefixes[code]
+        length = struct.unpack(fmt, _read_except(fp, width))[0]
+    else:
+        raise Exception("logic error, not ext: 0x%02x" % ord(code))
+
+    # the type byte precedes the payload on the wire; read in that order
+    ext_type = ord(_read_except(fp, 1))
+    payload = _read_except(fp, length)
+    return Ext(ext_type, payload)
+
+def _unpack_array(code, fp):
+    """Decode an array (fixarray, 0xdc or 0xdd) into a Python list.
+
+    Elements are decoded one after another with _unpack.
+    Raises a plain Exception when called with a non-array type byte.
+    """
+    marker = ord(code)
+    if (marker & 0xf0) == 0x90:
+        # fixarray: element count is stored in the low four bits
+        length = marker & 0x0f
+    elif code == b'\xdc':
+        length = struct.unpack(">H", _read_except(fp, 2))[0]
+    elif code == b'\xdd':
+        length = struct.unpack(">I", _read_except(fp, 4))[0]
+    else:
+        raise Exception("logic error, not array: 0x%02x" % ord(code))
+
+    items = []
+    for _ in range(length):
+        items.append(_unpack(fp))
+    return items
+
+def _deep_list_to_tuple(obj):
+    """Return `obj` with every list, at any nesting depth, turned into a tuple.
+
+    Anything that is not a list is returned unchanged.
+    """
+    if not isinstance(obj, list):
+        return obj
+    return tuple(_deep_list_to_tuple(item) for item in obj)
+
+def _unpack_map(code, fp):
+    """Decode a map (fixmap, 0xde or 0xdf) into a Python dict.
+
+    Keys that decode to lists are converted to (nested) tuples so they
+    can be used as dictionary keys.
+
+    Raises:
+        UnhashableKeyException: a key cannot be hashed.
+        DuplicateKeyException: the same key occurs twice in the map; this
+            also covers list keys after their conversion to tuples.
+        Exception: called with a non-map type byte.
+    """
+    if (ord(code) & 0xf0) == 0x80:
+        # fixmap: entry count is stored in the low four bits
+        length = ord(code) & 0x0f
+    elif code == b'\xde':
+        length = struct.unpack(">H", _read_except(fp, 2))[0]
+    elif code == b'\xdf':
+        length = struct.unpack(">I", _read_except(fp, 4))[0]
+    else:
+        raise Exception("logic error, not map: 0x%02x" % ord(code))
+
+    d = {}
+    for _ in range(length):
+        # Unpack key
+        k = _unpack(fp)
+
+        if isinstance(k, list):
+            # Attempt to convert list into a hashable tuple
+            k = _deep_list_to_tuple(k)
+
+        # The membership test hashes the key, so one lookup detects both
+        # unhashable and duplicate keys without relying on
+        # collections.Hashable (gone from `collections` in Python 3.10+).
+        try:
+            duplicate = k in d
+        except TypeError:
+            raise UnhashableKeyException("encountered unhashable key: %s, %s" % (str(k), str(type(k))))
+        if duplicate:
+            raise DuplicateKeyException("encountered duplicate key: %s, %s" % (str(k), str(type(k))))
+
+        # Unpack value
+        d[k] = _unpack(fp)
+    return d
+
+def _unpack(fp):
+    """Read one type byte from `fp` and decode the value it introduces.
+
+    The byte is looked up in _unpack_dispatch_table and the selected
+    unpacker is called with the byte and the stream.
+    """
+    code = _read_except(fp, 1)
+    handler = _unpack_dispatch_table[code]
+    return handler(code, fp)
+
+########################################
+
+def _unpack2(fp):
+    """
+    Deserialize MessagePack bytes read from a stream into a Python object.
+
+    Exposed as `unpack` / `load` on Python 2.
+
+    Args:
+        fp: a .read()-supporting file-like object
+
+    Returns:
+        A Python object.
+
+    Raises:
+        InsufficientDataException(UnpackException):
+            Insufficient data to unpack the encoded object.
+        InvalidStringException(UnpackException):
+            Invalid UTF-8 string encountered during unpacking.
+        ReservedCodeException(UnpackException):
+            Reserved code encountered during unpacking.
+        UnhashableKeyException(UnpackException):
+            Unhashable key encountered during map unpacking.
+            The serialized map cannot be deserialized into a Python dictionary.
+        DuplicateKeyException(UnpackException):
+            Duplicate key encountered during map unpacking.
+
+    Example:
+    >>> f = open("test.bin", "rb")
+    >>> umsgpack.unpack(f)
+    {u'compact': True, u'schema': 0}
+    >>>
+    """
+    return _unpack(fp)
+
+def _unpack3(fp):
+    """
+    Deserialize MessagePack bytes read from a stream into a Python object.
+
+    Exposed as `unpack` / `load` on Python 3.
+
+    Args:
+        fp: a .read()-supporting file-like object
+
+    Returns:
+        A Python object.
+
+    Raises:
+        InsufficientDataException(UnpackException):
+            Insufficient data to unpack the encoded object.
+        InvalidStringException(UnpackException):
+            Invalid UTF-8 string encountered during unpacking.
+        ReservedCodeException(UnpackException):
+            Reserved code encountered during unpacking.
+        UnhashableKeyException(UnpackException):
+            Unhashable key encountered during map unpacking.
+            The serialized map cannot be deserialized into a Python dictionary.
+        DuplicateKeyException(UnpackException):
+            Duplicate key encountered during map unpacking.
+
+    Example:
+    >>> f = open("test.bin", "rb")
+    >>> umsgpack.unpack(f)
+    {'compact': True, 'schema': 0}
+    >>>
+    """
+    return _unpack(fp)
+
+# For Python 2, expects a str object
+def _unpackb2(s):
+    """
+    Deserialize a MessagePack-encoded 'str' into a Python object.
+
+    Exposed as `unpackb` / `loads` on Python 2.
+
+    Args:
+        s: a 'str' containing serialized MessagePack bytes
+
+    Returns:
+        A Python object.
+
+    Raises:
+        TypeError:
+            Packed data is not type 'str'.
+        InsufficientDataException(UnpackException):
+            Insufficient data to unpack the encoded object.
+        InvalidStringException(UnpackException):
+            Invalid UTF-8 string encountered during unpacking.
+        ReservedCodeException(UnpackException):
+            Reserved code encountered during unpacking.
+        UnhashableKeyException(UnpackException):
+            Unhashable key encountered during map unpacking.
+            The serialized map cannot be deserialized into a Python dictionary.
+        DuplicateKeyException(UnpackException):
+            Duplicate key encountered during map unpacking.
+
+    Example:
+    >>> umsgpack.unpackb(b'\x82\xa7compact\xc3\xa6schema\x00')
+    {u'compact': True, u'schema': 0}
+    >>>
+    """
+    if isinstance(s, str):
+        # wrap the buffer in a stream so the common decoder can be reused
+        stream = io.BytesIO(s)
+        return _unpack(stream)
+    raise TypeError("packed data is not type 'str'")
+
+# For Python 3, expects a bytes object
+def _unpackb3(s):
+    """
+    Deserialize a MessagePack-encoded 'bytes' object into a Python object.
+
+    Exposed as `unpackb` / `loads` on Python 3.
+
+    Args:
+        s: a 'bytes' containing serialized MessagePack bytes
+
+    Returns:
+        A Python object.
+
+    Raises:
+        TypeError:
+            Packed data is not type 'bytes'.
+        InsufficientDataException(UnpackException):
+            Insufficient data to unpack the encoded object.
+        InvalidStringException(UnpackException):
+            Invalid UTF-8 string encountered during unpacking.
+        ReservedCodeException(UnpackException):
+            Reserved code encountered during unpacking.
+        UnhashableKeyException(UnpackException):
+            Unhashable key encountered during map unpacking.
+            The serialized map cannot be deserialized into a Python dictionary.
+        DuplicateKeyException(UnpackException):
+            Duplicate key encountered during map unpacking.
+
+    Example:
+    >>> umsgpack.unpackb(b'\x82\xa7compact\xc3\xa6schema\x00')
+    {'compact': True, 'schema': 0}
+    >>>
+    """
+    if isinstance(s, bytes):
+        # wrap the buffer in a stream so the common decoder can be reused
+        stream = io.BytesIO(s)
+        return _unpack(stream)
+    raise TypeError("packed data is not type 'bytes'")
+
+################################################################################
+### Module Initialization
+################################################################################
+
+def __init():
+    """Set up the module-level API and decoder state; runs once at import.
+
+    Binds pack/packb/unpack/unpackb and their dump/dumps/load/loads
+    aliases to the Python 2 or Python 3 implementations, resets the
+    `compatibility` flag, picks `_float_size` from the platform float
+    mantissa width and builds `_unpack_dispatch_table`, which maps every
+    one-byte type code to its unpacker.
+    """
+    global pack, packb, unpack, unpackb
+    global dump, dumps, load, loads
+    global compatibility, _float_size, _unpack_dispatch_table
+
+    # Compatibility mode for handling strings/bytes with the old specification
+    compatibility = False
+
+    # Auto-detect system float precision
+    _float_size = 64 if sys.float_info.mant_dig == 53 else 32
+
+    # Map packb and unpackb to the appropriate version
+    if sys.version_info[0] == 3:
+        pack = dump = _pack3
+        packb = dumps = _packb3
+        unpack = load = _unpack3
+        unpackb = loads = _unpackb3
+    else:
+        pack = dump = _pack2
+        packb = dumps = _packb2
+        unpack = load = _unpack2
+        unpackb = loads = _unpackb2
+
+    # Build a dispatch table for fast lookup of unpacking function
+    table = {}
+
+    def register(first, last, handler):
+        # bind every type byte in the inclusive range to `handler`
+        for value in range(first, last + 1):
+            table[struct.pack("B", value)] = handler
+
+    register(0x00, 0x7f, _unpack_integer)   # Fix uint
+    register(0x80, 0x8f, _unpack_map)       # Fix map
+    register(0x90, 0x9f, _unpack_array)     # Fix array
+    register(0xa0, 0xbf, _unpack_string)    # Fix str
+    register(0xc0, 0xc0, _unpack_nil)       # Nil
+    register(0xc1, 0xc1, _unpack_reserved)  # Reserved
+    register(0xc2, 0xc3, _unpack_boolean)   # Boolean
+    register(0xc4, 0xc6, _unpack_binary)    # Bin
+    register(0xc7, 0xc9, _unpack_ext)       # Ext
+    register(0xca, 0xcb, _unpack_float)     # Float
+    register(0xcc, 0xcf, _unpack_integer)   # Uint
+    register(0xd0, 0xd3, _unpack_integer)   # Int
+    register(0xd4, 0xd8, _unpack_ext)       # Fixext
+    register(0xd9, 0xdb, _unpack_string)    # String
+    register(0xdc, 0xdd, _unpack_array)     # Array
+    register(0xde, 0xdf, _unpack_map)       # Map
+    register(0xe0, 0xff, _unpack_integer)   # Negative fixint
+
+    _unpack_dispatch_table = table
+
+__init()
diff --git a/wally/ssh_utils.py b/wally/ssh_utils.py
new file mode 100644
index 0000000..68d4017
--- /dev/null
+++ b/wally/ssh_utils.py
@@ -0,0 +1,306 @@
+import re
+import time
+import socket
+import logging
+import os.path
+import getpass
+import threading
+
+
+import paramiko
+
+
+# Module-level logger, registered under the "wally" logger name.
+logger = logging.getLogger("wally")
+
+
+def ssh_connect(creds, retry_count=6, timeout=10):
+    """Open a paramiko SSH connection described by `creds`.
+
+    Authentication is picked in this order: password (`creds.passwd`),
+    explicit key file (`creds.key_file`), then ``~/.ssh/id_rsa``.
+    When `creds.user` is None the current local user name is used.
+
+    Args:
+        creds: object with host, port, user, passwd and key_file attributes.
+        retry_count: number of connection attempts on socket errors.
+        timeout: tcp connect timeout in seconds, passed to paramiko.
+
+    Returns:
+        The connected paramiko.SSHClient, or None when retry_count is not
+        positive (no attempt is made).
+
+    Raises:
+        paramiko.PasswordRequiredException: re-raised immediately.
+        socket.error: re-raised after the last failed attempt.
+    """
+    ssh = paramiko.SSHClient()
+    ssh.load_host_keys('/dev/null')
+    ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
+    ssh.known_hosts = None
+
+    # The connect arguments do not change between retries: build them once.
+    user = getpass.getuser() if creds.user is None else creds.user
+    conn_args = {
+        'username': user,
+        'timeout': timeout,  # tcp connect timeout
+        'port': creds.port,
+        'look_for_keys': False,
+    }
+
+    if creds.passwd is not None:
+        conn_args['password'] = creds.passwd
+        conn_args['allow_agent'] = False
+    elif creds.key_file is not None:
+        conn_args['key_filename'] = creds.key_file
+    else:
+        conn_args['key_filename'] = os.path.expanduser('~/.ssh/id_rsa')
+
+    for i in range(retry_count):
+        try:
+            ssh.connect(creds.host, **conn_args)
+            return ssh
+        except paramiko.PasswordRequiredException:
+            raise
+        except socket.error as err:
+            # report through the module logger instead of a bare print
+            logger.warning("SSH connect to %s failed: %s", creds.host, err)
+            if i == retry_count - 1:
+                raise
+            time.sleep(1)
+
+
+def normalize_dirpath(dirpath):
+    """Return `dirpath` with all trailing "/" characters removed.
+
+    A path made only of slashes (for example "/") becomes "".
+    """
+    return dirpath.rstrip("/")
+
+
+# Low nine mode bits set (0o777): mask of the rwx permission bits.
+ALL_RWX_MODE = ((1 << 9) - 1)
+
+
+def ssh_mkdir(sftp, remotepath, mode=ALL_RWX_MODE, intermediate=False):
+    """Create `remotepath` on the remote side through `sftp`.
+
+    Args:
+        sftp: object providing mkdir(path, mode=...).
+        remotepath: directory to create; trailing slashes are ignored.
+        mode: permission bits passed to every mkdir call.
+        intermediate: when True and the first mkdir fails with IOError,
+            create the parent directories first and retry once.
+
+    Raises:
+        IOError: mkdir failed. With intermediate=True this is raised once
+            no parent is left to try, instead of recursing without end.
+    """
+    remotepath = normalize_dirpath(remotepath)
+
+    if not intermediate:
+        sftp.mkdir(remotepath, mode=mode)
+        return
+
+    try:
+        sftp.mkdir(remotepath, mode=mode)
+    except IOError:
+        parent = remotepath.rsplit("/", 1)[0]
+        if parent in ("", remotepath):
+            # No parent left to create, so the failure is not caused by a
+            # missing ancestor (already exists, permissions, ...).
+            raise
+        ssh_mkdir(sftp, parent, mode=mode, intermediate=True)
+        return sftp.mkdir(remotepath, mode=mode)
+
+
+def ssh_copy_file(sftp, localfile, remfile, preserve_perm=True):
+    """Upload `localfile` to `remfile` through `sftp`.
+
+    With preserve_perm set, the local file's rwx permission bits are
+    applied to the uploaded file afterwards.
+    """
+    sftp.put(localfile, remfile)
+    if not preserve_perm:
+        return
+    perm_bits = os.stat(localfile).st_mode & ALL_RWX_MODE
+    sftp.chmod(remfile, perm_bits)
+
+
+def put_dir_recursively(sftp, localpath, remotepath, preserve_perm=True):
+    """Upload a local directory tree to the remote host.
+
+    If `remotepath` already exists and its last component differs from
+    the last component of `localpath`, the tree is placed in a
+    sub-directory of `remotepath` named after the local directory.
+    Missing remote directories are created on the way.
+
+    Args:
+        sftp: SFTP client; an object exposing `copytree` is treated as a
+            local connection and that method does the whole copy.
+        localpath: local directory to upload.
+        remotepath: absolute destination directory.
+        preserve_perm: copy local permission bits to remote dirs and files.
+    """
+    # hack for localhost connection
+    if hasattr(sftp, "copytree"):
+        sftp.copytree(localpath, remotepath)
+        return
+
+    assert remotepath.startswith("/"), "%s must be absolute path" % remotepath
+
+    # normalize
+    localpath = normalize_dirpath(localpath)
+    remotepath = normalize_dirpath(remotepath)
+
+    try:
+        sftp.chdir(remotepath)
+    except IOError:
+        # destination does not exist yet: it is created below
+        pass
+    else:
+        # basename also works for a local path without any "/", where
+        # rsplit("/", 1)[1] would raise IndexError
+        localsuffix = os.path.basename(localpath)
+        remotesuffix = os.path.basename(remotepath)
+        if localsuffix != remotesuffix:
+            remotepath = os.path.join(remotepath, localsuffix)
+
+    for root, dirs, fls in os.walk(localpath):
+        # path of the current directory relative to localpath
+        suffix = root[len(localpath):]
+        if suffix.startswith("/"):
+            suffix = suffix[1:]
+
+        remroot = os.path.join(remotepath, suffix)
+
+        try:
+            sftp.chdir(remroot)
+        except IOError:
+            if preserve_perm:
+                mode = os.stat(root).st_mode & ALL_RWX_MODE
+            else:
+                mode = ALL_RWX_MODE
+            ssh_mkdir(sftp, remroot, mode=mode, intermediate=True)
+            sftp.chdir(remroot)
+
+        for f in fls:
+            remfile = os.path.join(remroot, f)
+            localfile = os.path.join(root, f)
+            ssh_copy_file(sftp, localfile, remfile, preserve_perm)
+
+
+def delete_file(conn, path):
+    """Remove the remote file `path` using an SFTP channel opened on `conn`.
+
+    The SFTP channel is closed even when the removal fails.
+    """
+    sftp = conn.open_sftp()
+    try:
+        sftp.remove(path)
+    finally:
+        sftp.close()
+
+
+def copy_paths(conn, paths):
+    """Upload every local path in `paths` to the remote host.
+
+    Args:
+        conn: connection providing open_sftp().
+        paths: mapping of local source path to remote destination path;
+            sources may be regular files or directories.
+
+    Raises:
+        OSError: a source is neither a file nor a directory, or its copy
+            failed; the message names the source, the destination and
+            the underlying exception.
+    """
+    sftp = conn.open_sftp()
+    try:
+        for src, dst in paths.items():
+            try:
+                if os.path.isfile(src):
+                    ssh_copy_file(sftp, src, dst)
+                    continue
+
+                if os.path.isdir(src):
+                    put_dir_recursively(sftp, src, dst)
+                    continue
+
+                templ = "Can't copy {0!r} - it neither a file not a directory"
+                raise OSError(templ.format(src))
+            except Exception as exc:
+                tmpl = "Scp {0!r} => {1!r} failed - {2!r}"
+                raise OSError(tmpl.format(src, dst, exc))
+    finally:
+        sftp.close()
+
+
+class ConnCreds(object):
+    """Plain container for the parts of an ssh connection description."""
+
+    # attributes every new instance starts with, all set to None
+    conn_uri_attrs = ("user", "passwd", "host", "port", "path")
+
+    def __init__(self):
+        for attr in self.conn_uri_attrs:
+            setattr(self, attr, None)
+
+    def __str__(self):
+        fields = self.__dict__
+        return str(fields)
+
+
+# Regular expressions for the supported ssh uri forms; filled in by the
+# URIsNamespace class body and tried in order by parse_ssh_uri.
+uri_reg_exprs = []
+
+
+# Namespace class: its body runs once, at class definition time, and
+# fills the module-level uri_reg_exprs list. It is not meant to be
+# instantiated (NOTE(review): inferred from the absence of methods).
+class URIsNamespace(object):
+    # Raw regex fragments, one per uri component. Every attribute whose
+    # name ends in "_rr" is rewritten below into a named group.
+    class ReParts(object):
+        user_rr = "[^:]*?"
+        host_rr = "[^:]*?"
+        port_rr = "\\d+"
+        key_file_rr = "[^:@]*"
+        passwd_rr = ".*?"
+
+    re_dct = ReParts.__dict__
+
+    # Wrap each fragment into "(?P<name>...)", where name is the attribute
+    # name without the "_rr" suffix, and store it back on ReParts.
+    for attr_name, val in re_dct.items():
+        if attr_name.endswith('_rr'):
+            new_rr = "(?P<{0}>{1})".format(attr_name[:-3], val)
+            setattr(ReParts, attr_name, new_rr)
+
+    # Re-read the attributes so the templates see the named groups.
+    re_dct = ReParts.__dict__
+
+    # Uri forms, in matching order. Note "@@" separates password and host.
+    templs = [
+        "^{host_rr}$",
+        "^{user_rr}@{host_rr}::{key_file_rr}$",
+        "^{user_rr}@{host_rr}:{port_rr}:{key_file_rr}$",
+        "^{user_rr}:{passwd_rr}@@{host_rr}$",
+        "^{user_rr}:{passwd_rr}@@{host_rr}:{port_rr}$",
+    ]
+
+    # Expand every template with the named-group fragments and publish it.
+    for templ in templs:
+        uri_reg_exprs.append(templ.format(**re_dct))
+
+
+def parse_ssh_uri(uri):
+    """Parse an ssh uri string into a ConnCreds object.
+
+    The uri is matched against the patterns in uri_reg_exprs, in order;
+    the named groups of the first match overwrite the defaults
+    (port "22", key_file None, passwd None). The port stays a string.
+
+    Raises:
+        ValueError: no pattern matches `uri`.
+    """
+    # Forms registered in uri_reg_exprs:
+    #   ip_host
+    #   user@ip_host::path_to_key_file
+    #   user@ip_host:port:path_to_key_file
+    #   user:passwd@@ip_host
+    #   user:passwd@@ip_host:port
+    # The bare host pattern only excludes ":", so a string such as
+    # "user@ip_host" is accepted by it as a whole and stored in `host`.
+
+    res = ConnCreds()
+    res.port = "22"
+    res.key_file = None
+    res.passwd = None
+
+    for pattern in uri_reg_exprs:
+        match = re.match(pattern, uri)
+        if match is None:
+            continue
+        res.__dict__.update(match.groupdict())
+        return res
+
+    raise ValueError("Can't parse {0!r} as ssh uri value".format(uri))
+
+
+def connect(uri):
+    """Parse the ssh `uri` and return an open connection to that host.
+
+    The parsed port is converted to int before connecting.
+    """
+    creds = parse_ssh_uri(uri)
+    port_number = int(creds.port)
+    creds.port = port_number
+    return ssh_connect(creds)
+
+
+# Sessions opened by run_over_ssh, kept so close_all_sessions can reach
+# them; the list is only touched while holding all_sessions_lock.
+all_sessions_lock = threading.Lock()
+all_sessions = []
+
+
+def run_over_ssh(conn, cmd, stdin_data=None, timeout=60,
+                 nolog=False, node=None):
+    """Run `cmd` on the remote host and return its combined output.
+
+    Temporary polling implementation; should be replaced by a normal one
+    based on select.
+
+    Args:
+        conn: connected ssh client providing get_transport().
+        cmd: command line to execute.
+        stdin_data: optional data sent to the command's stdin.
+        timeout: overall limit in seconds, measured from just before the
+            command is started.
+        nolog: when True the command is not written to the debug log.
+        node: label used in log and error messages ("" when None).
+
+    Returns:
+        Everything the command wrote to stdout and stderr (merged).
+
+    Raises:
+        OSError: the timeout expired, or the command exited with a
+            non-zero code; the message contains the collected output.
+    """
+    transport = conn.get_transport()
+    session = transport.open_session()
+
+    if node is None:
+        node = ""
+
+    with all_sessions_lock:
+        all_sessions.append(session)
+
+    try:
+        session.set_combine_stderr(True)
+
+        stime = time.time()
+
+        if not nolog:
+            logger.debug("SSH:{0} Exec {1!r}".format(node, cmd))
+
+        session.exec_command(cmd)
+
+        if stdin_data is not None:
+            session.sendall(stdin_data)
+
+        # short receive timeout so the overall deadline is checked regularly
+        session.settimeout(1)
+        session.shutdown_write()
+        chunks = []
+
+        while True:
+            try:
+                ndata = session.recv(1024)
+                if not ndata:
+                    # empty read: the remote side closed the channel
+                    break
+                chunks.append(ndata)
+            except socket.timeout:
+                pass
+
+            if time.time() - stime > timeout:
+                raise OSError("".join(chunks) + "\nExecution timeout")
+
+        output = "".join(chunks)
+        code = session.recv_exit_status()
+    finally:
+        session.close()
+        # Forget the finished session, otherwise all_sessions grows with
+        # every command executed during the run.
+        with all_sessions_lock:
+            try:
+                all_sessions.remove(session)
+            except ValueError:
+                pass
+
+    if code != 0:
+        templ = "SSH:{0} Cmd {1!r} failed with code {2}. Output: {3}"
+        raise OSError(templ.format(node, cmd, code, output))
+
+    return output
+
+
+def close_all_sessions():
+    """Best-effort interrupt and close of every session in all_sessions.
+
+    Each session is sent a Ctrl-C byte and then closed. Errors are
+    ignored on purpose (the session may already be closed), but only
+    ordinary exceptions: KeyboardInterrupt and SystemExit propagate.
+    """
+    with all_sessions_lock:
+        for session in all_sessions:
+            try:
+                session.sendall('\x03')
+            except Exception:
+                pass
+
+            # close even when the interrupt byte could not be delivered
+            try:
+                session.close()
+            except Exception:
+                pass
diff --git a/wally/start_vms.py b/wally/start_vms.py
new file mode 100644
index 0000000..4fd35ae
--- /dev/null
+++ b/wally/start_vms.py
@@ -0,0 +1,372 @@
+import re
+import os
+import time
+import os.path
+import logging
+import subprocess
+
+from concurrent.futures import ThreadPoolExecutor
+
+from novaclient.exceptions import NotFound
+from novaclient.client import Client as n_client
+from cinderclient.v1.client import Client as c_client
+
+import wally
+from wally.discover import Node
+
+
+# Module-level logger, registered under the "wally.vms" logger name.
+logger = logging.getLogger("wally.vms")
+
+
+def ostack_get_creds():
+    """Read the OpenStack credentials from the process environment.
+
+    Returns a (name, passwd, tenant, auth_url) tuple taken from
+    OS_USERNAME, OS_PASSWORD, OS_TENANT_NAME and OS_AUTH_URL; a variable
+    that is not set yields None.
+    """
+    var_names = ('OS_USERNAME', 'OS_PASSWORD',
+                 'OS_TENANT_NAME', 'OS_AUTH_URL')
+    return tuple(os.environ.get(var) for var in var_names)
+
+
+# Shared nova client: created by nova_connect, reset by nova_disconnect.
+NOVA_CONNECTION = None
+
+
+def nova_connect(name=None, passwd=None, tenant=None, auth_url=None):
+    """Return the shared nova client, creating it on the first call.
+
+    When `name` is None all four credentials are taken from the
+    environment. Once a client exists the arguments are ignored and the
+    existing client is returned.
+    """
+    global NOVA_CONNECTION
+
+    if NOVA_CONNECTION is not None:
+        return NOVA_CONNECTION
+
+    if name is None:
+        name, passwd, tenant, auth_url = ostack_get_creds()
+
+    NOVA_CONNECTION = n_client('1.1', name, passwd, tenant, auth_url)
+    return NOVA_CONNECTION
+
+
+def nova_disconnect():
+    """Close and forget the shared nova client, if one exists."""
+    global NOVA_CONNECTION
+
+    if NOVA_CONNECTION is None:
+        return
+
+    NOVA_CONNECTION.close()
+    NOVA_CONNECTION = None
+
+
+def prepare_os_subpr(name=None, passwd=None, tenant=None, auth_url=None):
+    """Run scripts/prepare.sh with the OpenStack credentials in its environment.
+
+    When `name` is None all four credentials are taken from the current
+    environment. The script is located next to the `wally` package
+    directory and its stdout is discarded.
+
+    The credentials are handed over through the child's environment and
+    the command is started without a shell, so values with spaces or
+    shell metacharacters are passed verbatim. A credential that is None
+    is not set for the child.
+    """
+    if name is None:
+        name, passwd, tenant, auth_url = ostack_get_creds()
+
+    params = {
+        'OS_USERNAME': name,
+        'OS_PASSWORD': passwd,
+        'OS_TENANT_NAME': tenant,
+        'OS_AUTH_URL': auth_url
+    }
+
+    # child environment = ours plus the explicit credentials
+    child_env = dict(os.environ)
+    for key, val in params.items():
+        if val is not None:
+            child_env[key] = val
+
+    spath = os.path.dirname(wally.__file__)
+    spath = os.path.dirname(spath)
+    spath = os.path.join(spath, 'scripts/prepare.sh')
+
+    with open(os.devnull, 'w') as devnull:
+        subprocess.call(['bash', spath], env=child_env, stdout=devnull)
+
+
+def prepare_os(nova, params):
+    """Prepare the OpenStack project for running test vms.
+
+    Steps, in order: open ssh/icmp in the security group, make sure the
+    scheduler (anti-affinity) groups exist, create the keypair, then call
+    the image and flavor hooks.
+
+    Args:
+        nova: nova client.
+        params: settings dict; the keys read here are 'security_group',
+            'schedulers_groups', 'keypair_name', 'pub_key_path',
+            'priv_key_path', 'image' (with 'name' and 'url') and
+            'flavor' (keyword arguments for create_flavor).
+    """
+    allow_ssh(nova, params['security_group'])
+
+    # only the side effect is needed: the returned group ids were never used
+    for shed_group in params['schedulers_groups']:
+        get_or_create_aa_group(nova, shed_group)
+
+    create_keypair(nova,
+                   params['keypair_name'],
+                   params['pub_key_path'],
+                   params['priv_key_path'])
+
+    create_image(nova, params['image']['name'],
+                 params['image']['url'])
+
+    create_flavor(nova, **params['flavor'])
+
+
+def get_or_create_aa_group(nova, name):
+    """Return the id of the server group `name`, creating it if missing.
+
+    A newly created group gets the 'anti-affinity' policy.
+    """
+    try:
+        found = nova.server_groups.find(name=name)
+    except NotFound:
+        spec = {'name': name, 'policies': ['anti-affinity']}
+        found = nova.server_groups.create(spec)
+
+    return found.id
+
+
+def allow_ssh(nova, group_name):
+    """Allow ssh (tcp/22) and icmp from anywhere in a security group.
+
+    The group `group_name` is created when it does not exist. Both rules
+    are requested on every call. Returns the security group id.
+    """
+    try:
+        secgroup = nova.security_groups.find(name=group_name)
+    except NotFound:
+        secgroup = nova.security_groups.create(group_name,
+                                               "allow ssh/ping to node")
+
+    rules = (
+        dict(ip_protocol="tcp", from_port="22", to_port="22",
+             cidr="0.0.0.0/0"),
+        dict(ip_protocol="icmp", from_port=-1, cidr="0.0.0.0/0",
+             to_port=-1),
+    )
+
+    for rule in rules:
+        nova.security_group_rules.create(secgroup.id, **rule)
+
+    return secgroup.id
+
+
+def create_image(nova, name, url):
+    """Placeholder: image creation is not implemented; does nothing."""
+
+
+def create_flavor(nova, name, **params):
+    """Placeholder: flavor creation is not implemented; does nothing."""
+
+
+def create_keypair(nova, name, pub_key_path, priv_key_path):
+    """Make sure the nova keypair `name` exists.
+
+    * keypair already registered: nothing is done, None is returned;
+    * `pub_key_path` exists: that public key is uploaded and the new
+      keypair object is returned;
+    * otherwise nova generates a keypair and both halves are written to
+      `priv_key_path` and `pub_key_path`; None is returned.
+
+    The private key file is created readable and writable by its owner
+    only (mode 0600).
+    """
+    try:
+        nova.keypairs.find(name=name)
+        return None
+    except NotFound:
+        pass
+
+    if os.path.exists(pub_key_path):
+        with open(pub_key_path) as pub_key_fd:
+            return nova.keypairs.create(name, pub_key_fd.read())
+
+    key = nova.keypairs.create(name)
+
+    # Create the file with a restrictive mode from the start, and chmod as
+    # well in case a more permissive file already existed at that path.
+    flags = os.O_WRONLY | os.O_CREAT | os.O_TRUNC
+    priv_fd = os.open(priv_key_path, flags, 0o600)
+    with os.fdopen(priv_fd, "w") as priv_key_fd:
+        priv_key_fd.write(key.private_key)
+    os.chmod(priv_key_path, 0o600)
+
+    with open(pub_key_path, "w") as pub_key_fd:
+        pub_key_fd.write(key.public_key)
+
+
+def create_volume(size, name):
+    """Create a cinder volume and wait until it becomes 'available'.
+
+    A volume that ends up in the 'error' state is deleted and created
+    again, up to three times. The status is polled once per second.
+
+    Args:
+        size: volume size, passed to cinder as is.
+        name: display name of the volume.
+
+    Returns:
+        The cinder volume object in 'available' state.
+
+    Raises:
+        RuntimeError: the volume is in 'error' state after three retries.
+    """
+    cinder = c_client(*ostack_get_creds())
+    vol = cinder.volumes.create(size=size, display_name=name)
+    err_count = 0
+
+    # Volume fields are read as attributes, the same way clear_all does.
+    while vol.status != 'available':
+        if vol.status == 'error':
+            if err_count == 3:
+                logger.critical("Fail to create volume")
+                raise RuntimeError("Fail to create volume")
+
+            err_count += 1
+            cinder.volumes.delete(vol)
+            time.sleep(1)
+            vol = cinder.volumes.create(size=size, display_name=name)
+            continue
+
+        time.sleep(1)
+        vol = cinder.volumes.get(vol.id)
+    return vol
+
+
+def wait_for_server_active(nova, server, timeout=240):
+    """Poll a server once per second until it is active.
+
+    Returns True when the 'OS-EXT-STS:vm_state' attribute reads 'active',
+    False when it reads 'error' or when more than `timeout` seconds have
+    passed since the call started.
+    """
+    started = time.time()
+
+    while True:
+        time.sleep(1)
+        state = getattr(server, 'OS-EXT-STS:vm_state').lower()
+
+        if state == 'active':
+            return True
+
+        if state == 'error' or time.time() - started > timeout:
+            return False
+
+        # refresh the server object before the next check
+        server = nova.servers.get(server)
+
+
+class Allocate(object):
+    """Marker used in place of a floating ip: 'allocate a new one'.
+
+    The class object itself is the marker (compared with `is`).
+    """
+
+
+def get_floating_ips(nova, pool, amount):
+    """Return up to `amount` floating ips not attached to any instance.
+
+    When `pool` is not None only ips from that pool are considered.
+    """
+    free_ips = [ip for ip in nova.floating_ips.list()
+                if (pool is None or ip.pool == pool)
+                and ip.instance_id is None]
+    return free_ips[:amount]
+
+
+def launch_vms(params):
+    """Start vms described by `params`; yield (Node, server id) pairs.
+
+    This is a generator: nothing happens until it is iterated. It uses
+    the module-level NOVA_CONNECTION.
+
+    `params` is not modified. 'count' is either a number or a string
+    "xN", meaning N vms per enabled nova-compute service. 'creds' is a
+    connection uri template formatted with `ip` and `private_key_path`.
+    The remaining keys are passed to create_vms_mt.
+    """
+    logger.debug("Starting new nodes on openstack")
+    params = params.copy()
+    count = params.pop('count')
+
+    if isinstance(count, basestring):
+        assert count.startswith("x")
+        computes = NOVA_CONNECTION.services.list(binary='nova-compute')
+        enabled = [srv for srv in computes if srv.status == 'enabled']
+        count = len(enabled) * int(count[1:])
+
+    srv_params = "img: {image[name]}, flavor: {flavor[name]}".format(**params)
+    logger.info("Will start {0} servers with next params: {1}".format(
+        count, srv_params))
+    vm_creds = params.pop('creds')
+
+    # translate the nested settings into create_vms_mt keyword arguments
+    params['img_name'] = params.pop('image')['name']
+    params['flavor_name'] = params.pop('flavor')['name']
+    del params['scheduler_group_name']
+    private_key_path = params.pop('private_key_path')
+
+    for ip, os_node in create_vms_mt(NOVA_CONNECTION, count, **params):
+        conn_uri = vm_creds.format(ip=ip, private_key_path=private_key_path)
+        yield Node(conn_uri, []), os_node.id
+
+
+def create_vms_mt(nova, amount, group_name, keypair_name, img_name,
+                  flavor_name, vol_sz=None, network_zone_name=None,
+                  flt_ip_pool=None, name_templ='wally-{id}',
+                  scheduler_hints=None, security_group=None):
+    """Create `amount` vms in parallel; return the list of create_vm results.
+
+    Flavor, image and (optionally) network lookups are started on a
+    thread pool, then one create_vm call per vm is submitted to the same
+    pool. When `flt_ip_pool` is given, free floating ips from that pool
+    are reused and the Allocate marker is passed for the vms left without
+    one; otherwise None is passed as the floating ip.
+
+    Vm names are built from `name_templ`, formatted with `group` and `id`.
+    """
+    with ThreadPoolExecutor(max_workers=16) as executor:
+        network_future = None
+        if network_zone_name is not None:
+            network_future = executor.submit(nova.networks.find,
+                                             label=network_zone_name)
+
+        fl_future = executor.submit(nova.flavors.find, name=flavor_name)
+        img_future = executor.submit(nova.images.find, name=img_name)
+
+        if flt_ip_pool is None:
+            ips = [None] * amount
+        else:
+            ips_future = executor.submit(get_floating_ips,
+                                         nova, flt_ip_pool, amount)
+            logger.debug("Wait for floating ip")
+            ips = ips_future.result()
+            # pad with markers so that every vm has an entry
+            ips += [Allocate] * (amount - len(ips))
+
+        logger.debug("Getting flavor object")
+        fl = fl_future.result()
+        logger.debug("Getting image object")
+        img = img_future.result()
+
+        nics = None
+        if network_future is not None:
+            logger.debug("Waiting for network results")
+            nics = [{'net-id': network_future.result().id}]
+
+        names = [name_templ.format(group=group_name, id=idx)
+                 for idx in range(amount)]
+
+        logger.debug("Requesting new vms")
+        futures = [
+            executor.submit(create_vm, nova, name, keypair_name, img, fl,
+                            nics, vol_sz, flt_ip, scheduler_hints,
+                            flt_ip_pool, [security_group])
+            for name, flt_ip in zip(names, ips)
+        ]
+
+        res = [future.result() for future in futures]
+        logger.debug("Done spawning")
+        return res
+
+
+def create_vm(nova, name, keypair_name, img,
+              fl, nics, vol_sz=None,
+              flt_ip=False,
+              scheduler_hints=None,
+              pool=None,
+              security_groups=None):
+    """Create one server, optionally with a volume and a floating ip.
+
+    The server is created up to three times: a server that does not
+    become active is deleted (waiting up to 120 seconds for it to
+    disappear) before the next attempt.
+
+    Args:
+        vol_sz: size of a volume to create and attach, or None.
+        flt_ip: floating ip object to attach, the Allocate marker to
+            create a new ip in `pool`, or None/False for no floating ip.
+
+    Returns:
+        (ip, server): the floating ip address, or None when no floating
+        ip was requested, and the refreshed server object.
+
+    Raises:
+        RuntimeError: the server could not be started in three attempts,
+            or a failed server could not be deleted in time.
+    """
+    for i in range(3):
+        srv = nova.servers.create(name,
+                                  flavor=fl,
+                                  image=img,
+                                  nics=nics,
+                                  key_name=keypair_name,
+                                  scheduler_hints=scheduler_hints,
+                                  security_groups=security_groups)
+
+        if wait_for_server_active(nova, srv):
+            break
+
+        msg = "Server {0} fails to start. Kill it and try again"
+        logger.debug(msg.format(srv))
+        nova.servers.delete(srv)
+
+        # wait till the failed server is really gone
+        for j in range(120):
+            all_id = set(alive_srv.id for alive_srv in nova.servers.list())
+            if srv.id not in all_id:
+                break
+            time.sleep(1)
+        else:
+            raise RuntimeError("Server {0} delete timeout".format(srv.id))
+    else:
+        raise RuntimeError("Failed to start server {0}".format(name))
+
+    if vol_sz is not None:
+        vol = create_volume(vol_sz, name)
+        # volume fields are read as attributes, the same way clear_all does
+        nova.volumes.create_server_volume(srv.id, vol.id, None)
+
+    if flt_ip is Allocate:
+        flt_ip = nova.floating_ips.create(pool)
+
+    if flt_ip is None or flt_ip is False:
+        # no floating ip requested: there is no address to report
+        return None, nova.servers.get(srv.id)
+
+    srv.add_floating_ip(flt_ip)
+    return flt_ip.ip, nova.servers.get(srv.id)
+
+
+def clear_nodes(nodes_ids):
+    """Delete the servers whose ids are in `nodes_ids`.
+
+    Uses the module-level NOVA_CONNECTION; name-based matching is disabled.
+    """
+    clear_all(NOVA_CONNECTION, ids=nodes_ids, name_templ=None)
+
+
+def clear_all(nova, ids=None, name_templ="ceph-test-{0}"):
+    """Delete test servers and, in name mode, their volumes.
+
+    Selection:
+        * name_templ is not None: servers whose name matches the template
+          with "\\d+" substituted for its placeholder; `ids` is ignored.
+          Volumes whose display name matches the same pattern and whose
+          status is 'available' or 'error' are deleted as well.
+        * name_templ is None: servers whose id is in `ids`; no volumes are
+          touched. With `ids` also None nothing is selected.
+
+    The call blocks, polling once per second without a timeout, until
+    none of the deleted servers is listed any more.
+    """
+    if name_templ is not None:
+        # build the pattern once instead of for every server and volume
+        name_re = re.compile(name_templ.format("\\d+"))
+    else:
+        name_re = None
+        if ids is None:
+            ids = ()
+
+    def need_delete(srv):
+        if name_re is not None:
+            return name_re.match(srv.name) is not None
+        return srv.id in ids
+
+    deleted_srvs = set()
+    for srv in nova.servers.list():
+        if need_delete(srv):
+            logger.debug("Deleting server {0}".format(srv.name))
+            nova.servers.delete(srv)
+            deleted_srvs.add(srv.id)
+
+    # wait till vms are actually deleted
+    count = 0
+    while True:
+        if count % 60 == 0:
+            logger.debug("Waiting till all servers are actually deleted")
+        all_id = set(srv.id for srv in nova.servers.list())
+        if len(all_id.intersection(deleted_srvs)) == 0:
+            break
+        count += 1
+        time.sleep(1)
+    logger.debug("Done, deleting volumes")
+
+    if name_re is not None:
+        cinder = c_client(*ostack_get_creds())
+        for vol in cinder.volumes.list():
+            if isinstance(vol.display_name, basestring):
+                if name_re.match(vol.display_name):
+                    if vol.status in ('available', 'error'):
+                        logger.debug("Deleting volume " + vol.display_name)
+                        cinder.volumes.delete(vol)
+
+    logger.debug("Clearing done (yet some volumes may still deleting)")
diff --git a/wally/statistic.py b/wally/statistic.py
new file mode 100644
index 0000000..0729283
--- /dev/null
+++ b/wally/statistic.py
@@ -0,0 +1,128 @@
+import math
+import itertools
+no_numpy = False  # set to True below if numpy/scipy cannot be imported
+try:
+    from numpy import array, linalg
+    from scipy.optimize import leastsq
+    from numpy.polynomial.chebyshev import chebfit, chebval
+except ImportError:
+    no_numpy = True
+
+
+def med_dev(vals):
+    """Return (arithmetic mean, population standard deviation) of vals."""
+    avg, cnt = sum(vals) / len(vals), len(vals)
+    return avg, (sum(abs(avg - v) ** 2.0 for v in vals) / cnt) ** 0.5
+
+
+def round_deviation(med_dev):
+    """Truncate both values to the two leading digits of the deviation.
+
+    med_dev is a (value, deviation) pair; it is returned untouched when the
+    deviation is below 1E-7. Results keep the types of the inputs."""
+    value, spread = med_dev
+    if spread < 1E-7:
+        return med_dev
+    step = 10.0 ** (math.floor(math.log10(spread)) - 1)
+    return (type(value)(int(value / step) * step),
+            type(spread)(int(spread / step) * step))
+
+
+def groupby_globally(data, key_func):
+    """Group data by key_func, merging non-adjacent runs of the same key.
+
+    key_func must return a 4-item (bs, cache_tp, act, conc) sequence.
+    """
+    result = {}
+    for (bs, cache_tp, act, conc), chunk in itertools.groupby(data, key_func):
+        result.setdefault((bs, cache_tp, act, conc), []).extend(chunk)
+    return result
+
+
+def approximate_curve(x, y, xnew, curved_coef):
+    """Fit a Chebyshev series of degree curved_coef to (x, y) and evaluate
+    it at xnew; returns None when numpy is unavailable."""
+    if not no_numpy:
+        return chebval(xnew, chebfit(x, y, curved_coef))
+    return None
+
+
+def approximate_line(x, y, xnew, relative_dist=False):
+    """Fit a straight line to the points (x, y) and evaluate it at xnew.
+
+    x, y - test data, xnew - dots, where we want to find approximation.
+    The fit minimises y - line(x), or 1 - y / line(x) when relative_dist
+    is set. Returns the fitted line values at xnew, or None if numpy/scipy
+    could not be imported. Raises ValueError when leastsq reports
+    failure.
+    """
+    if no_numpy:
+        return None
+
+    # leastsq works on numpy arrays, plain sequences are not enough
+    xs = array(x)
+    ys = array(y)
+
+    def line(coefs, arg):
+        return coefs[0] * arg + coefs[1]
+
+    if relative_dist:
+        def residual(coefs, arg, val):
+            return 1.0 - val / line(coefs, arg)
+    else:
+        def residual(coefs, arg, val):
+            return val - line(coefs, arg)
+
+    # initial guess: the line going exactly through the first two points
+    start = tuple(linalg.solve([[xs[0], 1.0], [xs[1], 1.0]], ys[:2]))
+
+    # least-squares refinement of (slope, offset)
+    fitted, ier = leastsq(residual,
+                          start[:],
+                          args=(xs, ys))
+
+    # per scipy docs an ier of 1, 2, 3 or 4 means a solution was found
+    if ier not in range(1, 5):
+        raise ValueError("No line for this dots")
+
+    return line(fitted, array(xnew))
+
+
+def difference(y, ynew):
+    """Compare measured values y with approximated values ynew.
+
+    Returns a tuple of three items:
+        zip of (y - ynew, abs((y - ynew) / y)) pairs, one per point,
+        (average, max) of the signed differences y - ynew,
+        (average, max) of the relative differences.
+    Raises ZeroDivisionError as soon as a value of y is not greater
+    than 1E-6 (this includes zero and all negative values).
+    """
+    deltas = []
+    ratios = []
+
+    for measured, approx in zip(y, ynew):
+        delta = measured - approx
+        if not measured > 1E-6:
+            raise ZeroDivisionError("{0!r} is too small".format(measured))
+
+        deltas.append(delta)
+        ratios.append(abs(delta / measured))
+
+    delta_avg = sum(deltas) / len(deltas)
+    ratio_avg = sum(ratios) / len(ratios)
+
+    return (zip(deltas, ratios),
+            (delta_avg, max(deltas)),
+            (ratio_avg, max(ratios)))
+
+
+def calculate_distribution_properties(data):
+    """Placeholder without a body; meant for distribution stats (chi, etc)."""
+
+
+def minimal_measurement_amount(data, max_diff, req_probability):
+    """Stub (no body yet). Should return the amount of measurements needed
+    to get results (avg and deviation) with an error less than max_diff
+    in at least req_probability% of cases.
+    """
diff --git a/wally/suits/__init__.py b/wally/suits/__init__.py
new file mode 100644
index 0000000..7b6610e
--- /dev/null
+++ b/wally/suits/__init__.py
@@ -0,0 +1,3 @@
+from .itest import TwoScriptTest, PgBenchTest, IOPerfTest
+
+__all__ = ["TwoScriptTest", "PgBenchTest", "IOPerfTest"]
diff --git a/wally/suits/io/__init__.py b/wally/suits/io/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/wally/suits/io/__init__.py
diff --git a/wally/suits/io/agent.py b/wally/suits/io/agent.py
new file mode 100644
index 0000000..7589346
--- /dev/null
+++ b/wally/suits/io/agent.py
@@ -0,0 +1,688 @@
+import sys
+import time
+import json
+import random
+import select
+import pprint
+import argparse
+import traceback
+import subprocess
+import itertools
+from collections import OrderedDict
+
+
+SECTION = 0
+SETTING = 1
+
+
+def get_test_sync_mode(config):
+    """Return the one-letter sync mode of a test config.
+
+    An explicit 'sync_mode' entry wins; otherwise the letter is derived
+    from the 'sync' and 'direct' options ("1" means enabled):
+    'x' - both, 's' - sync only, 'd' - direct only, 'a' - neither.
+    """
+    if 'sync_mode' in config:
+        return config['sync_mode']
+
+    flags = (config.get("sync", "0") == "1",
+             config.get("direct", "0") == "1")
+
+    return {(True, True): 'x',
+            (True, False): 's',
+            (False, True): 'd',
+            (False, False): 'a'}[flags]
+
+
+def get_test_summary(params):
+    """Build the short test id: <op><sync mode><blocksize>th<threads>.
+
+    op is rr/rw/sr/sw for randread/randwrite/read/write; the thread count
+    comes from 'numjobs', falling back to 'concurence', then to 1.
+    """
+    op_codes = {"randread": "rr", "randwrite": "rw",
+                "read": "sr", "write": "sw"}
+    op = op_codes[params["rw"]]
+    mode = get_test_sync_mode(params)
+
+    threads = params.get('numjobs')
+    if threads is None:
+        threads = params.get('concurence', '1')
+    threads = int(threads)
+    return "{0}{1}{2}th{3}".format(op, mode, params['blocksize'], threads)
+
+
+counter = [0]
+
+
+def process_section(name, vals, defaults, format_params):
+    vals = vals.copy()
+    params = format_params.copy()
+    # Generator: expands one config section into (job name, options) pairs.
+    if '*' in name:  # "<name> * <N>" header: yield each job N times
+        name, repeat = name.split('*')
+        name = name.strip()
+        repeat = int(repeat.format(**params))
+    else:
+        repeat = 1
+    # NOTE(review): the defaults argument is never read in this function
+    # this code can be optimized
+    iterable_names = []
+    iterable_values = []
+    processed_vals = {}
+    # each combination of '{% a, b %}' lists below produces its own job
+    for val_name, val in vals.items():
+        if val is None:
+            processed_vals[val_name] = val
+        # remove hardcode
+        elif val.startswith('{%'):
+            assert val.endswith("%}")
+            content = val[2:-2].format(**params)
+            iterable_names.append(val_name)
+            iterable_values.append(list(i.strip() for i in content.split(',')))
+        else:
+            processed_vals[val_name] = val.format(**params)
+
+    group_report_err_msg = "Group reporting should be set if numjobs != 1"
+
+    if iterable_values == []:
+        params['UNIQ'] = 'UN{0}'.format(counter[0])
+        counter[0] += 1
+        params['TEST_SUMM'] = get_test_summary(processed_vals)
+
+        if processed_vals.get('numjobs', '1') != '1':
+            assert 'group_reporting' in processed_vals, group_report_err_msg
+        # only the first of the repeated jobs keeps its ramp_time
+        ramp_time = processed_vals.get('ramp_time')
+        for i in range(repeat):
+            yield name.format(**params), processed_vals.copy()
+
+            if 'ramp_time' in processed_vals:
+                del processed_vals['ramp_time']
+
+        if ramp_time is not None:
+            processed_vals['ramp_time'] = ramp_time
+    else:
+        for it_vals in itertools.product(*iterable_values):
+            processed_vals.update(dict(zip(iterable_names, it_vals)))
+            params['UNIQ'] = 'UN{0}'.format(counter[0])
+            counter[0] += 1
+            params['TEST_SUMM'] = get_test_summary(processed_vals)
+
+            if processed_vals.get('numjobs', '1') != '1':
+                assert 'group_reporting' in processed_vals,\
+                    group_report_err_msg
+
+            ramp_time = processed_vals.get('ramp_time')
+            # repeats after the first carry '_ramp_time' instead
+            for i in range(repeat):
+                yield name.format(**params), processed_vals.copy()
+                if 'ramp_time' in processed_vals:
+                    processed_vals['_ramp_time'] = ramp_time
+                    processed_vals.pop('ramp_time')
+
+            if ramp_time is not None:
+                processed_vals['ramp_time'] = ramp_time
+                processed_vals.pop('_ramp_time')
+
+
+def calculate_execution_time(combinations):
+    """Sum ramp_time and runtime over all (name, params) pairs."""
+    total = 0
+    for _, sec in combinations:
+        total += int(sec.get('ramp_time', 0)) + int(sec.get('runtime', 0))
+    return total
+
+
+def parse_fio_config_full(fio_cfg, params=None):
+    defaults = {}
+    format_params = {}
+    # Generator: yields the jobs of every section except [defaults].
+    if params is None:
+        ext_params = {}
+    else:
+        ext_params = params.copy()
+
+    curr_section = None
+    curr_section_name = None
+    # a section is processed when the next header (or the end) is reached
+    for tp, name, val in parse_fio_config_iter(fio_cfg):
+        if tp == SECTION:
+            non_def = curr_section_name != 'defaults'
+            if curr_section_name is not None and non_def:
+                format_params.update(ext_params)  # external params win
+                for sec in process_section(curr_section_name,
+                                           curr_section,
+                                           defaults,
+                                           format_params):
+                    yield sec
+            # [defaults] is edited in place; other sections start as its copy
+            if name == 'defaults':
+                curr_section = defaults
+            else:
+                curr_section = OrderedDict()
+                curr_section.update(defaults)
+            curr_section_name = name
+
+        else:
+            assert tp == SETTING
+            assert curr_section_name is not None, "no section name"
+            if name == name.upper():  # no lowercase letters: format param
+                assert curr_section_name == 'defaults'
+                format_params[name] = val
+            else:
+                curr_section[name] = val
+    # flush the last section: no later header triggers the code above
+    if curr_section_name is not None and curr_section_name != 'defaults':
+        format_params.update(ext_params)
+        for sec in process_section(curr_section_name,
+                                   curr_section,
+                                   defaults,
+                                   format_params):
+            yield sec
+
+
+def parse_fio_config_iter(fio_cfg):
+    """Tokenize fio config text into (kind, name, value) triples.
+
+    Yields (SECTION, name, None) for "[name]" headers and (SETTING, name,
+    value) for options (value is None when there is no '='). Blank lines
+    and lines starting with '#' or ';' are skipped. A header without the
+    closing ']' raises ValueError quoting the 1-based line number."""
+    for lineno, line in enumerate(fio_cfg.split("\n"), 1):
+        line = line.strip()
+        if line == "" or line.startswith(("#", ";")):
+            continue
+        if line.startswith('['):
+            # a real check, not an assert: it must survive "python -O"
+            if not line.endswith(']'):
+                pref = "During parsing line number {0}\n".format(lineno)
+                raise ValueError(pref + "name should ends with ]")
+            yield SECTION, line[1:-1], None
+        elif '=' in line:
+            opt_name, opt_val = line.split('=', 1)
+            yield SETTING, opt_name.strip(), opt_val.strip()
+        else:
+            yield SETTING, line, None
+
+
+def format_fio_config(fio_cfg):
+    """Render (section name, options) pairs as fio job-file text.
+
+    Sections named with a leading '_' are left out. Options with a None
+    value are written as a bare name, the rest as name=value. Every
+    emitted section but the one at position 0 follows an empty line."""
+    chunks = []
+    for idx, (sec_name, options) in enumerate(fio_cfg):
+        if sec_name.startswith('_'):
+            continue
+        chunks.append("[{0}]\n".format(sec_name) if idx == 0
+                      else "\n[{0}]\n".format(sec_name))
+        for key, value in options.items():
+            chunks.append(key + "\n" if value is None
+                          else "{0}={1}\n".format(key, value))
+    return "".join(chunks)
+
+
+count = 0
+
+
+def to_bytes(sz):
+    """Convert a size such as '512', '4k', '1M' or '2g' to bytes.
+
+    The k/m/g suffixes are case-insensitive powers of 1024. Raises
+    ValueError for any other text, including the empty string.
+    """
+    sz = sz.lower()
+    mult = {'k': 1024, 'm': 1024 ** 2, 'g': 1024 ** 3}.get(sz[-1:])
+    if mult is None:
+        return int(sz)
+    return mult * int(sz[:-1])
+
+def do_run_fio_fake(bconf):
+    """Emulate do_run_fio() without running fio.
+
+    Job results are synthesized from the base bandwidth BW and latency LAT,
+    each jittered by +-5%. Returns zip(results, bconf) like do_run_fio()."""
+    def estimate_iops(sz, bw, lat):
+        return 1 / (lat + float(sz) / bw)
+    global count
+    count += 1
+    parsed_out = []
+
+    BW = 120.0 * (1024 ** 2)
+    LAT = 0.003
+
+    for name, cfg in bconf:
+        sz = to_bytes(cfg['blocksize'])
+        curr_lat = LAT * ((random.random() - 0.5) * 0.1 + 1)
+        curr_ulat = curr_lat * 1000000
+        curr_bw = BW * ((random.random() - 0.5) * 0.1 + 1)
+        iops = estimate_iops(sz, curr_bw, curr_lat)
+        bw = iops * sz
+
+        res = {'ctx': 10683,
+               'error': 0,
+               'groupid': 0,
+               'jobname': name,
+               'majf': 0,
+               'minf': 30,
+               'read': {'bw': 0,
+                        'bw_agg': 0.0,
+                        'bw_dev': 0.0,
+                        'bw_max': 0,
+                        'bw_mean': 0.0,
+                        'bw_min': 0,
+                        'clat': {'max': 0, 'mean': 0.0,
+                                 'min': 0, 'stddev': 0.0},
+                        'io_bytes': 0,
+                        'iops': 0,
+                        'lat': {'max': 0, 'mean': 0.0,
+                                'min': 0, 'stddev': 0.0},
+                        'runtime': 0,
+                        'slat': {'max': 0, 'mean': 0.0,
+                                 'min': 0, 'stddev': 0.0}
+                        },
+               'sys_cpu': 0.64,
+               'trim': {'bw': 0,
+                        'bw_agg': 0.0,
+                        'bw_dev': 0.0,
+                        'bw_max': 0,
+                        'bw_mean': 0.0,
+                        'bw_min': 0,
+                        'clat': {'max': 0, 'mean': 0.0,
+                                 'min': 0, 'stddev': 0.0},
+                        'io_bytes': 0,
+                        'iops': 0,
+                        'lat': {'max': 0, 'mean': 0.0,
+                                'min': 0, 'stddev': 0.0},
+                        'runtime': 0,
+                        'slat': {'max': 0, 'mean': 0.0,
+                                 'min': 0, 'stddev': 0.0}
+                        },
+               'usr_cpu': 0.23,
+               'write': {'bw': 0,
+                         'bw_agg': 0,
+                         'bw_dev': 0,
+                         'bw_max': 0,
+                         'bw_mean': 0,
+                         'bw_min': 0,
+                         'clat': {'max': 0, 'mean': 0,
+                                  'min': 0, 'stddev': 0},
+                         'io_bytes': 0,
+                         'iops': 0,
+                         'lat': {'max': 0, 'mean': 0,
+                                 'min': 0, 'stddev': 0},
+                         'runtime': 0,
+                         'slat': {'max': 0, 'mean': 0.0,
+                                  'min': 0, 'stddev': 0.0}
+                         }
+               }
+        # pick the part of the report that this job fills in
+        if cfg['rw'] in ('read', 'randread'):
+            key = 'read'
+        elif cfg['rw'] in ('write', 'randwrite'):
+            key = 'write'
+        else:
+            raise ValueError("Uknown op type {0}".format(cfg['rw']))
+
+        res[key]['bw'] = bw
+        res[key]['iops'] = iops
+        res[key]['runtime'] = 30
+        res[key]['io_bytes'] = res[key]['runtime'] * bw
+        res[key]['bw_agg'] = bw
+        res[key]['bw_dev'] = bw / 30
+        res[key]['bw_max'] = bw * 1.5
+        res[key]['bw_min'] = bw / 1.5
+        res[key]['bw_mean'] = bw
+        res[key]['clat'] = {'max': curr_ulat * 10, 'mean': curr_ulat,
+                            'min': curr_ulat / 2, 'stddev': curr_ulat}
+        res[key]['lat'] = res[key]['clat'].copy()
+        res[key]['slat'] = res[key]['clat'].copy()
+
+        parsed_out.append(res)
+
+    return zip(parsed_out, bconf)
+
+
+def do_run_fio(bconf):
+    """Feed the jobs to fio via stdin; return zip(parsed 'jobs', bconf).
+
+    Raises ValueError (quoting the first 100 chars of the combined
+    stdout/stderr) when the output is not JSON or has no 'jobs' key."""
+    benchmark_config = format_fio_config(bconf)
+    cmd = ["fio", "--output-format=json", "--alloc-size=262144", "-"]
+    p = subprocess.Popen(cmd, stdin=subprocess.PIPE,
+                         stdout=subprocess.PIPE,
+                         stderr=subprocess.STDOUT)
+    # TODO: set timeout
+    raw_out, _ = p.communicate(benchmark_config)
+    try:
+        parsed_out = json.loads(raw_out)["jobs"]
+    except KeyError:
+        msg = "Can't parse fio output {0!r}: no 'jobs' found"
+        raise ValueError(msg.format(raw_out[:100]))
+    except Exception as exc:
+        # format exc itself: exc.message is deprecated and may be empty
+        msg = "Can't parse fio output: {0!r}\nError: {1}"
+        raise ValueError(msg.format(raw_out[:100], exc))
+
+    return zip(parsed_out, bconf)
+
+# limited by fio
+MAX_JOBS = 1000
+
+
+def next_test_portion(whole_conf, runcycle):
+    jcount = 0
+    runtime = 0
+    bconf = []
+    # Generator: cuts whole_conf into lists of jobs for single fio runs.
+    for pos, (name, sec) in enumerate(whole_conf):
+        jc = int(sec.get('numjobs', '1'))
+
+        if runcycle is not None:
+            curr_task_time = calculate_execution_time([(name, sec)])
+        else:
+            curr_task_time = 0
+        # a single job must fit into the MAX_JOBS limit on its own
+        if jc > MAX_JOBS:
+            err_templ = "Can't process job {0!r} - too large numjobs"
+            raise ValueError(err_templ.format(name))
+        # runcycle never blocks the first job of a portion
+        if runcycle is not None and len(bconf) != 0:
+            rc_ok = curr_task_time + runtime <= runcycle
+        else:
+            rc_ok = True
+
+        if jc + jcount <= MAX_JOBS and rc_ok:
+            runtime += curr_task_time
+            jcount += jc
+            bconf.append((name, sec))
+            if '_ramp_time' in sec:  # joins this portion: drop saved ramp
+                del sec['_ramp_time']
+            continue
+        # the job does not fit: emit the portion collected so far
+        assert len(bconf) != 0
+        yield bconf
+        # the job opening a new portion gets its ramp_time back
+        if '_ramp_time' in sec:
+            sec['ramp_time'] = sec.pop('_ramp_time')
+            curr_task_time = calculate_execution_time([(name, sec)])
+
+        runtime = curr_task_time
+        jcount = jc
+        bconf = [(name, sec)]
+    # whatever is left forms the last portion
+    if bconf != []:
+        yield bconf
+
+
+def add_job_results(jname, job_output, jconfig, res):
+    """Append one fio job's bw/iops/latency figures to res[jname].
+
+    Stats are taken from job_output['write'] when its iops is non-zero,
+    from job_output['read'] otherwise. The first result for jname also
+    stores the job settings; later ones must match them.
+    """
+    source = 'write' if job_output['write']['iops'] != 0 else 'read'
+    raw_result = job_output[source]
+
+    if jname in res:
+        j_res = res[jname]
+        assert j_res["rw"] == jconfig["rw"]
+        assert j_res["sync_mode"] == get_test_sync_mode(jconfig)
+        assert j_res["concurence"] == int(jconfig.get("numjobs", 1))
+        assert j_res["blocksize"] == jconfig["blocksize"]
+        assert j_res["jobname"] == job_output["jobname"]
+        # timings are not compared:
+        # ramp part is skipped for all tests, except first
+    else:
+        j_res = {
+            "rw": jconfig["rw"],
+            "sync_mode": get_test_sync_mode(jconfig),
+            "concurence": int(jconfig.get("numjobs", 1)),
+            "blocksize": jconfig["blocksize"],
+            "jobname": job_output["jobname"],
+            "timings": [int(jconfig.get("runtime", 0)),
+                        int(jconfig.get("ramp_time", 0))],
+        }
+
+    for field in ("bw", "iops"):
+        value = raw_result[field]
+        j_res.setdefault(field, []).append(value)
+    for field in ("lat", "clat", "slat"):
+        value = raw_result[field]["mean"]
+        j_res.setdefault(field, []).append(value)
+
+    res[jname] = j_res
+
+
+def compile(benchmark_config, params, runcycle=None):
+    """Expand a job description into plain fio config text.
+
+    The parsed sections are split into portions (see runcycle) and the
+    formatted portions are concatenated."""
+    sections = list(parse_fio_config_full(benchmark_config, params))
+    portions = next_test_portion(sections, runcycle)
+    return "".join(format_fio_config(part) for part in portions)
+
+
+def run_fio(benchmark_config,
+            params,
+            runcycle=None,
+            raw_results_func=None,
+            skip_tests=0,
+            fake_fio=False):
+    # Runs every test of the config; returns (results, tests run, ok flag).
+    whole_conf = list(parse_fio_config_full(benchmark_config, params))
+    whole_conf = whole_conf[skip_tests:]
+    res = {}
+    curr_test_num = skip_tests
+    executed_tests = 0
+    ok = True
+    try:
+        for bconf in next_test_portion(whole_conf, runcycle):
+            # one fio invocation (real or emulated) per portion
+            if fake_fio:
+                res_cfg_it = do_run_fio_fake(bconf)
+            else:
+                res_cfg_it = do_run_fio(bconf)
+            # NOTE(review): resumes at the last used index, not the next one
+            res_cfg_it = enumerate(res_cfg_it, curr_test_num)
+
+            for curr_test_num, (job_output, (jname, jconfig)) in res_cfg_it:
+                executed_tests += 1
+                if raw_results_func is not None:
+                    raw_results_func(executed_tests,
+                                     [job_output, jname, jconfig])
+
+                assert jname == job_output["jobname"], \
+                    "{0} != {1}".format(jname, job_output["jobname"])
+                # '_'-prefixed jobs are run but not added to the results
+                if jname.startswith('_'):
+                    continue
+
+                add_job_results(jname, job_output, jconfig, res)
+
+            msg_template = "Done {0} tests from {1}. ETA: {2}"
+            exec_time = estimate_cfg(benchmark_config, params, curr_test_num)
+
+            print msg_template.format(curr_test_num - skip_tests,
+                                      len(whole_conf),
+                                      sec_to_str(exec_time))
+
+    except (SystemExit, KeyboardInterrupt):
+        raise
+    # any other error is printed and reported to the caller via ok=False
+    except Exception:
+        print "=========== ERROR ============="
+        traceback.print_exc()
+        print "======== END OF ERROR ========="
+        ok = False
+
+    return res, executed_tests, ok
+
+
+def run_benchmark(binary_tp, *argv, **kwargs):
+    if binary_tp != 'fio':  # fio is the only supported benchmark binary
+        raise ValueError("Unknown behcnmark {0}".format(binary_tp))
+    return run_fio(*argv, **kwargs)
+
+
+def read_config(fd, timeout=10):
+    """Read fd up to EOF, one character at a time.
+
+    Raises IOError once timeout seconds have passed in total without EOF."""
+    deadline = time.time() + timeout
+    text = ""
+    while True:
+        remaining = deadline - time.time()
+        if remaining <= 0:
+            raise IOError("No config provided")
+        readable = select.select([fd], [], [], remaining)[0]
+        if not readable:
+            raise IOError("No config provided")
+        char = fd.read(1)
+        if char == '':
+            return text
+        text += char
+
+
+def estimate_cfg(job_cfg, params, skip_tests=0):
+    jobs = list(parse_fio_config_full(job_cfg, params))  # expanded jobs
+    return calculate_execution_time(jobs[skip_tests:])
+
+
+def sec_to_str(seconds):
+    """Format a number of seconds as H:MM:SS (hours are not wrapped)."""
+    mins, s = divmod(seconds, 60)
+    h, m = divmod(mins, 60)
+    return "{0}:{1:02d}:{2:02d}".format(h, m, s)
+
+
+def parse_args(argv):
+    """Parse the agent's command line (argv excludes the program name)."""
+    parser = argparse.ArgumentParser(description="Run fio' and return result")
+    parser.add_argument("--type", metavar="BINARY_TYPE",
+                        choices=['fio'], default='fio',
+                        help=argparse.SUPPRESS)
+    parser.add_argument("--start-at", metavar="START_AT_UTC", type=int,
+                        help="Start execution at START_AT_UTC")
+    parser.add_argument("--json", action="store_true", default=False,
+                        help="Json output format")
+    parser.add_argument("--output", default='-', metavar="FILE_PATH",
+                        help="Store results to FILE_PATH")
+    parser.add_argument("--estimate", action="store_true", default=False,
+                        help="Only estimate task execution time")
+    parser.add_argument("--compile", action="store_true", default=False,
+                        help="Compile config file to fio config")
+    parser.add_argument("--num-tests", action="store_true", default=False,
+                        help="Show total number of tests")
+    parser.add_argument("--runcycle", type=int, default=None,
+                        metavar="MAX_CYCLE_SECONDS",
+                        help="Max cycle length in seconds")
+    parser.add_argument("--show-raw-results", action='store_true',
+                        default=False, help="Output raw input and results")
+    parser.add_argument("--skip-tests", type=int, default=0, metavar="NUM",
+                        help="Skip NUM tests")
+    parser.add_argument("--faked-fio", action='store_true',
+                        default=False, help="Emulate fio with 0 test time")
+    parser.add_argument("--params", nargs="*", metavar="PARAM=VAL",
+                        default=[],
+                        help="Provide set of pairs PARAM=VAL to " +
+                             "format into job description")
+    parser.add_argument("jobfile")
+    return parser.parse_args(argv)
+
+
+def main(argv):
+    """Agent entry point: estimate, compile or run the given job file.
+
+    Returns 0 on success and 1 if the benchmark run reported a failure."""
+    argv_obj = parse_args(argv)
+
+    if argv_obj.jobfile == '-':
+        job_cfg = read_config(sys.stdin)
+    else:
+        with open(argv_obj.jobfile) as job_fd:
+            job_cfg = job_fd.read()
+
+    if argv_obj.output == '-':
+        out_fd = sys.stdout
+    else:
+        out_fd = open(argv_obj.output, "w")
+
+    params = {}
+    for param_val in argv_obj.params:
+        assert '=' in param_val
+        name, val = param_val.split("=", 1)
+        params[name] = val
+
+    if argv_obj.estimate:
+        print sec_to_str(estimate_cfg(job_cfg, params))
+        return 0
+
+    if argv_obj.num_tests or argv_obj.compile:
+        bconf = list(parse_fio_config_full(job_cfg, params))
+        bconf = bconf[argv_obj.skip_tests:]
+
+        if argv_obj.compile:
+            out_fd.write(format_fio_config(bconf) + "\n")
+
+        if argv_obj.num_tests:
+            print len(bconf)
+
+        return 0
+
+    if argv_obj.start_at is not None:
+        # wait for the time left until start_at (nothing if already passed)
+        delay = argv_obj.start_at - time.time()
+        if delay > 0:
+            time.sleep(delay)
+
+    def raw_res_func(test_num, data):
+        pref = "========= RAW_RESULTS({0}) =========\n".format(test_num)
+        out_fd.write(pref)
+        out_fd.write(json.dumps(data))
+        out_fd.write("\n========= END OF RAW_RESULTS =========\n")
+        out_fd.flush()
+
+    rrfunc = raw_res_func if argv_obj.show_raw_results else None
+
+    stime = time.time()
+    job_res, num_tests, ok = run_benchmark(
+        argv_obj.type, job_cfg, params, argv_obj.runcycle,
+        rrfunc, argv_obj.skip_tests, argv_obj.faked_fio)
+    etime = time.time()
+
+    res = {'__meta__': {'raw_cfg': job_cfg, 'params': params}, 'res': job_res}
+
+    oformat = 'json' if argv_obj.json else 'eval'
+    out_fd.write("\nRun {0} tests in {1} seconds\n".format(num_tests,
+                                                           int(etime - stime)))
+    out_fd.write("========= RESULTS(format={0}) =========\n".format(oformat))
+    if argv_obj.json:
+        out_fd.write(json.dumps(res))
+    else:
+        out_fd.write(pprint.pformat(res) + "\n")
+    out_fd.write("\n========= END OF RESULTS =========\n")
+
+    return 0 if ok else 1
+
+
+def fake_main(x):
+    """Sleep 60 s, then print results stored in a fixed yaml file (x unused)."""
+    import yaml
+    time.sleep(60)
+    fname = "/tmp/perf_tests/metempirical_alisha/raw_results.yaml"
+    res = yaml.load(open(fname).read())[0][1]  # NOTE: trusted files only
+    sys.stdout.write("========= RESULTS(format=json) =========\n")
+    sys.stdout.write(json.dumps(res))
+    sys.stdout.write("\n========= END OF RESULTS =========\n")
+    return 0
+
+
+if __name__ == '__main__':
+    # exit(fake_main(sys.argv[1:]))
+    exit(main(sys.argv[1:]))
diff --git a/wally/suits/io/formatter.py b/wally/suits/io/formatter.py
new file mode 100644
index 0000000..529b78a
--- /dev/null
+++ b/wally/suits/io/formatter.py
@@ -0,0 +1,53 @@
+import texttable
+
+from wally.utils import ssize_to_b
+from wally.statistic import med_dev
+from wally.suits.io.agent import get_test_summary
+
+
+def key_func(k_data):
+    """Sort key of a (test name, result) pair, the name is ignored:
+    (rw, sync_mode, blocksize passed through ssize_to_b, concurence)."""
+    _name, result = k_data
+    return (result['rw'], result['sync_mode'],
+            ssize_to_b(result['blocksize']),
+            result['concurence'])
+
+
+def format_results_for_console(test_set):
+    """
+    Build a texttable report (IOPS, bandwidth, 3-sigma bandwidth
+    deviation in percent, latency) from test_set['res'] for the console.
+    """
+    tab = texttable.Texttable()
+    tab.set_deco(tab.HEADER | tab.VLINES | tab.BORDER)
+    tab.set_cols_align(["l", "r", "r", "r", "r"])
+    # rows are ordered by (rw, sync_mode, blocksize, concurence)
+    prev_k = None
+    items = sorted(test_set['res'].items(), key=key_func)
+
+    for test_name, data in items:
+        curr_k = key_func((test_name, data))[:3]
+        # a "---" row separates groups differing in rw/sync_mode/blocksize
+        if prev_k is not None:
+            if prev_k != curr_k:
+                tab.add_row(["---"] * 5)
+
+        prev_k = curr_k
+
+        descr = get_test_summary(data)
+        # med_dev gives (mean, deviation); the iops deviation is not shown
+        iops, _ = med_dev(data['iops'])
+        bw, bwdev = med_dev(data['bw'])
+        # NOTE(review): a zero mean bw would raise ZeroDivisionError below
+        # 3 * sigma
+        dev_perc = int((bwdev * 300) / bw)
+        # NOTE(review): titled "clat ms" but fed from data['lat'] - confirm
+        params = (descr, int(iops), int(bw), dev_perc,
+                  int(med_dev(data['lat'])[0]) // 1000)
+        tab.add_row(params)
+
+    header = ["Description", "IOPS", "BW KiBps", "Dev * 3 %", "clat ms"]
+    tab.header(header)
+
+    return tab.draw()
diff --git a/wally/suits/io/io_scenario_check_distribution.cfg b/wally/suits/io/io_scenario_check_distribution.cfg
new file mode 100644
index 0000000..6ba3f9f
--- /dev/null
+++ b/wally/suits/io/io_scenario_check_distribution.cfg
@@ -0,0 +1,13 @@
+[distrubution_test_{TEST_SUMM} * {NUM_ROUNDS}]
+blocksize=4k
+rw=randwrite
+direct=1
+ramp_time=5
+buffered=0
+wait_for_previous
+filename={FILENAME}
+iodepth=1
+size=10Gb
+time_based
+runtime=30
+group_reporting
diff --git a/wally/suits/io/io_scenario_check_linearity.cfg b/wally/suits/io/io_scenario_check_linearity.cfg
new file mode 100644
index 0000000..4017cf3
--- /dev/null
+++ b/wally/suits/io/io_scenario_check_linearity.cfg
@@ -0,0 +1,29 @@
+[defaults]
+NUM_ROUNDS=7
+
+ramp_time=5
+buffered=0
+wait_for_previous
+filename={FILENAME}
+iodepth=1
+size=10Gb
+time_based
+runtime=30
+
+# ---------------------------------------------------------------------
+# check read and write linearity. oper_time = func(size)
+# ---------------------------------------------------------------------
+[linearity_test_{TEST_SUMM} * {NUM_ROUNDS}]
+blocksize={% 512, 1k, 4k, 8k, 16k, 32k, 64k, 128k, 256k, 512k, 1m, 2m, 4m %}
+rw={% randwrite, randread %}
+direct=1
+
+# ---------------------------------------------------------------------
+# check sync write linearity. oper_time = func(size)
+# check sync BW as well
+# ---------------------------------------------------------------------
+[linearity_test_{TEST_SUMM} * {NUM_ROUNDS}]
+blocksize={% 512, 1k, 4k, 8k, 16k, 32k, 64k, 128k, 256k, 512k, 1m, 2m, 4m %}
+rw=randwrite
+sync=1
+
diff --git a/wally/suits/io/io_scenario_check_th_count.cfg b/wally/suits/io/io_scenario_check_th_count.cfg
new file mode 100644
index 0000000..3d57154
--- /dev/null
+++ b/wally/suits/io/io_scenario_check_th_count.cfg
@@ -0,0 +1,46 @@
+[defaults]
+NUM_ROUNDS=7
+ramp_time=5
+buffered=0
+wait_for_previous
+filename={FILENAME}
+iodepth=1
+size=10Gb
+time_based
+runtime=30
+group_reporting
+numjobs={% 1, 2, 5, 10, 15, 20, 25, 30, 35, 40 %}
+
+# ---------------------------------------------------------------------
+# check different thread count. (latency, bw) = func(th_count)
+#
+#    RANDOM R IOPS, DIRECT: should act the same as (4k + randread + sync),
+#    just faster. Not sure that we need it
+# 4k + randread  + direct
+#
+#     RANDOM R/W IOPS
+# 4k + randread  + sync
+# 4k + randwrite + sync
+#
+#     LINEAR BW
+# 1m + write     + direct
+# 1m + read      + direct
+#
+# ---------------------------------------------------------------------
+[concurrence_test_{TEST_SUMM} * {NUM_ROUNDS}]
+blocksize=4k
+rw={% randread %}
+direct=1
+sync=0
+
+[concurrence_test_{TEST_SUMM} * {NUM_ROUNDS}]
+blocksize=4k
+rw=randwrite
+direct=0
+sync=1
+
+[concurrence_test_{TEST_SUMM} * {NUM_ROUNDS}]
+blocksize=1m
+rw={% write, read %}
+direct=1
+sync=0
diff --git a/wally/suits/io/io_scenario_check_vm_count_ec2.cfg b/wally/suits/io/io_scenario_check_vm_count_ec2.cfg
new file mode 100644
index 0000000..19c9e50
--- /dev/null
+++ b/wally/suits/io/io_scenario_check_vm_count_ec2.cfg
@@ -0,0 +1,29 @@
+[defaults]
+NUM_ROUNDS=7
+ramp_time=5
+buffered=0
+wait_for_previous
+filename={FILENAME}
+iodepth=1
+size=10Gb
+time_based
+runtime=30
+group_reporting
+rate={BW_LIMIT}
+rate_iops={IOPS_LIMIT}
+
+# ---------------------------------------------------------------------
+# check different thread count. (latency, bw) = func(th_count)
+# ---------------------------------------------------------------------
+[vm_count_{TEST_SUMM} * {NUM_ROUNDS}]
+blocksize={% 4k, 1m %}
+rw={% randwrite, randread %}
+direct=1
+numjobs={% 1, 2, 5, 10, 15, 20, 25, 30, 35, 40 %}
+
+[vm_count_{TEST_SUMM} * {NUM_ROUNDS}]
+blocksize={% 4k, 1m %}
+rw=randwrite
+direct=0
+sync=1
+numjobs={% 1, 2, 5, 10, 15, 20, 25, 30, 35, 40 %}
diff --git a/wally/suits/io/io_scenario_check_warmup.cfg b/wally/suits/io/io_scenario_check_warmup.cfg
new file mode 100644
index 0000000..6a9c622
--- /dev/null
+++ b/wally/suits/io/io_scenario_check_warmup.cfg
@@ -0,0 +1,33 @@
+[defaults]
+NUM_ROUNDS=7
+
+ramp_time=5
+buffered=0
+wait_for_previous
+filename={FILENAME}
+iodepth=1
+size=10Gb
+time_based
+runtime=30
+
+# ---------------------------------------------------------------------
+# check test time, no warmup time. iops = func(run_time)
+# ---------------------------------------------------------------------
+[runtime_test_wo_wu_{TEST_SUMM}_{UNIQ} * {NUM_ROUNDS}]
+startdelay=10
+blocksize={% 4k, 1m %}
+rw=randwrite
+direct=1
+ramp_time=0
+runtime={% 10, 15, 20, 30, 60, 120 %}
+
+# ---------------------------------------------------------------------
+# check test time, with warmup time. iops = func(run_time)
+# ---------------------------------------------------------------------
+[runtime_test_w_wu_{TEST_SUMM}_{UNIQ} * {NUM_ROUNDS}]
+startdelay=10
+blocksize={% 4k, 1m %}
+rw=randwrite
+direct=1
+ramp_time={% 5, 10, 15 %}
+runtime={% 15, 30 %}
diff --git a/wally/suits/io/io_scenario_hdd.cfg b/wally/suits/io/io_scenario_hdd.cfg
new file mode 100644
index 0000000..5e24009
--- /dev/null
+++ b/wally/suits/io/io_scenario_hdd.cfg
@@ -0,0 +1,50 @@
+[defaults]
+wait_for_previous
+group_reporting
+time_based
+buffered=0
+iodepth=1
+softrandommap=1
+filename={FILENAME}
+NUM_ROUNDS=7
+
+ramp_time=5
+size=10Gb
+runtime=30
+
+# ---------------------------------------------------------------------
+# check different thread count, sync mode. (latency, iops) = func(th_count)
+# ---------------------------------------------------------------------
+[hdd_test_{TEST_SUMM} * {NUM_ROUNDS}]
+blocksize=4k
+rw=randwrite
+sync=1
+numjobs={% 1, 5, 10, 15, 20, 30, 40, 80, 120 %}
+
+# ---------------------------------------------------------------------
+# check different thread count, direct read mode. (latency, iops) = func(th_count)
+# also check iops for randread
+# ---------------------------------------------------------------------
+[hdd_test_{TEST_SUMM} * {NUM_ROUNDS}]
+blocksize=4k
+rw=randread
+direct=1
+numjobs={% 1, 5, 10, 15, 20, 30, 40, 80, 120 %}
+
+# ---------------------------------------------------------------------
+# check different thread count, direct read/write mode. (bw, iops) = func(th_count)
+# also check BW for seq read/write.
+# ---------------------------------------------------------------------
+[hdd_test_{TEST_SUMM} * {NUM_ROUNDS}]
+blocksize=1m
+rw={% read, write %}
+direct=1
+numjobs={% 1, 5, 10, 15, 20, 30, 40, 80, 120 %}
+
+# ---------------------------------------------------------------------
+# check IOPS randwrite.
+# ---------------------------------------------------------------------
+[hdd_test_{TEST_SUMM} * {NUM_ROUNDS}]
+blocksize=4k
+rw=randwrite
+direct=1
diff --git a/wally/suits/io/io_scenario_long_test.cfg b/wally/suits/io/io_scenario_long_test.cfg
new file mode 100644
index 0000000..4b0a79d
--- /dev/null
+++ b/wally/suits/io/io_scenario_long_test.cfg
@@ -0,0 +1,28 @@
+[defaults]
+
+# 24h test
+NUM_ROUNDS1=270
+NUM_ROUNDS2=261
+
+buffered=0
+wait_for_previous
+filename={FILENAME}
+iodepth=1
+size=50Gb
+time_based
+runtime=300
+
+# ---------------------------------------------------------------------
+# long-running stability check: repeated 128k direct randwrite rounds
+# ---------------------------------------------------------------------
+[24h_test * {NUM_ROUNDS1}]
+blocksize=128k
+rw=randwrite
+direct=1
+runtime=30
+
+[24h_test * {NUM_ROUNDS2}]
+blocksize=128k
+rw=randwrite
+direct=1
+
diff --git a/wally/suits/io/results_loader.py b/wally/suits/io/results_loader.py
new file mode 100644
index 0000000..25721eb
--- /dev/null
+++ b/wally/suits/io/results_loader.py
@@ -0,0 +1,55 @@
+import re
+import json
+
+
+from wally.utils import ssize_to_b
+from wally.statistic import med_dev
+
+
+def parse_output(out_err):
+    start_patt = r"(?ims)=+\s+RESULTS\(format=json\)\s+=+"
+    end_patt = r"(?ims)=+\s+END OF RESULTS\s+=+"
+    # text before the first start marker is dropped by the [1:] slice
+    for block in re.split(start_patt, out_err)[1:]:
+        data, garbage = re.split(end_patt, block)
+        yield json.loads(data.strip())
+    # same framing, but the payload is evaluated as a Python expression
+    start_patt = r"(?ims)=+\s+RESULTS\(format=eval\)\s+=+"
+    end_patt = r"(?ims)=+\s+END OF RESULTS\s+=+"
+    # NOTE(review): eval() on captured remote output; only safe if trusted
+    for block in re.split(start_patt, out_err)[1:]:
+        data, garbage = re.split(end_patt, block)
+        yield eval(data.strip())
+
+
+def filter_data(name_prefix, fields_to_select, **filters):
+    """Build a generator function picking fields out of matching results."""
+    def accepted(item):
+        if name_prefix is not None:
+            if not item['jobname'].startswith(name_prefix):
+                return False
+        return not any(item.get(key) != val for key, val in filters.items())
+
+    def closure(data):
+        for item in data:
+            if accepted(item):
+                yield map(item.get, fields_to_select)
+    return closure
+
+
+def load_data(raw_data):
+    """Yield each test dict of the first parsed block with med_dev() stats."""
+    first_block = list(parse_output(raw_data))[0]
+
+    for test_res in first_block['res'].values():
+        test_res['blocksize_b'] = ssize_to_b(test_res['blocksize'])
+        for metric in ('iops', 'bw', 'lat'):
+            stats = med_dev(test_res[metric])
+            test_res[metric + '_mediana'], test_res[metric + '_stddev'] = stats
+        yield test_res
+
+
+def load_files(*fnames):
+    for fname in fnames:  # results of all files are yielded as one stream
+        for i in load_data(open(fname).read()):  # NOTE: handle not closed
+            yield i
diff --git a/wally/suits/itest.py b/wally/suits/itest.py
new file mode 100644
index 0000000..91e9dd5
--- /dev/null
+++ b/wally/suits/itest.py
@@ -0,0 +1,237 @@
+import abc
+import time
+import os.path
+import logging
+
+from wally.ssh_utils import copy_paths, run_over_ssh, delete_file
+from wally.utils import ssize_to_b, open_for_append_or_create, sec_to_str
+
+from . import postgres
+from .io import agent as io_agent
+from .io import formatter as io_formatter
+from .io.results_loader import parse_output
+
+
+logger = logging.getLogger("wally")
+
+
+class IPerfTest(object):
+    def __init__(self, on_result_cb, log_directory=None, node=None):
+        self.node = node
+        self.log_directory = log_directory
+        self.on_result_cb = on_result_cb
+
+    def pre_run(self, conn):
+        """Preparation hook; does nothing unless overridden."""
+
+    def cleanup(self, conn):
+        """Cleanup hook; does nothing unless overridden."""
+
+    @abc.abstractmethod
+    def run(self, conn, barrier):
+        """Run the test; the base implementation is empty."""
+
+    @classmethod
+    def format_for_console(cls, data):
+        text = "{0}.format_for_console".format(cls.__name__)
+        raise NotImplementedError(text)
+
+
+class TwoScriptTest(IPerfTest):
+    remote_tmp_dir = '/tmp'  # remote directory the scripts are copied into
+
+    def __init__(self, opts, on_result_cb, log_directory=None, node=None):
+        IPerfTest.__init__(self, on_result_cb, log_directory, node=node)
+        self.opts = opts
+
+        if 'run_script' in self.opts:
+            self.run_script = self.opts['run_script']
+            self.prepare_script = self.opts['prepare_script']
+
+    def get_remote_for_script(self, script):
+        return os.path.join(self.remote_tmp_dir, script.rpartition('/')[2])
+
+    def copy_script(self, conn, src):
+        remote_path = self.get_remote_for_script(src)
+        copy_paths(conn, {src: remote_path})
+        return remote_path
+
+    def pre_run(self, conn):
+        """Copy the prepare script to the node and execute it there."""
+        remote_script = self.copy_script(conn, self.prepare_script)
+        run_over_ssh(conn, remote_script, node=self.node)
+
+    def run(self, conn, barrier):
+        remote_script = self.copy_script(conn, self.run_script)
+        cmd_opts = ' '.join(["%s %s" % (key, val) for key, val
+                             in self.opts.items()])
+        cmd = remote_script + ' ' + cmd_opts
+        out_err = run_over_ssh(conn, cmd, node=self.node)
+        self.on_result(out_err, cmd)
+
+    def parse_results(self, out):
+        for line in out.split("\n"):
+            key, separator, value = line.partition(":")
+            if key and value:  # lines without a "key:value" pair are skipped
+                self.on_result_cb((key, float(value)))
+
+    def on_result(self, out_err, cmd):
+        try:
+            self.parse_results(out_err)
+        except Exception as exc:
+            msg_templ = "Error during postprocessing results: {0!r}. {1}"
+            raise RuntimeError(msg_templ.format(exc.message, out_err))
+
+
+class PgBenchTest(TwoScriptTest):
+    root = os.path.dirname(postgres.__file__)  # postgres package directory
+    prepare_script = os.path.join(root, "prepare.sh")
+    run_script = os.path.join(root, "run.sh")  # script executed by run()
+
+
+class IOPerfTest(IPerfTest):
+    io_py_remote = "/tmp/disk_test_agent.py"  # agent location on the node
+
+    def __init__(self, test_options, on_result_cb,
+                 log_directory=None, node=None):
+        IPerfTest.__init__(self, on_result_cb, log_directory, node=node)
+        self.options = test_options
+        self.config_fname = test_options['cfg']
+        self.alive_check_interval = test_options.get('alive_check_interval')
+        self.config_params = test_options.get('params', {})
+        self.tool = test_options.get('tool', 'fio')
+        self.raw_cfg = open(self.config_fname).read()
+        self.configs = list(io_agent.parse_fio_config_full(self.raw_cfg,
+                                                           self.config_params))
+
+        cmd_log = os.path.join(self.log_directory, "task_compiled.cfg")
+        raw_res = os.path.join(self.log_directory, "raw_results.txt")
+
+        fio_command_file = open_for_append_or_create(cmd_log)
+        fio_command_file.write(io_agent.compile(self.raw_cfg,
+                                                self.config_params,
+                                                None))
+        self.fio_raw_results_file = open_for_append_or_create(raw_res)
+
+    def cleanup(self, conn):
+        delete_file(conn, self.io_py_remote)
+        # TODO: also remove the temporary files used for testing
+
+    def pre_run(self, conn):
+        try:
+            run_over_ssh(conn, 'which fio', node=self.node)
+        except OSError:
+            # 'which fio' failed: try to install fio, at most 3 attempts
+            cmd = "sudo apt-get -y install fio"
+
+            for i in range(3):
+                try:
+                    run_over_ssh(conn, cmd, node=self.node)
+                    break
+                except OSError as err:
+                    time.sleep(3)
+            else:
+                raise OSError("Can't install fio - " + err.message)
+
+        local_fname = io_agent.__file__.rsplit('.', 1)[0] + ".py"
+        self.files_to_copy = {local_fname: self.io_py_remote}
+        copy_paths(conn, self.files_to_copy)
+
+        files = {}
+
+        for secname, params in self.configs:
+            sz = ssize_to_b(params['size'])
+            msz = sz // (1024 ** 2)
+            if sz % (1024 ** 2) != 0:
+                msz += 1
+
+            fname = params['filename']
+            files[fname] = max(files.get(fname, 0), msz)
+
+        # Fill every test file with zeros; `files` maps each file name to
+        # the largest size, in MiB rounded up, any config section asks for.
+
+        cmd_templ = "sudo dd if=/dev/zero of={0} bs={1} count={2}"
+        for fname, msz in files.items():
+            cmd = cmd_templ.format(fname, 1024 ** 2, msz)
+            run_over_ssh(conn, cmd, timeout=msz, node=self.node)
+
+    def run(self, conn, barrier):
+        cmd_templ = "sudo env python2 {0} --type {1} {2} --json -"
+        # "--params k=v ..." is appended only when config_params is not empty
+
+        params = " ".join("{0}={1}".format(k, v)
+                          for k, v in self.config_params.items())
+
+        if "" != params:
+            params = "--params " + params
+
+        cmd = cmd_templ.format(self.io_py_remote, self.tool, params)
+        logger.debug("Waiting on barrier")
+
+        exec_time = io_agent.estimate_cfg(self.raw_cfg, self.config_params)
+        exec_time_str = sec_to_str(exec_time)
+        # computed up front: barrier.wait() is True for one caller only
+        timeout = int(exec_time * 1.1 + 300)
+        try:
+            if barrier.wait():
+                templ = "Test should takes about {0}. Will wait at most {1}"
+                logger.info(templ.format(exec_time_str, sec_to_str(timeout)))
+
+            out_err = run_over_ssh(conn, cmd,
+                                   stdin_data=self.raw_cfg,
+                                   timeout=timeout,
+                                   node=self.node)
+            logger.info("Done")
+        finally:
+            barrier.exit()
+
+        self.on_result(out_err, cmd)
+
+    def on_result(self, out_err, cmd):
+        try:
+            for data in parse_output(out_err):
+                self.on_result_cb(data)
+        except Exception as exc:
+            msg_templ = "Error during postprocessing results: {0!r}"
+            raise RuntimeError(msg_templ.format(exc.message))
+
+    def merge_results(self, results):
+        if len(results) == 0:
+            return None
+
+        merged_result = results[0]
+        merged_data = merged_result['res']
+        expected_keys = set(merged_data.keys())
+        mergable_fields = ['bw', 'clat', 'iops', 'lat', 'slat']
+
+        for res in results[1:]:
+            assert res['__meta__'] == merged_result['__meta__']
+
+            data = res['res']
+            diff = set(data.keys()).symmetric_difference(expected_keys)
+
+            msg = "Difference: {0}".format(",".join(diff))
+            assert len(diff) == 0, msg
+
+            for testname, test_data in data.items():
+                res_test_data = merged_data[testname]
+
+                diff = set(test_data.keys()).symmetric_difference(
+                            res_test_data.keys())
+
+                msg = "Difference: {0}".format(",".join(diff))
+                assert len(diff) == 0, msg
+
+                for k, v in test_data.items():
+                    if k in mergable_fields:
+                        res_test_data[k].extend(v)
+                    else:
+                        msg = "{0!r} != {1!r}".format(res_test_data[k], v)
+                        assert res_test_data[k] == v, msg
+
+        return merged_result
+
+    @classmethod
+    def format_for_console(cls, data):
+        return io_formatter.format_results_for_console(data)
diff --git a/wally/suits/postgres/__init__.py b/wally/suits/postgres/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/wally/suits/postgres/__init__.py
diff --git a/wally/suits/postgres/prepare.sh b/wally/suits/postgres/prepare.sh
new file mode 100755
index 0000000..e7ca3bc
--- /dev/null
+++ b/wally/suits/postgres/prepare.sh
@@ -0,0 +1,19 @@
+#!/bin/bash
+set -e
+
+if [ ! -d /etc/postgresql ]; then
+    apt-get update
+    apt-get install -y postgresql postgresql-contrib
+    # Tested inline: under `set -e` a separate `$?` check is never reached.
+    if ! err=$(pg_createcluster 9.3 main --start 2>&1); then
+        echo "There was an error while creating cluster"
+        exit 1
+    fi
+fi
+
+sed -i 's/^local\s\+all\s\+all\s\+peer/local all all trust/g' /etc/postgresql/9.3/main/pg_hba.conf
+sudo sed -i "s/#listen_addresses = 'localhost'/listen_addresses = '*'/g" /etc/postgresql/9.3/main/postgresql.conf
+
+service postgresql restart
+
+exit 0
\ No newline at end of file
diff --git a/wally/suits/postgres/run.sh b/wally/suits/postgres/run.sh
new file mode 100755
index 0000000..daad499
--- /dev/null
+++ b/wally/suits/postgres/run.sh
@@ -0,0 +1,47 @@
+#!/bin/bash
+set -e
+
+while [[ $# -gt 1 ]]
+do
+key="$1"
+
+case $key in
+    num_clients)
+    CLIENTS="$2"
+    shift
+    ;;
+    transactions_per_client)
+    TRANSACTINOS_PER_CLIENT="$2"
+    shift
+    ;;
+    *)
+    echo "Unknown option $key"
+    exit 1
+    ;;
+esac
+shift
+done
+
+CLIENTS=$(echo $CLIENTS | tr ',' '\n')
+TRANSACTINOS_PER_CLIENT=$(echo $TRANSACTINOS_PER_CLIENT | tr ',' '\n')
+
+# create the pgbench database and initialize it with `pgbench -i`
+sudo -u postgres createdb -O postgres pgbench &> /dev/null
+sudo -u postgres pgbench -i -U postgres pgbench &> /dev/null
+
+# each pgbench run prints one "<clients> <transactions>:<tps>" line
+for num_clients in $CLIENTS; do
+    for trans_per_cl in $TRANSACTINOS_PER_CLIENT; do
+        # every combination is measured 10 times
+        for i in 1 2 3 4 5 6 7 8 9 10; do
+            echo -n "$num_clients $trans_per_cl:"
+            sudo -u postgres pgbench -c $num_clients -n -t $trans_per_cl -j 4 -r -U postgres pgbench |
+            grep "(excluding connections establishing)" | awk '{print $3}'
+        done
+    done
+done
+
+sudo -u postgres dropdb pgbench &> /dev/null
+
+exit 0
+
diff --git a/wally/utils.py b/wally/utils.py
new file mode 100644
index 0000000..60645d4
--- /dev/null
+++ b/wally/utils.py
@@ -0,0 +1,124 @@
+import re
+import os
+import logging
+import threading
+import contextlib
+import subprocess
+
+
+logger = logging.getLogger("wally")
+
+
+def parse_creds(creds):
+    """Split 'user:passwd@host' into (user, passwd, host); host may be None."""
+    user, rest = creds.split(":", 1)
+
+    # split on the last '@' so the password part may contain '@' itself
+    if '@' in rest:
+        passwd, host = rest.rsplit('@', 1)
+    else:
+        passwd, host = rest, None
+    return user, passwd, host
+
+
+class TaksFinished(Exception):
+    """Raised by Barrier.wait() once the barrier has been exited."""
+
+
+class Barrier(object):
+    def __init__(self, count):
+        self.count = count  # number of wait() calls that complete a round
+        self.curr_count = 0  # wait() calls seen in the current round
+        self.cond = threading.Condition()
+        self.exited = False  # set by exit(); makes later wait() calls raise
+
+    def wait(self, timeout=None):
+        with self.cond:
+            if self.exited:
+                raise TaksFinished()
+
+            self.curr_count += 1
+            if self.curr_count == self.count:
+                self.curr_count = 0  # start counting a new round
+                self.cond.notify_all()
+                return True  # only the caller completing the round gets True
+            else:
+                self.cond.wait(timeout=timeout)
+                return False  # also returned when the wait just timed out
+
+    def exit(self):
+        with self.cond:
+            self.exited = True  # NOTE: threads already waiting are not woken
+
+
+@contextlib.contextmanager
+def log_error(action, types=(Exception,)):
+    """Log the start of `action` and any error of `types` raised inside."""
+    if action.startswith("!"):
+        action = action[1:]  # a leading "!" suppresses the start message
+    else:
+        logger.debug("Starts : " + action)
+    try:
+        yield
+    except Exception as exc:
+        if isinstance(exc, types) and not isinstance(exc, StopIteration):
+            message = "Error during {0} stage: {1}".format(action, exc.message)
+            logger.debug(message)
+        raise
+
+
+SMAP = dict(k=1024, m=1024 ** 2, g=1024 ** 3, t=1024 ** 4)
+
+
+def ssize_to_b(ssize):
+    """Convert a size like '4k' or '10Gb' to bytes; ValueError if malformed."""
+    try:
+        ssize = ssize.lower()
+        if ssize.endswith("b"):
+            ssize = ssize[:-1]
+        if ssize[-1] in SMAP:
+            return int(ssize[:-1]) * SMAP[ssize[-1]]
+        return int(ssize)
+    except (ValueError, TypeError, AttributeError, IndexError):
+        raise ValueError("Unknow size format {0!r}".format(ssize))
+
+
+def get_ip_for_target(target_ip):
+    """Return the source IP that `ip route get to` prints for target_ip.
+
+    Raises OSError when the first output line matches neither the
+    "<target> via <gw> dev <dev> src <ip>" nor the
+    "<target> dev <dev> src <ip>" layout.
+    """
+    cmd = 'ip route get to'.split(" ") + [target_ip]
+    data = subprocess.Popen(cmd, stdout=subprocess.PIPE).stdout.read()
+    first_line = data.split("\n")[0].strip()
+
+    escaped_ip = target_ip.replace('.', r'\.')
+    templates = (r'{0} via [.0-9]+ dev (?P<dev>.*?) src (?P<ip>[.0-9]+)$',
+                 r'{0} dev (?P<dev>.*?) src (?P<ip>[.0-9]+)$')
+
+    # the "via <gateway>" layout takes precedence over the direct one
+    for templ in templates:
+        pattern = templ.replace(" ", r'\s+').format(escaped_ip)
+        found = re.match(pattern, first_line)
+        if found is not None:
+            return found.group('ip')
+
+    raise OSError("Can't define interface for {0}".format(target_ip))
+
+
+def open_for_append_or_create(fname):
+    """Open fname positioned at its end, creating the file if missing."""
+    if os.path.exists(fname):
+        fd = open(fname, 'r+')
+        fd.seek(0, os.SEEK_END)
+        return fd
+    return open(fname, "w")
+
+
+def sec_to_str(seconds):
+    """Format seconds as H:MM:SS; a fractional part is truncated first."""
+    hours, rest = divmod(int(seconds), 3600)
+    minutes, secs = divmod(rest, 60)
+    return "{0}:{1:02d}:{2:02d}".format(hours, minutes, secs)