Large commit: general refactoring; fio test code completely reworked; big improvements in test run time and result quality; etc.
diff --git a/TODO b/TODO
index 9de2aaa..8b13789 100644
--- a/TODO
+++ b/TODO
@@ -1,59 +1 @@
-go over each file and make sure, that we need it
-fix pgbench
-add ability to store all data into local file (bson/msgpack?)
-update report generation code
-allow to run package directly
-make a normal python package
-add options for username, build description, etc
-provide API for retreiving data
-fuel port forwarding and censor data collecting
 
-store all taken actions in special file and add 'clear' command to clear results
-or make a separated watchdog process, which would clear results
-
-INTEGRATE WITH BLKTRACE/SEEKWATCHER/perf
-
-!!!!add warm-up stage to each fio test!!!!!! and make io.py to ignore results from it
-
-create first release
-
-WE NEED MORE HIGHT_LEVEL TESTS
-IMPLEMENT TEMPLATE FROM
-
-Gchart looks really like 2000. Please investigate more
-modern-looking tools, like http://pygal.org/, http://www.flotcharts.org/
-and hundreds of thousands of javascript-python-c++ tools - 
-https://wiki.python.org/moin/NumericAndScientific/Plotting,
-http://www.rgraph.net/examples/bar-line-and-pie-charts.html, ...
-http://www.sitepoint.com/15-best-javascript-charting-libraries/
-
-
-make nodes.discover.discover pluggable
-
-fix grafana + influx installation
-
-allow to owerwrite config options from command=line
-
-ceph sensors support
-fio package deployment on test nodes
-send data to influx
-integrate with grafana
-integrated with db
-collected lab data during test (check and fix this code)
-
-need to store complete hardware information (e.g. like in 
-certification, but even more, but with controllable levels).
-It also should dump network topology.
-
-make a separated library for gathering hw info for node
-
-create a wiki page with tool description
-create a wiki page with usage descriptions
-create a wiki page with test selection description 
-    and where all calculations explained
-create a wiki page with test selection evaluation and provement
-create a separated repos for config
-
-
-Test methodology:
-	https://software.intel.com/en-us/blogs/2013/10/25/measure-ceph-rbd-performance-in-a-quantitative-way-part-i
diff --git a/config.py b/config.py
index a33f9b5..6a162b8 100644
--- a/config.py
+++ b/config.py
@@ -20,17 +20,17 @@
 # parser.add_argument("-c", "--chartpath")
 
 # config = parser.parse_args(sys.argv[1:])
-path = "config.yaml"
+path = "koder.yaml"
 
 # if config.path is not None:
 #     path = config.path
 
-cfg_dict = parse_config(os.path.join(os.path.dirname(__file__), path))
-basedir = cfg_dict['paths']['basedir']
-TEST_PATH = cfg_dict['paths']['TEST_PATH']
-SQLALCHEMY_MIGRATE_REPO = cfg_dict['paths']['SQLALCHEMY_MIGRATE_REPO']
-DATABASE_URI = cfg_dict['paths']['DATABASE_URI']
-CHARTS_IMG_PATH = cfg_dict['paths']['CHARTS_IMG_PATH']
+cfg_dict = parse_config(path)
+# basedir = cfg_dict['paths']['basedir']
+# TEST_PATH = cfg_dict['paths']['TEST_PATH']
+# SQLALCHEMY_MIGRATE_REPO = cfg_dict['paths']['SQLALCHEMY_MIGRATE_REPO']
+# DATABASE_URI = cfg_dict['paths']['DATABASE_URI']
+# CHARTS_IMG_PATH = cfg_dict['paths']['CHARTS_IMG_PATH']
 
 # if config.basedir is not None:
 #     basedir = config.basedir
diff --git a/config.yaml b/config.yaml
index b365042..da94a8a 100644
--- a/config.yaml
+++ b/config.yaml
@@ -1,70 +1,83 @@
-# nodes to be started/detected
-discover:
-    # openstack:
-    #     connection:
-    #         auth_url: http://172.16.54.130:5000/v2.0
-    #         creds: admin:admin@admin
-    #     discover:
-    #         vm:
-    #             name: test1
-    #             auth: cirros:cubswin:)
-    #         nodes:
-    #             service: all
-    #             auth: cirros:cubswin:)
-
-    # ceph: root@172.16.53.2::/home/koder/.ssh/id_rsa.pub
+clouds:
     ceph: local
 
-    # fuel:
-    #     connection:
-    #         url: http://172.16.52.120:8000/
-    #         creds: admin:admin@admin
-    #     discover: controller
+discover: ceph
 
 explicit_nodes:
     "ssh://192.168.152.43": testnode
 
-# start:
-#     count: x1
-#     img_name: TestVM
-#     flavor_name: m1.micro
-#     keypair_name: test
-#     network_zone_name: net04
-#     flt_ip_pool: net04_ext
-#     key_file: path/to/
-#     user: username
-
-
-# sensors to be installed, accordingli to role
 sensors:
-    receiver_uri: udp://192.168.0.108:5699
+    receiver_uri: udp://192.168.152.1:5699
     roles_mapping:
-        ceph-osd: block-io #ceph-io, ceph-cpu, ceph-ram, ceph-net
-        # ceph-mon: ceph-io, ceph-cpu, ceph-ram, ceph-net
-        os-compute: io, net
-        test-vm: system-cpu
+        ceph-osd: block-io
+        testnode: system-cpu, block-io
 
-
-# tests to run
 tests:
-    # pgbench:
-    #     opts:
-    #         num_clients: [4, 8, 12]
-    #         transactions: [1, 2, 3]
-    io: 
-        tool: fio
-        config_file: io_task.cfg
-
-# where to store results
-results:
-    - mos-linux-http://172.12.33.45
-
-paths:
-    basedir: "/home/gstepanov/rally-results-processor"
-    TEST_PATH: "/home/gstepanov/rally-results-processor/test_results"
-    CHARTS_IMG_PATH: "static/images"
-    SQLALCHEMY_MIGRATE_REPO: "/home/gstepanov/rally-results-processor/db_repository"
-    DATABASE_URI: 'sqlite:////home/gstepanov/rally-results-processor/app.db?check_same_thread=False'
+    - io: tests/io_task_test.cfg
 
 logging:
-    extra_logs: 1
\ No newline at end of file
+    extra_logs: 1
+
+
+# # nodes to be started/detected
+# clouds:
+#     # openstack: file:///tmp/openrc
+#     # openstack: user:passwd:tenant, http://......
+#     # ceph: ssh://root@172.16.53.2::/home/koder/.ssh/id_rsa.pub
+#     fuel:
+#         url: http://172.16.52.112:8000
+#         creds: admin:admin:admin
+#         ssh_creds: root:test37
+#         openstack_env: test
+#     ceph: local
+
+# # discover: fuel+openstack, ceph
+
+# explicit_nodes:
+#     "ssh://192.168.152.43": testnode
+
+# # sensors to be installed, according to role
+# sensors:
+#     receiver_uri: udp://192.168.152.1:5699
+#     roles_mapping:
+#         ceph-osd: block-io #ceph-io, ceph-cpu, ceph-ram, ceph-net
+#         # ceph-mon: ceph-io, ceph-cpu, ceph-ram, ceph-net
+#         os-compute: io, net
+#         test-vm: system-cpu
+
+# # tests to run
+# tests: #$include(tests.yaml)
+#     # - with_test_nodes:
+#     #     openstack:
+#     #         vm_params:
+#     #             count: x1
+#     #             img_name: disk_io_perf
+#     #             flavor_name: disk_io_perf.256
+#     #             keypair_name: disk_io_perf
+#     #             network_zone_name: novanetwork
+#     #             flt_ip_pool: nova
+#     #             creds: "ssh://ubuntu@{0}::disk_io_perf.pem"
+#     #     tests:
+#             # - pgbench:
+#             #     opts:
+#             #         num_clients: [4, 8, 12]
+#             #         transactions: [1, 2, 3]
+#     - io: tests/io_task_test.cfg
+
+#             # - vm_count: 
+#             #     max_lat_ms: 20
+#             #     min_bw_mbps: 60
+#             #     min_4k_direct_w_iops: 100
+#             #     min_4k_direct_r_iops: 100
+
+# # where to store results
+# # results:
+# #     - mos-linux-http://172.12.33.45
+# #     - bson, /tmp/myrun.bson
+
+# paths:
+#     basedir: "/home/gstepanov/rally-results-processor"
+#     TEST_PATH: "/home/gstepanov/rally-results-processor/test_results"
+#     CHARTS_IMG_PATH: "static/images"
+#     SQLALCHEMY_MIGRATE_REPO: "/home/gstepanov/rally-results-processor/db_repository"
+#     DATABASE_URI: 'sqlite:////home/gstepanov/rally-results-processor/app.db?check_same_thread=False'
diff --git a/data_processing.py b/data_processing.py
index 387ea3f..b15c579 100644
--- a/data_processing.py
+++ b/data_processing.py
@@ -62,4 +62,4 @@
     for k in m.results.keys():
         m.results[k] = [mean(m.results[k]), stdev(m.results[k])]
 
-    return m
\ No newline at end of file
+    return m
diff --git a/fuel_rest_api.py b/fuel_rest_api.py
index 12b53bb..4b02687 100644
--- a/fuel_rest_api.py
+++ b/fuel_rest_api.py
@@ -442,7 +442,7 @@
         yield Cluster(conn, **cluster_desc)
 
 
-def get_cluster_id(name, conn):
+def get_cluster_id(conn, name):
     """Get cluster id by name"""
     for cluster in get_all_clusters(conn):
         if cluster.name == name:
@@ -451,6 +451,8 @@
                 logger.debug('cluster id is %s' % cluster.id)
             return cluster.id
 
+    raise ValueError("Cluster {0} not found".format(name))
+
 
 sections = {
     'sahara': 'additional_components',
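
Note: get_cluster_id() used to fall off the end and silently return None for
an unknown cluster name; it now raises, and the argument order was flipped to
(conn, name). A hypothetical caller, assuming conn is an existing
KeystoneAuth connection:

    try:
        cluster_id = get_cluster_id(conn, "missing-env")
    except ValueError as exc:
        print exc   # Cluster missing-env not found
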
diff --git a/koder.yaml b/koder.yaml
new file mode 100644
index 0000000..9868670
--- /dev/null
+++ b/koder.yaml
@@ -0,0 +1,23 @@
+clouds:
+    ceph: local
+
+discover: ceph
+
+explicit_nodes:
+    "ssh://192.168.152.43": testnode
+
+sensors:
+    receiver_uri: udp://192.168.152.1:5699
+    roles_mapping:
+        ceph-osd: block-io
+        testnode: system-cpu, block-io
+
+tests:
+    - io:
+        cfg: tests/io_task_test.cfg
+        params:
+            SOME_OPT: 12
+
+logging:
+    extra_logs: 1
+
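
Note: tests is now a list of single-key mappings rather than a dict, which
keeps test order stable and allows running the same tool twice. A minimal
sketch of how the runner walks it (mirrors the loop reworked in run_test.py
below; plain PyYAML assumed):

    import yaml

    cfg = yaml.load(open("koder.yaml"))
    for test in cfg['tests']:              # list of {name: params} items
        for name, params in test.items():
            print name, params['cfg'], params.get('params')
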
diff --git a/nodes/discover.py b/nodes/discover.py
index b95d306..62da457 100644
--- a/nodes/discover.py
+++ b/nodes/discover.py
@@ -1,4 +1,5 @@
 import logging
+import urlparse
 
 import ceph
 import fuel
@@ -9,14 +10,11 @@
 logger = logging.getLogger("io-perf-tool")
 
 
-def discover(cluster_conf):
-    if not cluster_conf:
-        logger.error("No nodes configured")
-
+def discover(discover, clusters_info):
     nodes_to_run = []
-    for cluster, cluster_info in cluster_conf.items():
+    for cluster in discover:
         if cluster == "openstack":
-
+            cluster_info = clusters_info["openstack"]
             conn = cluster_info['connection']
             user, passwd, tenant = parse_creds(conn['creds'])
 
@@ -39,17 +37,31 @@
                                                           cluster_info)
             nodes_to_run.extend(os_nodes)
 
-        if cluster == "fuel":
-            url = cluster_info['connection'].pop('url')
-            creads = cluster_info['connection']
-            roles = cluster_info['discover']
+        elif cluster == "fuel" or cluster == "fuel+openstack":
+            cluster_info = clusters_info['fuel']
+            url = cluster_info['url']
+            creds = cluster_info['creds']
+            ssh_creds = cluster_info['ssh_creds']
+            # if user:password format is used
+            if not ssh_creds.startswith("ssh://"):
+                ip_port = urlparse.urlparse(url).netloc
 
-            if isinstance(roles, basestring):
-                roles = [roles]
+                if ':' in ip_port:
+                    ip = ip_port.split(":")[0]
+                else:
+                    ip = ip_port
 
-            nodes_to_run.extend(fuel.discover_fuel_nodes(url, creads, roles))
+                ssh_creds = "ssh://{0}@{1}".format(ssh_creds, ip)
 
-        if cluster == "ceph":
+            env = cluster_info['openstack_env']
+
+            nodes_to_run.extend(fuel.discover_fuel_nodes(url, creds, env))
+
+        elif cluster == "ceph":
+            cluster_info = clusters_info["ceph"]
             nodes_to_run.extend(ceph.discover_ceph_nodes(cluster_info))
+        else:
+            msg_templ = "Unknown cluster type in 'discover' parameter: {0!r}"
+            raise ValueError(msg_templ.format(cluster))
 
     return nodes_to_run
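
For reference, the ssh_creds normalization above turns a bare user:password
pair into an ssh URL pointing at the Fuel master host; with the values from
the commented-out fuel section in config.yaml:

    import urlparse

    url = "http://172.16.52.112:8000"
    ssh_creds = "root:test37"
    if not ssh_creds.startswith("ssh://"):
        ip = urlparse.urlparse(url).netloc.split(":")[0]
        ssh_creds = "ssh://{0}@{1}".format(ssh_creds, ip)
    print ssh_creds   # ssh://root:test37@172.16.52.112
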
diff --git a/nodes/fuel.py b/nodes/fuel.py
index 25476dc..9b1312e 100644
--- a/nodes/fuel.py
+++ b/nodes/fuel.py
@@ -1,18 +1,18 @@
 import logging
 
 
-import node
+from node import Node
 import fuel_rest_api
-from disk_perf_test_tool.utils import parse_creds
 
 
 logger = logging.getLogger("io-perf-tool")
 
 
-def discover_fuel_nodes(root_url, credentials, roles):
+def discover_fuel_nodes(root_url, credentials, cluster_name):
     """Discover Fuel nodes"""
-    user, passwd, tenant = parse_creds(credentials['creds'])
-
+    assert credentials.count(':') >= 2
+    user, passwd_tenant = credentials.split(":", 1)
+    passwd, tenant = passwd_tenant.rsplit(":", 1)
     creds = dict(
         username=user,
         password=passwd,
@@ -21,8 +21,14 @@
 
     connection = fuel_rest_api.KeystoneAuth(root_url, creds)
     fi = fuel_rest_api.FuelInfo(connection)
+
+    cluster_id = fuel_rest_api.get_cluster_id(connection, cluster_name)
+
     nodes = []
-    for role in roles:
-        nodes.extend(getattr(fi.nodes, role))
-    logger.debug("Found %s fuel nodes" % len(fi.nodes))
-    return [node.Node(n.ip, n.get_roles()) for n in nodes]
+
+    for node in fi.nodes:
+        if node.cluster == cluster_id:
+            nodes.append(node)
+    res = [Node(n.ip, n.get_roles()) for n in nodes]
+    logger.debug("Found %s fuel nodes for env %r" % (len(res), cluster_name))
+    return res
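
The split/rsplit pair above is deliberate: user and tenant are taken from the
two ends of the user:password:tenant string, so a password containing ':'
survives intact:

    credentials = "admin:pa:ss:tenant1"
    user, passwd_tenant = credentials.split(":", 1)   # 'admin', 'pa:ss:tenant1'
    passwd, tenant = passwd_tenant.rsplit(":", 1)     # 'pa:ss', 'tenant1'
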
diff --git a/nodes/openstack.py b/nodes/openstack.py
index ab41f62..d212283 100644
--- a/nodes/openstack.py
+++ b/nodes/openstack.py
@@ -63,9 +63,9 @@
     client = Client(version='1.1', **conn_details)
     nodes = []
     if conf.get('discover'):
-        vms_to_discover = conf['discover'].get('vm')
-        if vms_to_discover:
-            nodes.extend(discover_vms(client, vms_to_discover))
+        # vms_to_discover = conf['discover'].get('vm')
+        # if vms_to_discover:
+        #     nodes.extend(discover_vms(client, vms_to_discover))
         services_to_discover = conf['discover'].get('nodes')
         if services_to_discover:
             nodes.extend(discover_services(client, services_to_discover))
diff --git a/results/untime_variation_no_wa.txt b/results/untime_variation_no_wa.txt
new file mode 100644
index 0000000..d985cfd
--- /dev/null
+++ b/results/untime_variation_no_wa.txt
@@ -0,0 +1,121 @@
+{'__meta__': {'raw_cfg': '[writetest_10 * 7]\nstartdelay=10\nnumjobs=1\nblocksize=4k\nfilename=/media/koder/a5230078-4c27-4c3b-99aa-26148e78b2e7/xxx.bin\nrw=randwrite\ndirect=1\nbuffered=0\niodepth=1\nsize=1Gb\nruntime=10\ntime_based\nwait_for_previous\n\n[writetest_20 * 7]\nstartdelay=10\nnumjobs=1\nblocksize=4k\nfilename=/media/koder/a5230078-4c27-4c3b-99aa-26148e78b2e7/xxx.bin\nrw=randwrite\ndirect=1\nbuffered=0\niodepth=1\nsize=1Gb\nruntime=20\ntime_based\nwait_for_previous\n\n[writetest_30 * 7]\nstartdelay=10\nnumjobs=1\nblocksize=4k\nfilename=/media/koder/a5230078-4c27-4c3b-99aa-26148e78b2e7/xxx.bin\nrw=randwrite\ndirect=1\nbuffered=0\niodepth=1\nsize=1Gb\nruntime=30\ntime_based\nwait_for_previous\n\n[writetest_120 * 7]\nstartdelay=10\nnumjobs=1\nblocksize=4k\nfilename=/media/koder/a5230078-4c27-4c3b-99aa-26148e78b2e7/xxx.bin\nrw=randwrite\ndirect=1\nbuffered=0\niodepth=1\nsize=1Gb\nruntime=120\ntime_based\nwait_for_previous\n'},
+ 'res': {u'writetest_10': {'action': 'randwrite',
+                           'bw_mean': [1042.83,
+                                       940.67,
+                                       930.06,
+                                       894.17,
+                                       891.28,
+                                       915.72,
+                                       902.28],
+                           'clat': [4009.81,
+                                    4522.23,
+                                    4529.23,
+                                    4767.7,
+                                    4764.96,
+                                    4651.26,
+                                    4716.1],
+                           'concurence': 1,
+                           'direct_io': True,
+                           'iops': [249, 220, 220, 209, 209, 214, 211],
+                           'jobname': u'writetest_10',
+                           'lat': [4010.17,
+                                   4522.6,
+                                   4529.58,
+                                   4768.08,
+                                   4765.32,
+                                   4651.62,
+                                   4716.46],
+                           'size': '1Gb',
+                           'slat': [0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0],
+                           'sync': False,
+                           'timings': ('10', '10')},
+         u'writetest_120': {'action': 'randwrite',
+                            'bw_mean': [772.65,
+                                        862.57,
+                                        874.52,
+                                        877.36,
+                                        815.79,
+                                        746.11,
+                                        805.6],
+                            'clat': [5637.01,
+                                     5054.7,
+                                     4973.0,
+                                     4989.0,
+                                     5334.4,
+                                     5826.8,
+                                     5408.51],
+                            'concurence': 1,
+                            'direct_io': True,
+                            'iops': [177, 197, 200, 200, 187, 171, 184],
+                            'jobname': u'writetest_120',
+                            'lat': [5637.37,
+                                    5055.07,
+                                    4973.37,
+                                    4989.36,
+                                    5334.79,
+                                    5827.16,
+                                    5408.87],
+                            'size': '1Gb',
+                            'slat': [0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0],
+                            'sync': False,
+                            'timings': ('120', '10')},
+         u'writetest_20': {'action': 'randwrite',
+                           'bw_mean': [908.22,
+                                       898.28,
+                                       888.81,
+                                       904.74,
+                                       895.08,
+                                       894.42,
+                                       890.42],
+                           'clat': [4738.29,
+                                    4762.78,
+                                    4826.89,
+                                    4822.72,
+                                    4847.32,
+                                    4785.83,
+                                    4849.56],
+                           'concurence': 1,
+                           'direct_io': True,
+                           'iops': [210, 209, 206, 207, 206, 208, 205],
+                           'jobname': u'writetest_20',
+                           'lat': [4738.68,
+                                   4763.16,
+                                   4827.26,
+                                   4823.09,
+                                   4847.67,
+                                   4786.18,
+                                   4849.95],
+                           'size': '1Gb',
+                           'slat': [0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0],
+                           'sync': False,
+                           'timings': ('20', '10')},
+         u'writetest_30': {'action': 'randwrite',
+                           'bw_mean': [880.89,
+                                       878.45,
+                                       868.5,
+                                       854.61,
+                                       777.02,
+                                       748.49,
+                                       729.09],
+                           'clat': [4874.05,
+                                    4927.75,
+                                    4979.63,
+                                    5080.85,
+                                    5558.3,
+                                    5753.93,
+                                    5925.94],
+                           'concurence': 1,
+                           'direct_io': True,
+                           'iops': [204, 202, 200, 196, 179, 173, 168],
+                           'jobname': u'writetest_30',
+                           'lat': [4874.44,
+                                   4928.12,
+                                   4980.01,
+                                   5081.22,
+                                   5558.68,
+                                   5754.29,
+                                   5926.29],
+                           'size': '1Gb',
+                           'slat': [0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0],
+                           'sync': False,
+                           'timings': ('30', '10')}}}
diff --git a/run_test.py b/run_test.py
index d1a9f4f..992c7eb 100755
--- a/run_test.py
+++ b/run_test.py
@@ -1,4 +1,5 @@
 import sys
+import json
 import Queue
 import pprint
 import logging
@@ -10,10 +11,11 @@
 
 import utils
 import ssh_utils
+import start_vms
 from nodes import discover
 from nodes.node import Node
 from config import cfg_dict
-from itest import IOPerfTest, PgBenchTest
+from tests.itest import IOPerfTest, PgBenchTest
 
 from sensors.api import start_monitoring
 
@@ -57,15 +59,19 @@
     logger.info("Connecting to nodes")
     with ThreadPoolExecutor(32) as pool:
         list(pool.map(connect_one, nodes))
+    logger.info("All nodes connected successfully")
 
 
 def save_sensors_data(q):
     logger.info("Start receiving sensors data")
+    sensor_data = []
     while True:
         val = q.get()
         if val is None:
+            print sensor_data
+            q.put(sensor_data)
             break
-        # logger.debug("Sensors -> {0!r}".format(val))
+        sensor_data.append(val)
     logger.info("Sensors thread exits")
 
 
@@ -90,26 +96,30 @@
 
     res_q = Queue.Queue()
 
-    for name, params in config['tests'].items():
-        logger.info("Starting {0} tests".format(name))
+    for test in config['tests']:
+        for name, params in test.items():
+            logger.info("Starting {0} tests".format(name))
 
-        threads = []
-        barrier = utils.Barrier(len(test_nodes))
-        for node in test_nodes:
-            msg = "Starting {0} test on {1} node"
-            logger.debug(msg.format(name, node.conn_url))
-            test = tool_type_mapper[name](params, res_q.put)
-            th = threading.Thread(None, test_thread, None,
-                                  (test, node, barrier))
-            threads.append(th)
-            th.daemon = True
-            th.start()
+            threads = []
+            barrier = utils.Barrier(len(test_nodes))
+            for node in test_nodes:
+                msg = "Starting {0} test on {1} node"
+                logger.debug(msg.format(name, node.conn_url))
+                test = tool_type_mapper[name](params, res_q.put)
+                th = threading.Thread(None, test_thread, None,
+                                      (test, node, barrier))
+                threads.append(th)
+                th.daemon = True
+                th.start()
 
-        for th in threads:
-            th.join()
+            for th in threads:
+                th.join()
 
-        while not res_q.empty():
-            logger.info("Get test result {0!r}".format(res_q.get()))
+            results = []
+            while not res_q.empty():
+                results.append(res_q.get())
+                # logger.info("Get test result {0!r}".format(results[-1]))
+            yield name, results
 
 
 def parse_args(argv):
@@ -120,14 +130,14 @@
                         action='store_true', default=False,
                         help="print some extra log info")
 
-    parser.add_argument('stages', nargs="+",
-                        choices=["discover", "connect", "start_new_nodes",
-                                 "deploy_sensors", "run_tests"])
+    parser.add_argument("-o", '--output-dest', nargs="*")
+    parser.add_argument("config_file", nargs="?", default="config.yaml")
 
     return parser.parse_args(argv[1:])
 
 
-def log_nodes_statistic(nodes):
+def log_nodes_statistic(_, ctx):
+    nodes = ctx.nodes
     logger.info("Found {0} nodes total".format(len(nodes)))
     per_role = collections.defaultdict(lambda: 0)
     for node in nodes:
@@ -142,65 +152,132 @@
     pass
 
 
+def connect_stage(cfg, ctx):
+    ctx.clear_calls_stack.append(disconnect_stage)
+    connect_all(ctx.nodes)
+
+
+def discover_stage(cfg, ctx):
+    if 'discover' in cfg:
+        discover_objs = [i.strip() for i in cfg['discover'].strip().split(",")]
+        ctx.nodes.extend(discover.discover(discover_objs, cfg['clouds']))
+
+    for url, roles in cfg.get('explicit_nodes', {}).items():
+        ctx.nodes.append(Node(url, roles.split(",")))
+
+
+def deploy_sensors_stage(cfg_dict, ctx):
+    ctx.clear_calls_stack.append(remove_sensors_stage)
+    if 'sensors' not in cfg_dict:
+        return
+
+    cfg = cfg_dict.get('sensors')
+    sens_cfg = []
+
+    for role, sensors_str in cfg["roles_mapping"].items():
+        sensors = [sens.strip() for sens in sensors_str.split(",")]
+
+        collect_cfg = dict((sensor, {}) for sensor in sensors)
+
+        for node in ctx.nodes:
+            if role in node.roles:
+                sens_cfg.append((node.connection, collect_cfg))
+
+    log_sensors_config(sens_cfg)
+
+    ctx.sensor_cm = start_monitoring(cfg["receiver_uri"], None,
+                                     connected_config=sens_cfg)
+
+    ctx.sensors_control_queue = ctx.sensor_cm.__enter__()
+
+    th = threading.Thread(None, save_sensors_data, None,
+                          (ctx.sensors_control_queue,))
+    th.daemon = True
+    th.start()
+    ctx.sensor_listen_thread = th
+
+
+def remove_sensors_stage(cfg, ctx):
+    ctx.sensors_control_queue.put(None)
+    ctx.sensor_listen_thread.join()
+    ctx.sensor_data = ctx.sensors_control_queue.get()
+
+
+def run_tests_stage(cfg, ctx):
+    ctx.results = []
+
+    if 'tests' in cfg:
+        ctx.results.extend(run_tests(cfg, ctx.nodes))
+
+    # if 'start_test_nodes' in opts.stages:
+    #     params = cfg_dict['start_test_nodes']['openstack']
+    #     for new_node in start_vms.launch_vms(params):
+    #         new_node.roles.append('testnode')
+    #         nodes.append(new_node)
+
+
+def disconnect_stage(cfg, ctx):
+    for node in ctx.nodes:
+        if node.connection is not None:
+            node.connection.close()
+
+
+def report_stage(cfg, ctx):
+    output_dest = cfg.get('output_dest')
+    if output_dest is not None:
+        with open(output_dest, "w") as fd:
+            data = {"sensor_data": ctx.sensor_data,
+                    "results": ctx.results}
+            fd.write(json.dumps(data))
+    else:
+        print "=" * 20 + " RESULTS " + "=" * 20
+        pprint.pprint(ctx.results)
+        print "=" * 60
+
+
+def complete_log_nodes_statistic(cfg, ctx):
+    nodes = ctx.nodes
+    for node in nodes:
+        logger.debug(str(node))
+
+
+class Context(object):
+    def __init__(self):
+        self.nodes = []
+        self.clear_calls_stack = []
+
+
 def main(argv):
     opts = parse_args(argv)
 
     level = logging.DEBUG if opts.extra_logs else logging.WARNING
     setup_logger(logger, level)
 
-    nodes = []
+    stages = [
+        discover_stage,
+        connect_stage,
+        complete_log_nodes_statistic,
+        # deploy_sensors_stage,
+        run_tests_stage,
+        report_stage
+    ]
 
-    if 'discover' in opts.stages:
-        logger.info("Start node discovery")
-        nodes = discover.discover(cfg_dict.get('discover'))
+    ctx = Context()
+    try:
+        for stage in stages:
+            logger.info("Start {0.__name__} stage".format(stage))
+            stage(cfg_dict, ctx)
+    finally:
+        exc_type, exc_val, tb = sys.exc_info()
+        for stage in ctx.clear_calls_stack[::-1]:
+            try:
+                logger.info("Start {0.__name__} stage".format(stage))
+                stage(cfg_dict, ctx)
+            except:
+                pass
 
-    if 'explicit_nodes' in cfg_dict:
-        for url, roles in cfg_dict['explicit_nodes'].items():
-            nodes.append(Node(url, roles.split(",")))
-
-    log_nodes_statistic(nodes)
-
-    if 'connect' in opts.stages:
-        connect_all(nodes)
-
-    if 'deploy_sensors' in opts.stages:
-        logger.info("Deploing sensors")
-        cfg = cfg_dict.get('sensors')
-        sens_cfg = []
-
-        for role, sensors_str in cfg["roles_mapping"].items():
-            sensors = [sens.strip() for sens in sensors_str.split(",")]
-
-            collect_cfg = dict((sensor, {}) for sensor in sensors)
-
-            for node in nodes:
-                if role in node.roles:
-                    sens_cfg.append((node.connection, collect_cfg))
-
-        log_sensors_config(sens_cfg)
-
-        sensor_cm = start_monitoring(cfg["receiver_uri"], None,
-                                     connected_config=sens_cfg)
-
-        with sensor_cm as sensors_control_queue:
-            th = threading.Thread(None, save_sensors_data, None,
-                                  (sensors_control_queue,))
-            th.daemon = True
-            th.start()
-
-            # TODO: wait till all nodes start to send sensors data
-
-            if 'run_tests' in opts.stages:
-                run_tests(cfg_dict, nodes)
-
-            sensors_control_queue.put(None)
-            th.join()
-    elif 'run_tests' in opts.stages:
-        run_tests(cfg_dict, nodes)
-
-    logger.info("Disconnecting")
-    for node in nodes:
-        node.connection.close()
+        if exc_type is not None:
+            raise exc_type, exc_val, tb
 
     return 0
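
The rewritten main() makes extension explicit: a stage is any callable taking
(cfg, ctx), and a stage that needs cleanup pushes its teardown onto
ctx.clear_calls_stack, which is unwound in reverse order even when a later
stage fails. A hypothetical stage pair following that contract:

    def open_log_stage(cfg, ctx):
        ctx.clear_calls_stack.append(close_log_stage)
        ctx.log_fd = open("/tmp/run.log", "w")   # hypothetical resource

    def close_log_stage(cfg, ctx):
        ctx.log_fd.close()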
 
diff --git a/scripts/agent.py b/scripts/agent.py
index 56189f9..2873cb5 100644
--- a/scripts/agent.py
+++ b/scripts/agent.py
@@ -22,10 +22,10 @@
 
         out, err = p.communicate()
 
-        if not out is None:
+        if out is not None:
             print out
 
-        if not err is None:
+        if err is not None:
             print err
 
         node_port[ip] = base_port
@@ -35,18 +35,12 @@
 
 
 def parse_command_line(argv):
-    parser = argparse.ArgumentParser(description=
-                                     "Connect to fuel master "
+    parser = argparse.ArgumentParser(description="Connect to fuel master " +
                                      "and setup ssh agent")
-    parser.add_argument(
-        "--base_port", type=int, required=True)
-
+    parser.add_argument("--base_port", type=int, required=True)
     # To do: fix clean to be False when string is False
-    parser.add_argument(
-        "--clean", type=bool, default=False)
-
-    parser.add_argument(
-        "--ports", type=str, nargs='+')
+    parser.add_argument("--clean", type=bool, default=False)
+    parser.add_argument("--ports", type=str, nargs='+')
 
     return parser.parse_args(argv)
 
diff --git a/scripts/connector.py b/scripts/connector.py
index 4d9d32f..15bc538 100644
--- a/scripts/connector.py
+++ b/scripts/connector.py
@@ -6,7 +6,7 @@
 from urlparse import urlparse
 
 
-from keystone import KeystoneAuth
+from disk_perf_test_tool.keystone import KeystoneAuth
 
 
 def discover_fuel_nodes(fuel_url, creds, cluster_id):
@@ -92,17 +92,11 @@
 def parse_command_line(argv):
     parser = argparse.ArgumentParser(
         description="Connect to fuel master and setup ssh agent")
-    parser.add_argument(
-        "--fuel_url", required=True)
-    parser.add_argument(
-        "--cluster_id", required=True)
-    parser.add_argument(
-        "--username", default="admin")
-    parser.add_argument(
-        "--tenantname", default="admin")
-    parser.add_argument(
-        "--password", default="admin")
-
+    parser.add_argument("--fuel-url", required=True)
+    parser.add_argument("--cluster-id", required=True)
+    parser.add_argument("--username", default="admin")
+    parser.add_argument("--tenantname", default="admin")
+    parser.add_argument("--password", default="admin")
     return parser.parse_args(argv)
 
 
@@ -113,6 +107,8 @@
              "password": args.password}
 
     nodes = discover_fuel_nodes(args.fuel_url, creds, args.cluster_id)
+    print "Ready", nodes
+    sys.stdin.readline()
     discover_fuel_nodes_clean(args.fuel_url, {"username": "root",
                                               "password": "test37",
                                               "port": 22}, nodes)
diff --git a/scripts/data2.py b/scripts/data2.py
index 08dbc77..4a8dad9 100644
--- a/scripts/data2.py
+++ b/scripts/data2.py
@@ -1,6 +1,6 @@
 import sys
-import math
-import itertools
+from data_stat import med_dev, round_deviation, groupby_globally
+from data_stat import read_data_agent_result
 
 
 def key(x):
@@ -10,36 +10,6 @@
             x['__meta__']['concurence'])
 
 
-def med_dev(vals):
-    med = sum(vals) / len(vals)
-    dev = ((sum(abs(med - i) ** 2 for i in vals) / len(vals)) ** 0.5)
-    return int(med), int(dev)
-
-
-def round_deviation(med_dev):
-    med, dev = med_dev
-
-    if dev < 1E-7:
-        return med_dev
-
-    dev_div = 10.0 ** (math.floor(math.log10(dev)) - 1)
-    dev = int(dev / dev_div) * dev_div
-    med = int(med / dev_div) * dev_div
-    return (type(med_dev[0])(med),
-            type(med_dev[1])(dev))
-
-
-def groupby_globally(data, key_func):
-    grouped = {}
-    grouped_iter = itertools.groupby(data, key_func)
-
-    for (bs, cache_tp, act, conc), curr_data_it in grouped_iter:
-        key = (bs, cache_tp, act, conc)
-        grouped.setdefault(key, []).extend(curr_data_it)
-
-    return grouped
-
-
 template = "{bs:>4}  {action:>12}  {cache_tp:>3}  {conc:>4}"
 template += " | {iops[0]:>6} ~ {iops[1]:>5} | {bw[0]:>7} ~ {bw[1]:>6}"
 template += " | {lat[0]:>6} ~ {lat[1]:>5} |"
@@ -54,21 +24,7 @@
 
 
 def main(argv):
-    data = []
-
-    with open(argv[1]) as fc:
-        block = None
-        for line in fc:
-            if line.startswith("{'__meta__':"):
-                block = line
-            elif block is not None:
-                block += line
-
-            if block is not None:
-                if block.count('}') == block.count('{'):
-                    data.append(eval(block))
-                    block = None
-
+    data = read_data_agent_result(argv[1])
     grouped = groupby_globally(data, key)
 
     print template.format(**headers)
diff --git a/scripts/data_stat.py b/scripts/data_stat.py
new file mode 100644
index 0000000..a4d46bf
--- /dev/null
+++ b/scripts/data_stat.py
@@ -0,0 +1,49 @@
+import math
+import itertools
+
+
+def med_dev(vals):
+    med = sum(vals) / len(vals)
+    dev = ((sum(abs(med - i) ** 2 for i in vals) / len(vals)) ** 0.5)
+    return int(med), int(dev)
+
+
+def round_deviation(med_dev):
+    med, dev = med_dev
+
+    if dev < 1E-7:
+        return med_dev
+
+    dev_div = 10.0 ** (math.floor(math.log10(dev)) - 1)
+    dev = int(dev / dev_div) * dev_div
+    med = int(med / dev_div) * dev_div
+    return (type(med_dev[0])(med),
+            type(med_dev[1])(dev))
+
+
+def groupby_globally(data, key_func):
+    grouped = {}
+    grouped_iter = itertools.groupby(data, key_func)
+
+    for (bs, cache_tp, act, conc), curr_data_it in grouped_iter:
+        key = (bs, cache_tp, act, conc)
+        grouped.setdefault(key, []).extend(curr_data_it)
+
+    return grouped
+
+
+def read_data_agent_result(fname):
+    data = []
+    with open(fname) as fc:
+        block = None
+        for line in fc:
+            if line.startswith("{'__meta__':"):
+                block = line
+            elif block is not None:
+                block += line
+
+            if block is not None:
+                if block.count('}') == block.count('{'):
+                    data.append(eval(block))
+                    block = None
+    return data
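
A quick sanity check of the two helpers, using the writetest_10 iops series
from the results file above (outputs follow from the integer arithmetic in
med_dev and round_deviation):

    from data_stat import med_dev, round_deviation

    print med_dev([249, 220, 220, 209, 209, 214, 211])   # (218, 13)
    print round_deviation((1042, 378))                   # (1040, 370)
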
diff --git a/scripts/disk_io_pp.py b/scripts/disk_io_pp.py
new file mode 100644
index 0000000..bf901d3
--- /dev/null
+++ b/scripts/disk_io_pp.py
@@ -0,0 +1,60 @@
+import sys
+import collections
+
+import scipy.stats as stats
+import matplotlib.mlab as mlab
+import matplotlib.pyplot as plt
+
+from data_stat import med_dev, round_deviation
+from data_stat import read_data_agent_result
+
+data = read_data_agent_result(sys.argv[1])
+
+# for run in data:
+#     for name, numbers in run['res'].items():
+#         # med, dev = round_deviation(med_dev(numbers['iops']))
+#         # print name, med, '~', dev
+#         distr = collections.defaultdict(lambda: 0.0)
+#         for i in numbers['iops']:
+#             distr[i] += 1
+
+#         print name
+#         for key, val in sorted(distr.items()):
+#             print "    ", key, val
+#         print
+
+
+
+# # example data
+# mu = 100 # mean of distribution
+# sigma = 15 # standard deviation of distribution
+# x = mu + sigma * np.random.randn(10000)
+
+x = data[0]['res'][sys.argv[2]]['iops']
+# mu, sigma = med_dev(x)
+# print mu, sigma
+
+# med_sz = 1
+# x2 = x[:len(x) // med_sz * med_sz]
+# x2 = [sum(vals) / len(vals) for vals in zip(*[x2[i::med_sz]
+#                                               for i in range(med_sz)])]
+
+mu, sigma = med_dev(x)
+print mu, sigma
+print stats.normaltest(x)
+
+num_bins = 20
+# the histogram of the data
+n, bins, patches = plt.hist(x, num_bins, normed=1, facecolor='green', alpha=0.5)
+# add a 'best fit' line
+
+y = mlab.normpdf(bins, mu, sigma)
+plt.plot(bins, y, 'r--')
+
+plt.xlabel('IOPS')
+plt.ylabel('Probability')
+plt.title(r'Histogram of IOPS: $\mu={}$, $\sigma={}$'.format(mu, sigma))
+
+# Tweak spacing to prevent clipping of ylabel
+plt.subplots_adjust(left=0.15)
+plt.show()
diff --git a/sensors/deploy_sensors.py b/sensors/deploy_sensors.py
index e0428d9..3c715ef 100644
--- a/sensors/deploy_sensors.py
+++ b/sensors/deploy_sensors.py
@@ -88,6 +88,8 @@
     if isinstance(uri_or_conn, basestring):
         conn.close()
 
+    logger.debug("Sensors stopped and removed")
+
 
 def stop_and_remove_sensors(config, remote_path='/tmp/sensors',
                             connected_config=None):
diff --git a/start_vms.py b/start_vms.py
index 16e7d08..81f3e81 100644
--- a/start_vms.py
+++ b/start_vms.py
@@ -8,6 +8,8 @@
 from novaclient.client import Client as n_client
 from cinderclient.v1.client import Client as c_client
 
+from nodes.node import Node
+from nodes.openstack import get_floating_ip
 
 logger = logging.getLogger("io-perf-tool")
 
@@ -81,6 +83,28 @@
     return [ip for ip in ip_list if ip.instance_id is None][:amount]
 
 
+def launch_vms(config):
+    creds = config['creds']
+    if creds != 'ENV':
+        raise ValueError("Only 'ENV' creds are supported")
+
+    logger.debug("Starting new nodes on openstack")
+    conn = nova_connect()
+    params = config['vm_params'].copy()
+    count = params.pop('count')
+
+    if isinstance(count, basestring):
+        assert count.startswith("x")
+        lst = conn.services.list(binary='nova-compute')
+        srv_count = len([srv for srv in lst if srv.status == 'enabled'])
+        count = srv_count * int(count[1:])
+
+    creds = params.pop('creds')
+
+    for ip, _ in create_vms_mt(conn, count, **params):
+        yield Node(creds.format(ip), [])
+
+
 def create_vms_mt(nova, amount, keypair_name, img_name,
                   flavor_name, vol_sz=None, network_zone_name=None,
                   flt_ip_pool=None, name_templ='ceph-test-{0}',
@@ -99,10 +123,6 @@
         if flt_ip_pool is not None:
             ips_future = executor.submit(get_floating_ips,
                                          nova, flt_ip_pool, amount)
-        else:
-            ips_future = None
-
-        if ips_future is not None:
             logger.debug("Wait for floating ip")
             ips = ips_future.result()
             ips += [Allocate] * (amount - len(ips))
@@ -169,11 +189,12 @@
 
     if flt_ip is Allocate:
         flt_ip = nova.floating_ips.create(pool)
+
     if flt_ip is not None:
         # print "attaching ip to server"
         srv.add_floating_ip(flt_ip)
 
-    return nova.servers.get(srv.id)
+    ip = flt_ip.ip if flt_ip is not None else None
+    return ip, nova.servers.get(srv.id)
 
 
 def clear_all(nova, name_templ="ceph-test-{0}"):
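
On the 'count: x1' convention: launch_vms() treats a string count as "per
enabled nova-compute service" and multiplies. The expansion, extracted for
illustration (4 enabled computes is an assumed cloud state):

    count = "x2"
    enabled_computes = 4   # hypothetical: enabled nova-compute services
    if isinstance(count, basestring):
        assert count.startswith("x")
        count = enabled_computes * int(count[1:])
    print count   # 8
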
diff --git a/storage_api.py b/storage_api.py
index 1dbf30c..d02d0cd 100644
--- a/storage_api.py
+++ b/storage_api.py
@@ -1,5 +1,3 @@
-from urlparse import urlparse
-
 import json
 import math
 from config import TEST_PATH
diff --git a/tests.yaml b/tests.yaml
new file mode 100644
index 0000000..a90c628
--- /dev/null
+++ b/tests.yaml
@@ -0,0 +1,26 @@
+- with_test_nodes:
+    openstack:
+        creds: ENV
+        # creds: FUEL://USER:PASSDW@172.16.52.112:8000/ENV_NAME
+        vm_params:
+            count: x1
+            img_name: disk_io_perf
+            flavor_name: disk_io_perf.256
+            keypair_name: disk_io_perf
+            network_zone_name: novanetwork
+            flt_ip_pool: nova
+            creds: "ssh://ubuntu@{0}::disk_io_perf.pem"
+    tests:
+        - pgbench:
+            opts:
+                num_clients: [4, 8, 12]
+                transactions: [1, 2, 3]
+        - io: 
+            tool: fio
+            config_file: tests/io_task_test.cfg
+
+        - vm_count: 
+            max_lat_ms: 20
+            min_bw_mbps: 60
+            min_4k_direct_w_iops: 100
+            min_4k_direct_r_iops: 100
diff --git a/tests/disk_test_agent.py b/tests/disk_test_agent.py
new file mode 100644
index 0000000..7537438
--- /dev/null
+++ b/tests/disk_test_agent.py
@@ -0,0 +1,457 @@
+import sys
+import time
+import json
+import select
+import pprint
+import argparse
+import traceback
+import subprocess
+import itertools
+from collections import OrderedDict
+
+
+SECTION = 0
+SETTING = 1
+
+
+def get_test_summary(params):
+    rw = {"randread": "rr",
+          "randwrite": "rw",
+          "read": "sr",
+          "write": "sw"}[params["rw"]]
+
+    if params.get("direct") == '1':
+        sync_mode = 'd'
+    elif params.get("sync") == '1':
+        sync_mode = 's'
+    else:
+        sync_mode = 'a'
+
+    th_count = int(params.get('numjobs', '1'))
+
+    return "{0}{1}{2}th{3}".format(rw, sync_mode,
+                                   params['blocksize'], th_count)
+
+
+counter = [0]
+
+
+def process_section(name, vals, defaults, format_params):
+    vals = vals.copy()
+    params = format_params.copy()
+
+    if '*' in name:
+        name, repeat = name.split('*')
+        name = name.strip()
+        repeat = int(repeat.format(**params))
+    else:
+        repeat = 1
+
+    # this code can be optimized
+    for i in range(repeat):
+        iterable_names = []
+        iterable_values = []
+        processed_vals = {}
+
+        for val_name, val in vals.items():
+            if val is None:
+                processed_vals[val_name] = val
+            # TODO: remove hardcoded '{%...%}' markers
+            elif val.startswith('{%'):
+                assert val.endswith("%}")
+                content = val[2:-2].format(**params)
+                iterable_names.append(val_name)
+                iterable_values.append(i.strip() for i in content.split(','))
+            else:
+                processed_vals[val_name] = val.format(**params)
+
+        if iterable_values == []:
+            params['UNIQ'] = 'UN{0}'.format(counter[0])
+            counter[0] += 1
+            params['TEST_SUMM'] = get_test_summary(processed_vals)
+            yield name.format(**params), processed_vals
+        else:
+            for it_vals in itertools.product(*iterable_values):
+                processed_vals.update(dict(zip(iterable_names, it_vals)))
+                params['UNIQ'] = 'UN{0}'.format(counter[0])
+                counter[0] += 1
+                params['TEST_SUMM'] = get_test_summary(processed_vals)
+                yield name.format(**params), processed_vals
+
+
+def calculate_execution_time(combinations):
+    time = 0
+    for _, params in combinations:
+        time += int(params.get('ramp_time', 0))
+        time += int(params.get('runtime', 0))
+    return time
+
+
+def parse_fio_config_full(fio_cfg, params=None):
+    defaults = {}
+    format_params = {}
+
+    if params is None:
+        ext_params = {}
+    else:
+        ext_params = params.copy()
+
+    curr_section = None
+    curr_section_name = None
+
+    for tp, name, val in parse_fio_config_iter(fio_cfg):
+        if tp == SECTION:
+            non_def = curr_section_name != 'defaults'
+            if curr_section_name is not None and non_def:
+                format_params.update(ext_params)
+                for sec in process_section(curr_section_name,
+                                           curr_section,
+                                           defaults,
+                                           format_params):
+                    yield sec
+
+            if name == 'defaults':
+                curr_section = defaults
+            else:
+                curr_section = OrderedDict()
+                curr_section.update(defaults)
+            curr_section_name = name
+
+        else:
+            assert tp == SETTING
+            assert curr_section_name is not None, "no section name"
+            if name == name.upper():
+                assert curr_section_name == 'defaults'
+                format_params[name] = val
+            else:
+                curr_section[name] = val
+
+    if curr_section_name is not None and curr_section_name != 'defaults':
+        format_params.update(ext_params)
+        for sec in process_section(curr_section_name,
+                                   curr_section,
+                                   defaults,
+                                   format_params):
+            yield sec
+
+
+def parse_fio_config_iter(fio_cfg):
+    for lineno, line in enumerate(fio_cfg.split("\n")):
+        try:
+            line = line.strip()
+
+            if line.startswith("#") or line.startswith(";"):
+                continue
+
+            if line == "":
+                continue
+
+            if line.startswith('['):
+                assert line.endswith(']'), "section name should end with ']'"
+                yield SECTION, line[1:-1], None
+            elif '=' in line:
+                opt_name, opt_val = line.split('=', 1)
+                yield SETTING, opt_name.strip(), opt_val.strip()
+            else:
+                yield SETTING, line, None
+        except Exception as exc:
+            pref = "Error parsing line {0}:\n".format(lineno)
+            raise ValueError(pref + exc.message)
+
+
+def format_fio_config(fio_cfg):
+    res = ""
+    for pos, (name, section) in enumerate(fio_cfg):
+        if pos != 0:
+            res += "\n"
+
+        res += "[{0}]\n".format(name)
+        for opt_name, opt_val in section.items():
+            if opt_val is None:
+                res += opt_name + "\n"
+            else:
+                res += "{0}={1}\n".format(opt_name, opt_val)
+    return res
+
+
+def do_run_fio(bconf):
+    benchmark_config = format_fio_config(bconf)
+    cmd = ["fio", "--output-format=json", "-"]
+    p = subprocess.Popen(cmd, stdin=subprocess.PIPE,
+                         stdout=subprocess.PIPE,
+                         stderr=subprocess.STDOUT)
+
+    # TODO: set a timeout on the fio subprocess
+    raw_out, _ = p.communicate(benchmark_config)
+
+    try:
+        parsed_out = json.loads(raw_out)["jobs"]
+    except Exception:
+        msg = "Can't parse fio output: {0!r}\nError: {1}"
+        raise ValueError(msg.format(raw_out, traceback.format_exc()))
+
+    return zip(parsed_out, bconf)
+
+
+# limited by fio
+MAX_JOBS = 1000
+
+
+def next_test_portion(whole_conf, runcycle):
+    jcount = 0
+    runtime = 0
+    bconf = []
+
+    for pos, (name, sec) in enumerate(whole_conf):
+        jc = int(sec.get('numjobs', '1'))
+
+        if runcycle is not None:
+            curr_task_time = calculate_execution_time([(name, sec)])
+        else:
+            curr_task_time = 0
+
+        if jc > MAX_JOBS:
+            err_templ = "Can't process job {0!r} - too large numjobs"
+            raise ValueError(err_templ.format(name))
+
+        if runcycle is not None and len(bconf) != 0:
+            rc_ok = curr_task_time + runtime <= runcycle
+        else:
+            rc_ok = True
+
+        if jc + jcount <= MAX_JOBS and rc_ok:
+            runtime += curr_task_time
+            jcount += jc
+            bconf.append((name, sec))
+            continue
+
+        assert len(bconf) != 0
+        yield bconf
+
+        runtime = curr_task_time
+        jcount = jc
+        bconf = [(name, sec)]
+
+    if bconf != []:
+        yield bconf
+
+
+def add_job_results(jname, job_output, jconfig, res):
+    if job_output['write']['iops'] != 0:
+        raw_result = job_output['write']
+    else:
+        raw_result = job_output['read']
+
+    if jname not in res:
+        j_res = {}
+        j_res["action"] = jconfig["rw"]
+        j_res["direct_io"] = jconfig.get("direct", "0") == "1"
+        j_res["sync"] = jconfig.get("sync", "0") == "1"
+        j_res["concurence"] = int(jconfig.get("numjobs", 1))
+        j_res["size"] = jconfig["size"]
+        j_res["jobname"] = job_output["jobname"]
+        j_res["timings"] = (jconfig.get("runtime"),
+                            jconfig.get("ramp_time"))
+    else:
+        j_res = res[jname]
+        assert j_res["action"] == jconfig["rw"]
+
+        assert j_res["direct_io"] == \
+            (jconfig.get("direct", "0") == "1")
+
+        assert j_res["sync"] == (jconfig.get("sync", "0") == "1")
+        assert j_res["concurence"] == int(jconfig.get("numjobs", 1))
+        assert j_res["size"] == jconfig["size"]
+        assert j_res["jobname"] == job_output["jobname"]
+        assert j_res["timings"] == (jconfig.get("runtime"),
+                                    jconfig.get("ramp_time"))
+
+    def j_app(name, x):
+        j_res.setdefault(name, []).append(x)
+
+    # 'bw_dev bw_mean bw_max bw_min'.split()
+    j_app("bw_mean", raw_result["bw_mean"])
+    j_app("iops", raw_result["iops"])
+    j_app("lat", raw_result["lat"]["mean"])
+    j_app("clat", raw_result["clat"]["mean"])
+    j_app("slat", raw_result["slat"]["mean"])
+
+    res[jname] = j_res
+
+
+def run_fio(benchmark_config,
+            params,
+            runcycle=None,
+            raw_results_func=None,
+            skip_tests=0):
+
+    whole_conf = list(parse_fio_config_full(benchmark_config, params))
+    whole_conf = whole_conf[skip_tests:]
+    res = {}
+    curr_test_num = skip_tests
+    executed_tests = 0
+    try:
+        for bconf in next_test_portion(whole_conf, runcycle):
+            res_cfg_it = do_run_fio(bconf)
+            res_cfg_it = enumerate(res_cfg_it, curr_test_num)
+
+            for curr_test_num, (job_output, (jname, jconfig)) in res_cfg_it:
+                executed_tests += 1
+                if raw_results_func is not None:
+                    raw_results_func(curr_test_num,
+                                     [job_output, jname, jconfig])
+
+                assert jname == job_output["jobname"]
+
+                if jname.startswith('_'):
+                    continue
+
+                add_job_results(jname, job_output, jconfig, res)
+
+    except (SystemExit, KeyboardInterrupt):
+        pass
+
+    except Exception:
+        traceback.print_exc()
+
+    return res, executed_tests
+
+
+def run_benchmark(binary_tp, *argv, **kwargs):
+    if 'fio' == binary_tp:
+        return run_fio(*argv, **kwargs)
+    raise ValueError("Unknown behcnmark {0}".format(binary_tp))
+
+
+def parse_args(argv):
+    parser = argparse.ArgumentParser(
+        description="Run fio' and return result")
+    parser.add_argument("--type", metavar="BINARY_TYPE",
+                        choices=['fio'], default='fio',
+                        help=argparse.SUPPRESS)
+    parser.add_argument("--start-at", metavar="START_AT_UTC", type=int,
+                        help="Start execution at START_AT_UTC")
+    parser.add_argument("--json", action="store_true", default=False,
+                        help="Json output format")
+    parser.add_argument("--output", default='-', metavar="FILE_PATH",
+                        help="Store results to FILE_PATH")
+    parser.add_argument("--estimate", action="store_true", default=False,
+                        help="Only estimate task execution time")
+    parser.add_argument("--compile", action="store_true", default=False,
+                        help="Compile config file to fio config")
+    parser.add_argument("--num-tests", action="store_true", default=False,
+                        help="Show total number of tests")
+    parser.add_argument("--runcycle", type=int, default=None,
+                        metavar="MAX_CYCLE_SECONDS",
+                        help="Max cycle length in seconds")
+    parser.add_argument("--show-raw-results", action='store_true',
+                        default=False, help="Output raw input and results")
+    parser.add_argument("--skip-tests", type=int, default=0, metavar="NUM",
+                        help="Skip NUM tests")
+    parser.add_argument("--params", nargs="*", metavar="PARAM=VAL",
+                        default=[],
+                        help="Provide set of pairs PARAM=VAL to" +
+                             "format into job description")
+    parser.add_argument("jobfile")
+    return parser.parse_args(argv)
+
+
+def read_config(fd, timeout=10):
+    job_cfg = ""
+    etime = time.time() + timeout
+    while True:
+        wtime = etime - time.time()
+        if wtime <= 0:
+            raise IOError("No config provided")
+
+        r, w, x = select.select([fd], [], [], wtime)
+        if len(r) == 0:
+            raise IOError("No config provided")
+
+        char = fd.read(1)
+        if '' == char:
+            return job_cfg
+
+        job_cfg += char
+
+
+def main(argv):
+    argv_obj = parse_args(argv)
+
+    if argv_obj.jobfile == '-':
+        job_cfg = read_config(sys.stdin)
+    else:
+        job_cfg = open(argv_obj.jobfile).read()
+
+    if argv_obj.output == '-':
+        out_fd = sys.stdout
+    else:
+        out_fd = open(argv_obj.output, "w")
+
+    params = {}
+    for param_val in argv_obj.params:
+        assert '=' in param_val
+        name, val = param_val.split("=", 1)
+        params[name] = val
+
+    if argv_obj.num_tests or argv_obj.compile or argv_obj.estimate:
+        bconf = list(parse_fio_config_full(job_cfg, params))
+        bconf = bconf[argv_obj.skip_tests:]
+
+        if argv_obj.compile:
+            out_fd.write(format_fio_config(bconf))
+            out_fd.write("\n")
+
+        if argv_obj.num_tests:
+            print len(bconf)
+
+        if argv_obj.estimate:
+            seconds = calculate_execution_time(bconf)
+
+            h = seconds // 3600
+            m = (seconds % 3600) // 60
+            s = seconds % 60
+
+            print "{0}:{1}:{2}".format(h, m, s)
+        return 0
+
+    if argv_obj.start_at is not None:
+        ctime = time.time()
+        if argv_obj.start_at >= ctime:
+            time.sleep(argv_obj.start_at - ctime)
+
+    def raw_res_func(test_num, data):
+        pref = "========= RAW_RESULTS({0}) =========\n".format(test_num)
+        out_fd.write(pref)
+        out_fd.write(json.dumps(data))
+        out_fd.write("\n========= END OF RAW_RESULTS =========\n")
+        out_fd.flush()
+
+    rrfunc = raw_res_func if argv_obj.show_raw_results else None
+
+    stime = time.time()
+    job_res, num_tests = run_benchmark(argv_obj.type,
+                                       job_cfg,
+                                       params,
+                                       argv_obj.runcycle,
+                                       rrfunc,
+                                       argv_obj.skip_tests)
+    etime = time.time()
+
+    res = {'__meta__': {'raw_cfg': job_cfg}, 'res': job_res}
+
+    oformat = 'json' if argv_obj.json else 'eval'
+    out_fd.write("\nRun {} tests in {} seconds\n".format(num_tests,
+                                                         int(etime - stime)))
+    out_fd.write("========= RESULTS(format={0}) =========\n".format(oformat))
+    if argv_obj.json:
+        out_fd.write(json.dumps(res))
+    else:
+        out_fd.write(pprint.pformat(res) + "\n")
+    out_fd.write("\n========= END OF RESULTS =========\n".format(oformat))
+
+    return 0
+
+
+if __name__ == '__main__':
+    exit(main(sys.argv[1:]))
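
disk_test_agent.py layers a small template language over plain fio job files:
a [defaults] section is merged into every job, UPPERCASE settings in
[defaults] become format parameters (overridable with --params NAME=VAL),
'[name * N]' repeats a section N times, and '{% a, b %}' values expand into a
cartesian product of jobs; UNIQ and TEST_SUMM are generated per job. A made-up
config exercising all of this:

    [defaults]
    FILESIZE=1Gb
    runtime=30
    time_based

    [randwrite_{TEST_SUMM} * 2]
    blocksize={% 4k, 64k %}
    rw=randwrite
    direct=1
    numjobs=1
    size={FILESIZE}

This expands to 2 * 2 = 4 jobs, so --num-tests should print 4 and --estimate
should report 0:2:0 (4 jobs at runtime=30 each); --params FILESIZE=2Gb would
override the default from the command line.
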
diff --git a/tests/fio_configs/1.cfg b/tests/fio_configs/1.cfg
new file mode 100644
index 0000000..d5240cd
--- /dev/null
+++ b/tests/fio_configs/1.cfg
@@ -0,0 +1,100 @@
+[writetest_10 * 55]
+startdelay=10
+numjobs=1
+blocksize=4k
+filename=/media/koder/a5230078-4c27-4c3b-99aa-26148e78b2e7/xxx.bin
+rw=randwrite
+direct=1
+buffered=0
+iodepth=1
+size=1Gb
+runtime=10
+time_based
+wait_for_previous
+
+[writetest_20 * 55]
+startdelay=10
+numjobs=1
+blocksize=4k
+filename=/media/koder/a5230078-4c27-4c3b-99aa-26148e78b2e7/xxx.bin
+rw=randwrite
+direct=1
+buffered=0
+iodepth=1
+size=1Gb
+runtime=20
+time_based
+wait_for_previous
+
+[writetest_30 * 55]
+startdelay=10
+numjobs=1
+blocksize=4k
+filename=/media/koder/a5230078-4c27-4c3b-99aa-26148e78b2e7/xxx.bin
+rw=randwrite
+direct=1
+buffered=0
+iodepth=1
+size=1Gb
+runtime=30
+time_based
+wait_for_previous
+
+[writetest_120 * 55]
+startdelay=10
+numjobs=1
+blocksize=4k
+filename=/media/koder/a5230078-4c27-4c3b-99aa-26148e78b2e7/xxx.bin
+rw=randwrite
+direct=1
+buffered=0
+iodepth=1
+size=1Gb
+runtime=120
+time_based
+wait_for_previous
+
+[writetest_30_5 * 55]
+ramp_time=5
+startdelay=10
+numjobs=1
+blocksize=4k
+filename=/media/koder/a5230078-4c27-4c3b-99aa-26148e78b2e7/xxx.bin
+rw=randwrite
+direct=1
+buffered=0
+iodepth=1
+size=1Gb
+runtime=30
+time_based
+wait_for_previous
+
+[writetest_30_10 * 55]
+ramp_time=10
+startdelay=10
+numjobs=1
+blocksize=4k
+filename=/media/koder/a5230078-4c27-4c3b-99aa-26148e78b2e7/xxx.bin
+rw=randwrite
+direct=1
+buffered=0
+iodepth=1
+size=1Gb
+runtime=30
+time_based
+wait_for_previous
+
+[writetest_30_15 * 55]
+ramp_time=15
+startdelay=10
+numjobs=1
+blocksize=4k
+filename=/media/koder/a5230078-4c27-4c3b-99aa-26148e78b2e7/xxx.bin
+rw=randwrite
+direct=1
+buffered=0
+iodepth=1
+size=1Gb
+runtime=30
+time_based
+wait_for_previous
diff --git a/tests/fio_configs/2.cfg b/tests/fio_configs/2.cfg
new file mode 100644
index 0000000..050e477
--- /dev/null
+++ b/tests/fio_configs/2.cfg
@@ -0,0 +1,13 @@
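+# {FILENAME} is a template placeholder, filled in from the params mapping
+# (IOPerfTest passes test_options['params'] into parse_fio_config_full).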
+[writetest_10_20 * 3]
+ramp_time=5
+numjobs=1
+blocksize=4k
+filename={FILENAME}
+rw=randwrite
+direct=1
+buffered=0
+iodepth=1
+size=1Gb
+runtime=5
+time_based
+wait_for_previous
diff --git a/tests/io.py b/tests/io.py
deleted file mode 100644
index 2405f53..0000000
--- a/tests/io.py
+++ /dev/null
@@ -1,97 +0,0 @@
-import sys
-import time
-import json
-import select
-import pprint
-import argparse
-import subprocess
-from StringIO import StringIO
-from ConfigParser import RawConfigParser
-
-
-def run_fio(benchmark_config):
-    cmd = ["fio", "--output-format=json", "-"]
-    p = subprocess.Popen(cmd, stdin=subprocess.PIPE,
-                         stdout=subprocess.PIPE,
-                         stderr=subprocess.STDOUT)
-    raw_out, _ = p.communicate(benchmark_config)
-    job_output = json.loads(raw_out)["jobs"][0]
-
-    if job_output['write']['iops'] != 0:
-        raw_result = job_output['write']
-    else:
-        raw_result = job_output['read']
-
-    res = {}
-
-    # 'bw_dev bw_mean bw_max bw_min'.split()
-    for field in ["bw_mean", "iops"]:
-        res[field] = raw_result[field]
-
-    res["lat"] = raw_result["lat"]["mean"]
-    res["clat"] = raw_result["clat"]["mean"]
-    res["slat"] = raw_result["slat"]["mean"]
-    res["util"] = json.loads(raw_out)["disk_util"][0]
-
-    res["util"] = dict((str(k), v) for k, v in res["util"].items())
-
-    return res
-
-
-def run_benchmark(binary_tp, *argv, **kwargs):
-    if 'fio' == binary_tp:
-        return run_fio(*argv, **kwargs)
-    raise ValueError("Unknown behcnmark {0}".format(binary_tp))
-
-
-def parse_args(argv):
-    parser = argparse.ArgumentParser(
-        description="Run fio' and return result")
-    parser.add_argument(
-        "--type", metavar="BINARY_TYPE",
-        choices=['fio'], required=True)
-    parser.add_argument("--start-at", metavar="START_TIME", type=int)
-    parser.add_argument("--json", action="store_true", default=False)
-    parser.add_argument("jobfile")
-    return parser.parse_args(argv)
-
-
-def main(argv):
-    argv_obj = parse_args(argv)
-    if argv_obj.jobfile == '-':
-        job_cfg = ""
-        dtime = 10
-        while True:
-            r, w, x = select.select([sys.stdin], [], [], dtime)
-            if len(r) == 0:
-                raise IOError("No config provided")
-            char = sys.stdin.read(1)
-            if '' == char:
-                break
-            job_cfg += char
-            dtime = 1
-    else:
-        job_cfg = open(argv_obj.jobfile).read()
-
-    rcp = RawConfigParser()
-    rcp.readfp(StringIO(job_cfg))
-    assert len(rcp.sections()) == 1
-
-    if argv_obj.start_at is not None:
-        ctime = time.time()
-        if argv_obj.start_at >= ctime:
-            time.sleep(ctime - argv_obj.start_at)
-
-    res = run_benchmark(argv_obj.type, job_cfg)
-    res['__meta__'] = dict(rcp.items(rcp.sections()[0]))
-    res['__meta__']['raw'] = job_cfg
-
-    if argv_obj.json:
-        sys.stdout.write(json.dumps(res))
-    else:
-        sys.stdout.write(pprint.pformat(res))
-        sys.stdout.write("\n")
-    return 0
-
-if __name__ == '__main__':
-    exit(main(sys.argv[1:]))
diff --git a/tests/io_scenario_check_assumptions.cfg b/tests/io_scenario_check_assumptions.cfg
new file mode 100644
index 0000000..25d99bc
--- /dev/null
+++ b/tests/io_scenario_check_assumptions.cfg
@@ -0,0 +1,59 @@
+[defaults]
+NUM_ROUNDS=7
+
+ramp_time=5
+buffered=0
+wait_for_previous
+filename=/media/koder/a5230078-4c27-4c3b-99aa-26148e78b2e7/xxx.bin
+iodepth=1
+size=10Gb
+time_based
+runtime=30
+
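+# Template syntax (as presumably expanded by
+# disk_test_agent.parse_fio_config_full; a sketch of the assumed semantics):
+#   [name * N]     - the section is repeated N times
+#   {% a, b, c %}  - one job variant per listed value
+#   {PARAM}        - substituted from the external params mapping
+# Options from [defaults] apply to every section below.
+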
+# ---------------------------------------------------------------------
+# check test time, no warmup time. iops = func(run_time)
+# ---------------------------------------------------------------------
+[runtime_test_wo_wu_{TEST_SUMM}_{UNIQ} * {NUM_ROUNDS}]
+startdelay=10
+blocksize={% 4k, 1m %}
+rw=randwrite
+direct=1
+ramp_time=0
+runtime={% 10, 15, 20, 30, 60, 120 %}
+
+# ---------------------------------------------------------------------
+# check test time, with warmup time. iops = func(run_time)
+# ---------------------------------------------------------------------
+[runtime_test_w_wu_{TEST_SUMM}_{UNIQ} * {NUM_ROUNDS}]
+startdelay=10
+blocksize={% 4k, 1m %}
+rw=randwrite
+direct=1
+ramp_time={% 5, 10, 15 %}
+runtime={% 15, 30 %}
+
+# ---------------------------------------------------------------------
+# check read and write linearity. oper_time = func(size)
+# ---------------------------------------------------------------------
+[linearity_test_{TEST_SUMM} * {NUM_ROUNDS}]
+blocksize={% 512, 1k, 4k, 8k, 16k, 32k, 64k, 128k, 256k, 512k, 1m, 2m, 4m %}
+rw={% randwrite, randread %}
+direct=1
+
+# ---------------------------------------------------------------------
+# check sync write linearity. oper_time = func(size)
+# check sync BW as well
+# ---------------------------------------------------------------------
+[linearity_test_{TEST_SUMM} * {NUM_ROUNDS}]
+blocksize={% 512, 1k, 4k, 8k, 16k, 32k, 64k, 128k, 256k, 512k, 1m, 2m, 4m %}
+rw=randwrite
+sync=1
+
+# ---------------------------------------------------------------------
+# check different thread count. (latency, bw) = func(th_count)
+# ---------------------------------------------------------------------
+[concurrence_test_{TEST_SUMM} * {NUM_ROUNDS}]
+blocksize={% 4k, 1m %}
+rw=randwrite
+direct=1
+numjobs={% 1, 2, 5, 10, 15, 20, 25, 30, 35, 40 %}
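+
+# Rough size check: assuming every {% ... %} list multiplies the job count,
+# these sections expand to roughly 580 jobs with NUM_ROUNDS=7; running
+# disk_test_agent with its estimate option first is advisable.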
diff --git a/io_task.cfg b/tests/io_task.cfg
similarity index 100%
rename from io_task.cfg
rename to tests/io_task.cfg
diff --git a/tests/io_task_test.cfg b/tests/io_task_test.cfg
new file mode 100644
index 0000000..a319fa0
--- /dev/null
+++ b/tests/io_task_test.cfg
@@ -0,0 +1,24 @@
+# [__warmup]
+# blocksize=4k
+# filename=/tmp/xxx.bin
+# rw=randwrite
+# direct=1
+# buffered=0
+# iodepth=1
+# size=1Gb
+# runtime=5
+# time_based
+
+[writetest * 3]
+numjobs=4
+wait_for_previous
+ramp_time=5
+blocksize=4k
+filename=/tmp/xxx.bin
+rw=randwrite
+direct=1
+buffered=0
+iodepth=1
+size=100Mb
+runtime=10
+time_based
diff --git a/itest.py b/tests/itest.py
similarity index 69%
rename from itest.py
rename to tests/itest.py
index daebd1a..e7fd3eb 100644
--- a/itest.py
+++ b/tests/itest.py
@@ -1,3 +1,4 @@
+import re
 import abc
 import json
 import os.path
@@ -5,7 +6,7 @@
 from StringIO import StringIO
 from ConfigParser import RawConfigParser
 
-from tests import io
+from tests import disk_test_agent
 from ssh_utils import copy_paths
 from utils import run_over_ssh, ssize_to_b
 
@@ -90,26 +91,27 @@
 
 
 class IOPerfTest(IPerfTest):
-    io_py_remote = "/tmp/io.py"
+    io_py_remote = "/tmp/disk_test_agent.py"
 
     def __init__(self,
                  test_options,
                  on_result_cb):
         IPerfTest.__init__(self, on_result_cb)
         self.options = test_options
-        self.config_fname = test_options['config_file']
-        self.tool = test_options['tool']
-        self.configs = []
+        self.config_fname = test_options['cfg']
+        self.config_params = test_options.get('params', {})
+        self.tool = test_options.get('tool', 'fio')
+        self.raw_cfg = open(self.config_fname).read()
 
-        cp = RawConfigParser()
-        cp.readfp(open(self.config_fname))
-
-        for secname in cp.sections():
-            params = dict(cp.items(secname))
-            self.configs.append((secname, params))
+        parse_func = disk_test_agent.parse_fio_config_full
+        self.configs = parse_func(self.raw_cfg, self.config_params)
 
     def pre_run(self, conn):
-        local_fname = io.__file__.rsplit('.')[0] + ".py"
+
+        # TODO: install fio only when it is not already present
+        # (apt-get assumes a Debian/Ubuntu test node)
+        run_over_ssh(conn, "apt-get -y install fio")
+
+        local_fname = disk_test_agent.__file__.rsplit('.')[0] + ".py"
         self.files_to_copy = {local_fname: self.io_py_remote}
         copy_paths(conn, self.files_to_copy)
 
@@ -129,46 +131,22 @@
     def run(self, conn, barrier):
         cmd_templ = "env python2 {0} --type {1} --json -"
         cmd = cmd_templ.format(self.io_py_remote, self.tool)
+        logger.debug("Run {0}".format(cmd))
         try:
-            for secname, _params in self.configs:
-                params = _params.copy()
-                count = params.pop('count', 1)
-
-                config = RawConfigParser()
-                config.add_section(secname)
-
-                for k, v in params.items():
-                    config.set(secname, k, v)
-
-                cfg = StringIO()
-                config.write(cfg)
-
-                # FIX python config parser-fio incompatibility
-                # remove spaces around '='
-                new_cfg = []
-                config_data = cfg.getvalue()
-                for line in config_data.split("\n"):
-                    if '=' in line:
-                        name, val = line.split('=', 1)
-                        name = name.strip()
-                        val = val.strip()
-                        line = "{0}={1}".format(name, val)
-                    new_cfg.append(line)
-
-                for _ in range(count):
-                    barrier.wait()
-                    code, out_err = run_over_ssh(conn, cmd,
-                                                 stdin_data="\n".join(new_cfg))
-                    self.on_result(code, out_err, cmd)
+            barrier.wait()
+            code, out_err = run_over_ssh(conn, cmd, stdin_data=self.raw_cfg)
+            self.on_result(code, out_err, cmd)
         finally:
             barrier.exit()
 
     def on_result(self, code, out_err, cmd):
         if 0 == code:
             try:
-                for line in out_err.split("\n"):
-                    if line.strip() != "":
-                        self.on_result_cb(json.loads(line))
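+                # The patterns below must match the RESULTS(format=json) /
+                # END OF RESULTS framing that disk_test_agent.main writes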
+                start_patt = r"(?ims)=+\s+RESULTS\(format=json\)\s+=+"
+                end_patt = r"(?ims)=+\s+END OF RESULTS\s+=+"
+                for block in re.split(start_patt, out_err)[1:]:
+                    data, garbage = re.split(end_patt, block)
+                    self.on_result_cb(json.loads(data.strip()))
             except Exception as exc:
                 msg_templ = "Error during postprocessing results: {0!r}"
                 raise RuntimeError(msg_templ.format(exc.message))