more improvements and fixes and new bugs
diff --git a/wally/discover/discover.py b/wally/discover/discover.py
index 3cd5cff..73407c0 100644
--- a/wally/discover/discover.py
+++ b/wally/discover/discover.py
@@ -27,11 +27,13 @@
 """
 
 
-def discover(ctx, discover, clusters_info, var_dir):
+def discover(ctx, discover, clusters_info, var_dir, discover_nodes=True):
     nodes_to_run = []
     clean_data = None
     for cluster in discover:
-        if cluster == "openstack":
+        if cluster == "openstack" and not discover_nodes:
+            logger.warning("Skip openstack cluster discovery")
+        elif cluster == "openstack" and discover_nodes:
             cluster_info = clusters_info["openstack"]
             conn = cluster_info['connection']
             user, passwd, tenant = parse_creds(conn['creds'])
@@ -56,7 +58,9 @@
             nodes_to_run.extend(os_nodes)
 
         elif cluster == "fuel":
-            res = fuel.discover_fuel_nodes(clusters_info['fuel'], var_dir)
+            res = fuel.discover_fuel_nodes(clusters_info['fuel'],
+                                           var_dir,
+                                           discover_nodes)
             nodes, clean_data, openrc_dict = res
 
             ctx.fuel_openstack_creds = {'name': openrc_dict['username'],
@@ -71,15 +75,20 @@
 
             fuel_openrc_fname = os.path.join(var_dir,
                                              env_f_name + "_openrc")
+
             with open(fuel_openrc_fname, "w") as fd:
                 fd.write(openrc_templ.format(**ctx.fuel_openstack_creds))
+
             msg = "Openrc for cluster {0} saves into {1}"
             logger.debug(msg.format(env_name, fuel_openrc_fname))
             nodes_to_run.extend(nodes)
 
         elif cluster == "ceph":
-            cluster_info = clusters_info["ceph"]
-            nodes_to_run.extend(ceph.discover_ceph_nodes(cluster_info))
+            if discover_nodes:
+                cluster_info = clusters_info["ceph"]
+                nodes_to_run.extend(ceph.discover_ceph_nodes(cluster_info))
+            else:
+                logger.warning("Skip ceph cluster discovery")
         else:
             msg_templ = "Unknown cluster type in 'discover' parameter: {0!r}"
             raise ValueError(msg_templ.format(cluster))
diff --git a/wally/discover/fuel.py b/wally/discover/fuel.py
index 149ec31..a787786 100644
--- a/wally/discover/fuel.py
+++ b/wally/discover/fuel.py
@@ -18,7 +18,7 @@
 BASE_PF_PORT = 44006
 
 
-def discover_fuel_nodes(fuel_data, var_dir):
+def discover_fuel_nodes(fuel_data, var_dir, discover_nodes=True):
     username, tenant_name, password = parse_creds(fuel_data['creds'])
     creds = {"username": username,
              "tenant_name": tenant_name,
@@ -28,6 +28,11 @@
 
     cluster_id = get_cluster_id(conn, fuel_data['openstack_env'])
     cluster = reflect_cluster(conn, cluster_id)
+
+    if not discover_nodes:
+        logger.warning("Skip fuel cluster discovery")
+        return ([], None, cluster.get_openrc())
+
     version = FuelInfo(conn).get_version()
 
     fuel_nodes = list(cluster.get_nodes())
@@ -114,6 +119,9 @@
 
 
 def clean_fuel_port_forwarding(clean_data):
+    if clean_data is None:
+        return
+
     conn, iface, ips_ports = clean_data
     for ip, port in ips_ports:
         forward_ssh_port(conn, iface, port, ip, clean=True)
diff --git a/wally/run_test.py b/wally/run_test.py
index 42e96ff..b22ce38 100755
--- a/wally/run_test.py
+++ b/wally/run_test.py
@@ -2,6 +2,7 @@
 
 import os
 import sys
+import time
 import Queue
 import pprint
 import logging
@@ -15,7 +16,7 @@
 from concurrent.futures import ThreadPoolExecutor
 
 from wally import pretty_yaml
-from wally.tests_sensors import deploy_sensors_stage
+from wally.sensors_utils import deploy_sensors_stage
 from wally.discover import discover, Node, undiscover
 from wally import utils, report, ssh_utils, start_vms
 from wally.suits.itest import IOPerfTest, PgBenchTest
@@ -60,8 +61,11 @@
                                                 log_warns=log_warns)
         else:
             raise ValueError("Unknown url type {0}".format(node.conn_url))
-    except Exception:
-        logger.exception("During connect to " + node.get_conn_id())
+    except Exception as exc:
+        # logger.exception("During connect to " + node.get_conn_id())
+        msg = "During connect to {0}: {1}".format(node.get_conn_id(),
+                                                  exc.message)
+        logger.error(msg)
         node.connection = None
 
 
@@ -139,14 +143,23 @@
 
         results = []
 
+        # MAX_WAIT_TIME = 10
+        # end_time = time.time() + MAX_WAIT_TIME
+
+        # while time.time() < end_time:
         while True:
             for th in threads:
                 th.join(1)
                 gather_results(res_q, results)
+                # if time.time() > end_time:
+                #     break
 
             if all(not th.is_alive() for th in threads):
                 break
 
+        # if any(th.is_alive() for th in threads):
+        #     logger.warning("Some test threads still running")
+
         gather_results(res_q, results)
         yield name, test.merge_results(results)
 
@@ -190,8 +203,11 @@
     if cfg.get('discover') is not None:
         discover_objs = [i.strip() for i in cfg['discover'].strip().split(",")]
 
-        nodes, clean_data = discover(ctx, discover_objs,
-                                     cfg['clouds'], cfg['var_dir'])
+        nodes, clean_data = discover(ctx,
+                                     discover_objs,
+                                     cfg['clouds'],
+                                     cfg['var_dir'],
+                                     not cfg['dont_discover_nodes'])
 
         def undiscover_stage(cfg, ctx):
             undiscover(clean_data)
@@ -203,7 +219,7 @@
         ctx.nodes.append(Node(url, roles.split(",")))
 
 
-def get_os_credentials(cfg, ctx, creds_type):
+def get_OS_credentials(cfg, ctx, creds_type):
     creds = None
 
     if creds_type == 'clouds':
@@ -243,7 +259,7 @@
     os_nodes_ids = []
 
     os_creds_type = config['creds']
-    os_creds = get_os_credentials(cfg, ctx, os_creds_type)
+    os_creds = get_OS_credentials(cfg, ctx, os_creds_type)
 
     start_vms.nova_connect(**os_creds)
 
@@ -294,10 +310,12 @@
                                      nodes=new_nodes,
                                      undeploy=False)
 
-                for test_group in config.get('tests', []):
-                    ctx.results.extend(run_tests(test_group, ctx.nodes))
+                if not cfg['no_tests']:
+                    for test_group in config.get('tests', []):
+                        ctx.results.extend(run_tests(test_group, ctx.nodes))
         else:
-            ctx.results.extend(run_tests(group, ctx.nodes))
+            if not cfg['no_tests']:
+                ctx.results.extend(run_tests(group, ctx.nodes))
 
 
 def shut_down_vms_stage(cfg, ctx):
@@ -353,10 +371,12 @@
 def console_report_stage(cfg, ctx):
     for tp, data in ctx.results:
         if 'io' == tp and data is not None:
+            print("\n")
             print(IOPerfTest.format_for_console(data))
+            print("\n")
 
 
-def report_stage(cfg, ctx):
+def html_report_stage(cfg, ctx):
     html_rep_fname = cfg['html_report_file']
 
     try:
@@ -409,10 +429,15 @@
     parser.add_argument("-i", '--build_id', type=str, default="id")
     parser.add_argument("-t", '--build_type', type=str, default="GA")
     parser.add_argument("-u", '--username', type=str, default="admin")
+    parser.add_argument("-n", '--no-tests', action='store_true',
+                        help="Don't run tests", default=False)
     parser.add_argument("-p", '--post-process-only', metavar="VAR_DIR",
                         help="Only process data from previour run")
     parser.add_argument("-k", '--keep-vm', action='store_true',
                         help="Don't remove test vm's", default=False)
+    parser.add_argument("-d", '--dont-discover-nodes', action='store_true',
+                        help="Don't connect/discover fuel nodes",
+                        default=False)
     parser.add_argument("config_file")
 
     return parser.parse_args(argv[1:])
@@ -423,9 +448,7 @@
 
     if opts.post_process_only is not None:
         stages = [
-            load_data_from(opts.post_process_only),
-            console_report_stage,
-            report_stage
+            load_data_from(opts.post_process_only)
         ]
     else:
         stages = [
@@ -434,11 +457,14 @@
             connect_stage,
             deploy_sensors_stage,
             run_tests_stage,
-            store_raw_results_stage,
-            console_report_stage,
-            report_stage
+            store_raw_results_stage
         ]
 
+    report_stages = [
+        console_report_stage,
+        html_report_stage
+    ]
+
     load_config(opts.config_file, opts.post_process_only)
 
     if cfg_dict.get('logging', {}).get("extra_logs", False) or opts.extra_logs:
@@ -456,14 +482,20 @@
     ctx.build_meta['build_descrption'] = opts.build_description
     ctx.build_meta['build_type'] = opts.build_type
     ctx.build_meta['username'] = opts.username
+
     cfg_dict['keep_vm'] = opts.keep_vm
+    cfg_dict['no_tests'] = opts.no_tests
+    cfg_dict['dont_discover_nodes'] = opts.dont_discover_nodes
 
     try:
         for stage in stages:
             logger.info("Start {0.__name__} stage".format(stage))
             stage(cfg_dict, ctx)
     except Exception as exc:
-        msg = "Exception during current stage: {0}".format(exc.message)
+        emsg = exc.message
+        if emsg == "":
+            emsg = str(exc)
+        msg = "Exception during {0.__name__}: {1}".format(stage, emsg)
         logger.error(msg)
     finally:
         exc, cls, tb = sys.exc_info()
@@ -477,5 +509,8 @@
         if exc is not None:
             raise exc, cls, tb
 
-    logger.info("All info stored into {0}".format(cfg_dict['var_dir']))
+    for report_stage in report_stages:
+        report_stage(cfg_dict, ctx)
+
+    logger.info("All info stored in {0} folder".format(cfg_dict['var_dir']))
     return 0
diff --git a/wally/sensors/storage/grafana.py b/wally/sensors/grafana.py
similarity index 100%
rename from wally/sensors/storage/grafana.py
rename to wally/sensors/grafana.py
diff --git a/wally/sensors/storage/influx_exporter.py b/wally/sensors/influx_exporter.py
similarity index 100%
rename from wally/sensors/storage/influx_exporter.py
rename to wally/sensors/influx_exporter.py
diff --git a/wally/sensors/main.py b/wally/sensors/main.py
index e86bbed..2b6ab8b 100644
--- a/wally/sensors/main.py
+++ b/wally/sensors/main.py
@@ -50,12 +50,12 @@
     sender = create_protocol(opts.url)
     prev = {}
 
-    while True:
-        try:
-            source_id = str(required_sensors.pop('source_id'))
-        except KeyError:
-            source_id = None
+    try:
+        source_id = str(required_sensors.pop('source_id'))
+    except KeyError:
+        source_id = None
 
+    while True:
         gtime, data = get_values(required_sensors.items())
         curr = {'time': SensorInfo(gtime, True)}
         for name, val in data.items():
diff --git a/wally/sensors/storage/__init__.py b/wally/sensors/storage/__init__.py
deleted file mode 100644
index d7bf6aa..0000000
--- a/wally/sensors/storage/__init__.py
+++ /dev/null
@@ -1,104 +0,0 @@
-import struct
-
-
-def pack(val, tp=True):
-    if isinstance(val, int):
-        assert 0 <= val < 2 ** 16
-
-        if tp:
-            res = 'i'
-        else:
-            res = ""
-
-        res += struct.pack("!U", val)
-    elif isinstance(val, dict):
-        assert len(val) < 2 ** 16
-        if tp:
-            res = "d"
-        else:
-            res = ""
-
-        res += struct.pack("!U", len(val))
-        for k, v in dict.items():
-            assert 0 <= k < 2 ** 16
-            assert 0 <= v < 2 ** 32
-            res += struct.pack("!UI", k, v)
-    elif isinstance(val, str):
-        assert len(val) < 256
-        if tp:
-            res = "s"
-        else:
-            res = ""
-        res += chr(len(val)) + val
-    else:
-        raise ValueError()
-
-    return res
-
-
-def unpack(fd, tp=None):
-    if tp is None:
-        tp = fd.read(1)
-
-    if tp == 'i':
-        return struct.unpack("!U", fd.read(2))
-    elif tp == 'd':
-        res = {}
-        val_len = struct.unpack("!U", fd.read(2))
-        for _ in range(val_len):
-            k, v = struct.unpack("!UI", fd.read(6))
-            res[k] = v
-        return res
-    elif tp == 's':
-        val_len = struct.unpack("!U", fd.read(2))
-        return fd.read(val_len)
-
-    raise ValueError()
-
-
-class LocalStorage(object):
-    NEW_DATA = 0
-    NEW_SENSOR = 1
-    NEW_SOURCE = 2
-
-    def __init__(self, fd):
-        self.fd = fd
-        self.sensor_ids = {}
-        self.sources_ids = {}
-        self.max_source_id = 0
-        self.max_sensor_id = 0
-
-    def add_data(self, source, sensor_values):
-        source_id = self.sources_ids.get(source)
-        if source_id is None:
-            source_id = self.max_source_id
-            self.sources_ids[source] = source_id
-            self.emit(self.NEW_SOURCE, source_id, source)
-            self.max_source_id += 1
-
-        new_sensor_values = {}
-
-        for name, val in sensor_values.items():
-            sensor_id = self.sensor_ids.get(name)
-            if sensor_id is None:
-                sensor_id = self.max_sensor_id
-                self.sensor_ids[name] = sensor_id
-                self.emit(self.NEW_SENSOR, sensor_id, name)
-                self.max_sensor_id += 1
-            new_sensor_values[sensor_id] = val
-
-        self.emit(self.NEW_DATA, source_id, new_sensor_values)
-
-    def emit(self, tp, v1, v2):
-        self.fd.write(chr(tp) + pack(v1, False) + pack(v2))
-
-    def readall(self):
-        tp = self.fd.read(1)
-        if ord(tp) == self.NEW_DATA:
-            pass
-        elif ord(tp) == self.NEW_SENSOR:
-            pass
-        elif ord(tp) == self.NEW_SOURCE:
-            pass
-        else:
-            raise ValueError()
diff --git a/wally/sensors/storage/grafana_template.js b/wally/sensors/storage/grafana_template.js
deleted file mode 100644
index 7c57924..0000000
--- a/wally/sensors/storage/grafana_template.js
+++ /dev/null
@@ -1,46 +0,0 @@
-/* global _ */
-
-/*
- * Complex scripted dashboard
- * This script generates a dashboard object that Grafana can load. It also takes a number of user
- * supplied URL parameters (int ARGS variable)
- *
- * Return a dashboard object, or a function
- *
- * For async scripts, return a function, this function must take a single callback function as argument,
- * call this callback function with the dashboard object (look at scripted_async.js for an example)
- */
-
-
-
-// accessable variables in this scope
-var window, document, ARGS, $, jQuery, moment, kbn;
-
-// Setup some variables
-var dashboard;
-
-// All url parameters are available via the ARGS object
-var ARGS;
-
-// Intialize a skeleton with nothing but a rows array and service object
-dashboard = {rows : []};
-
-// Set a title
-dashboard.title = 'Tests dash';
-
-// Set default time
-// time can be overriden in the url using from/to parameteres, but this is
-// handled automatically in grafana core during dashboard initialization
-dashboard.time = {
-    from: "now-5m",
-    to: "now"
-};
-
-dashboard.rows.push({
-    title: 'Chart',
-    height: '300px',
-    panels: %s
-});
-
-
-return dashboard;
diff --git a/wally/sensors/storage/koder.js b/wally/sensors/storage/koder.js
deleted file mode 100644
index a65a454..0000000
--- a/wally/sensors/storage/koder.js
+++ /dev/null
@@ -1,47 +0,0 @@
-/* global _ */
-
-/*
- * Complex scripted dashboard
- * This script generates a dashboard object that Grafana can load. It also takes a number of user
- * supplied URL parameters (int ARGS variable)
- *
- * Return a dashboard object, or a function
- *
- * For async scripts, return a function, this function must take a single callback function as argument,
- * call this callback function with the dashboard object (look at scripted_async.js for an example)
- */
-
-
-
-// accessable variables in this scope
-var window, document, ARGS, $, jQuery, moment, kbn;
-
-// Setup some variables
-var dashboard;
-
-// All url parameters are available via the ARGS object
-var ARGS;
-
-// Intialize a skeleton with nothing but a rows array and service object
-dashboard = {rows : []};
-
-// Set a title
-dashboard.title = 'Tests dash';
-
-// Set default time
-// time can be overriden in the url using from/to parameteres, but this is
-// handled automatically in grafana core during dashboard initialization
-dashboard.time = {
-    from: "now-5m",
-    to: "now"
-};
-
-dashboard.rows.push({
-    title: 'Chart',
-    height: '300px',
-    panels: [{"span": 12, "title": "writes_completed", "linewidth": 2, "type": "graph", "targets": [{"alias": "192.168.0.104 io sda1", "interval": "", "target": "disk io", "rawQuery": true, "query": "select value from \"writes_completed\" where $timeFilter and host='192.168.0.104' and device='sda1' order asc"}, {"alias": "192.168.0.104 io rbd1", "interval": "", "target": "disk io", "rawQuery": true, "query": "select value from \"writes_completed\" where $timeFilter and host='192.168.0.104' and device='rbd1' order asc"}], "tooltip": {"shared": true}, "fill": 1}, {"span": 12, "title": "sectors_written", "linewidth": 2, "type": "graph", "targets": [{"alias": "192.168.0.104 io sda1", "interval": "", "target": "disk io", "rawQuery": true, "query": "select value from \"sectors_written\" where $timeFilter and host='192.168.0.104' and device='sda1' order asc"}, {"alias": "192.168.0.104 io rbd1", "interval": "", "target": "disk io", "rawQuery": true, "query": "select value from \"sectors_written\" where $timeFilter and host='192.168.0.104' and device='rbd1' order asc"}], "tooltip": {"shared": true}, "fill": 1}]
-});
-
-
-return dashboard;
-
diff --git a/wally/tests_sensors.py b/wally/sensors_utils.py
similarity index 98%
rename from wally/tests_sensors.py
rename to wally/sensors_utils.py
index 1e0b1ad..3cd6bc8 100644
--- a/wally/tests_sensors.py
+++ b/wally/sensors_utils.py
@@ -75,7 +75,6 @@
 
     mon_q = Queue.Queue()
     fd = open(cfg_dict['sensor_storage'], "w")
-    logger.info("Start sensors data receiving thread")
     sensor_listen_th = threading.Thread(None, save_sensors_data, None,
                                         (sensors_data_q, mon_q, fd))
     sensor_listen_th.daemon = True
@@ -99,7 +98,6 @@
     if nodes is None:
         nodes = ctx.nodes
 
-    logger.info("Deploing new sensors on {0} node(s)".format(len(nodes)))
     monitored_nodes, sensors_configs = get_sensors_config_for_nodes(cfg,
                                                                     nodes)
 
@@ -108,6 +106,7 @@
         return
 
     if ctx.sensors_mon_q is None:
+        logger.info("Start sensors data receiving thread")
         ctx.sensors_mon_q = start_sensor_process_thread(ctx, cfg,
                                                         sensors_configs)
 
@@ -116,6 +115,7 @@
             stop_and_remove_sensors(sensors_configs)
         ctx.clear_calls_stack.append(remove_sensors_stage)
 
+    logger.info("Deploing new sensors on {0} node(s)".format(len(nodes)))
     deploy_and_start_sensors(sensors_configs)
     wait_for_new_sensors_data(ctx, monitored_nodes)
 
diff --git a/wally/ssh_utils.py b/wally/ssh_utils.py
index 0de7816..f1818ad 100644
--- a/wally/ssh_utils.py
+++ b/wally/ssh_utils.py
@@ -59,15 +59,15 @@
         except socket.error:
             retry_left = retry_count - i - 1
 
-            if log_warns:
-                msg = "Node {0.host}:{0.port} connection timeout."
+            if retry_left > 0:
+                if log_warns:
+                    msg = "Node {0.host}:{0.port} connection timeout."
 
-                if 0 != retry_left:
-                    msg += " {0} retry left.".format(retry_left)
+                    if 0 != retry_left:
+                        msg += " {0} retry left.".format(retry_left)
 
-                logger.warning(msg.format(creds))
-
-            if 0 == retry_left:
+                    logger.warning(msg.format(creds))
+            else:
                 raise
 
             time.sleep(1)
diff --git a/wally/start_vms.py b/wally/start_vms.py
index b3c141a..de1f312 100644
--- a/wally/start_vms.py
+++ b/wally/start_vms.py
@@ -18,28 +18,50 @@
 logger = logging.getLogger("wally.vms")
 
 
-def ostack_get_creds():
-    env = os.environ.get
-    name = env('OS_USERNAME')
-    passwd = env('OS_PASSWORD')
-    tenant = env('OS_TENANT_NAME')
-    auth_url = env('OS_AUTH_URL')
-
-    return name, passwd, tenant, auth_url
-
-
+STORED_OPENSTACK_CREDS = None
 NOVA_CONNECTION = None
+CINDER_CONNECTION = None
+
+
+def ostack_get_creds():
+    if STORED_OPENSTACK_CREDS is None:
+        env = os.environ.get
+        name = env('OS_USERNAME')
+        passwd = env('OS_PASSWORD')
+        tenant = env('OS_TENANT_NAME')
+        auth_url = env('OS_AUTH_URL')
+        return name, passwd, tenant, auth_url
+    else:
+        return STORED_OPENSTACK_CREDS
 
 
 def nova_connect(name=None, passwd=None, tenant=None, auth_url=None):
     global NOVA_CONNECTION
+    global STORED_OPENSTACK_CREDS
+
     if NOVA_CONNECTION is None:
         if name is None:
             name, passwd, tenant, auth_url = ostack_get_creds()
+        else:
+            STORED_OPENSTACK_CREDS = (name, passwd, tenant, auth_url)
+
         NOVA_CONNECTION = n_client('1.1', name, passwd, tenant, auth_url)
     return NOVA_CONNECTION
 
 
+def cinder_connect(name=None, passwd=None, tenant=None, auth_url=None):
+    global CINDER_CONNECTION
+    global STORED_OPENSTACK_CREDS
+
+    if CINDER_CONNECTION is None:
+        if name is None:
+            name, passwd, tenant, auth_url = ostack_get_creds()
+        else:
+            STORED_OPENSTACK_CREDS = (name, passwd, tenant, auth_url)
+        CINDER_CONNECTION = c_client(name, passwd, tenant, auth_url)
+    return CINDER_CONNECTION
+
+
 def nova_disconnect():
     global NOVA_CONNECTION
     if NOVA_CONNECTION is not None:
@@ -144,12 +166,15 @@
 
 
 def create_volume(size, name):
-    cinder = c_client(*ostack_get_creds())
+    cinder = cinder_connect()
+    # vol_id = "2974f227-8755-4333-bcae-cd9693cd5d04"
+    # logger.warning("Reusing volume {0}".format(vol_id))
+    # vol = cinder.volumes.get(vol_id)
     vol = cinder.volumes.create(size=size, display_name=name)
     err_count = 0
 
-    while vol['status'] != 'available':
-        if vol['status'] == 'error':
+    while vol.status != 'available':
+        if vol.status == 'error':
             if err_count == 3:
                 logger.critical("Fail to create volume")
                 raise RuntimeError("Fail to create volume")
@@ -160,7 +185,7 @@
                 vol = cinder.volumes.create(size=size, display_name=name)
                 continue
         time.sleep(1)
-        vol = cinder.volumes.get(vol['id'])
+        vol = cinder.volumes.get(vol.id)
     return vol
 
 
@@ -300,7 +325,6 @@
             nova.servers.delete(srv)
 
             for j in range(120):
-                # print "wait till server deleted"
                 all_id = set(alive_srv.id for alive_srv in nova.servers.list())
                 if srv.id not in all_id:
                     break
@@ -313,16 +337,13 @@
         raise RuntimeError("Failed to start server".format(srv.id))
 
     if vol_sz is not None:
-        # print "creating volume"
         vol = create_volume(vol_sz, name)
-        # print "attach volume to server"
-        nova.volumes.create_server_volume(srv.id, vol['id'], None)
+        nova.volumes.create_server_volume(srv.id, vol.id, None)
 
     if flt_ip is Allocate:
         flt_ip = nova.floating_ips.create(pool)
 
     if flt_ip is not None:
-        # print "attaching ip to server"
         srv.add_floating_ip(flt_ip)
 
     return flt_ip.ip, nova.servers.get(srv.id)
@@ -332,7 +353,7 @@
     clear_all(NOVA_CONNECTION, nodes_ids, None)
 
 
-def clear_all(nova, ids=None, name_templ="ceph-test-{0}"):
+def clear_all(nova, ids=None, name_templ=None):
 
     def need_delete(srv):
         if name_templ is not None:
@@ -340,6 +361,14 @@
         else:
             return srv.id in ids
 
+    volumes_to_delete = []
+    cinder = cinder_connect()
+    for vol in cinder.volumes.list():
+        for attachment in vol.attachments:
+            if attachment['server_id'] in ids:
+                volumes_to_delete.append(vol)
+                break
+
     deleted_srvs = set()
     for srv in nova.servers.list():
         if need_delete(srv):
@@ -360,13 +389,9 @@
 
     # wait till vm actually deleted
 
-    if name_templ is not None:
-        cinder = c_client(*ostack_get_creds())
-        for vol in cinder.volumes.list():
-            if isinstance(vol.display_name, basestring):
-                if re.match(name_templ.format("\\d+"), vol.display_name):
-                    if vol.status in ('available', 'error'):
-                        logger.debug("Deleting volume " + vol.display_name)
-                        cinder.volumes.delete(vol)
+    logger.warning("Volume deletion commented out")
+    # for vol in volumes_to_delete:
+    #     logger.debug("Deleting volume " + vol.display_name)
+    #     cinder.volumes.delete(vol)
 
     logger.debug("Clearing done (yet some volumes may still deleting)")
diff --git a/wally/statistic.py b/wally/statistic.py
index 0729283..f3fcd6a 100644
--- a/wally/statistic.py
+++ b/wally/statistic.py
@@ -15,6 +15,10 @@
     return med, dev
 
 
+def round_3_digit(val):
+    return round_deviation((val, val / 10.0))[0]
+
+
 def round_deviation(med_dev):
     med, dev = med_dev
 
diff --git a/wally/suits/io/agent.py b/wally/suits/io/agent.py
index 7589346..d15d18e 100644
--- a/wally/suits/io/agent.py
+++ b/wally/suits/io/agent.py
@@ -55,6 +55,40 @@
 counter = [0]
 
 
+def extract_iterables(vals):
+    iterable_names = []
+    iterable_values = []
+    rest = {}
+
+    for val_name, val in vals.items():
+        if val is None or not val.startswith('{%'):
+            rest[val_name] = val
+        else:
+            assert val.endswith("%}")
+            content = val[2:-2]
+            iterable_names.append(val_name)
+            iterable_values.append(list(i.strip() for i in content.split(',')))
+
+    return iterable_names, iterable_values, rest
+
+
+def format_params_into_section(sec, params, final=True):
+    processed_vals = {}
+
+    for val_name, val in sec.items():
+        if val is None:
+            processed_vals[val_name] = val
+        else:
+            try:
+                processed_vals[val_name] = val.format(**params)
+            except KeyError:
+                if final:
+                    raise
+                processed_vals[val_name] = val
+
+    return processed_vals
+
+
 def process_section(name, vals, defaults, format_params):
     vals = vals.copy()
     params = format_params.copy()
@@ -67,30 +101,25 @@
         repeat = 1
 
     # this code can be optimized
-    iterable_names = []
-    iterable_values = []
-    processed_vals = {}
-
-    for val_name, val in vals.items():
-        if val is None:
-            processed_vals[val_name] = val
-        # remove hardcode
-        elif val.startswith('{%'):
-            assert val.endswith("%}")
-            content = val[2:-2].format(**params)
-            iterable_names.append(val_name)
-            iterable_values.append(list(i.strip() for i in content.split(',')))
-        else:
-            processed_vals[val_name] = val.format(**params)
+    iterable_names, iterable_values, processed_vals = extract_iterables(vals)
 
     group_report_err_msg = "Group reporting should be set if numjobs != 1"
 
     if iterable_values == []:
+        processed_vals = format_params_into_section(processed_vals, params,
+                                                    final=False)
         params['UNIQ'] = 'UN{0}'.format(counter[0])
         counter[0] += 1
         params['TEST_SUMM'] = get_test_summary(processed_vals)
 
-        if processed_vals.get('numjobs', '1') != '1':
+        num_jobs = int(processed_vals.get('numjobs', '1'))
+        fsize = to_bytes(processed_vals['size'])
+        params['PER_TH_OFFSET'] = fsize // num_jobs
+
+        processed_vals = format_params_into_section(processed_vals, params,
+                                                    final=True)
+
+        if num_jobs != 1:
             assert 'group_reporting' in processed_vals, group_report_err_msg
 
         ramp_time = processed_vals.get('ramp_time')
@@ -104,11 +133,21 @@
             processed_vals['ramp_time'] = ramp_time
     else:
         for it_vals in itertools.product(*iterable_values):
+            processed_vals = format_params_into_section(processed_vals, params,
+                                                        final=False)
+
             processed_vals.update(dict(zip(iterable_names, it_vals)))
             params['UNIQ'] = 'UN{0}'.format(counter[0])
             counter[0] += 1
             params['TEST_SUMM'] = get_test_summary(processed_vals)
 
+            num_jobs = int(processed_vals.get('numjobs', '1'))
+            fsize = to_bytes(processed_vals['size'])
+            params['PER_TH_OFFSET'] = fsize // num_jobs
+
+            processed_vals = format_params_into_section(processed_vals, params,
+                                                        final=True)
+
             if processed_vals.get('numjobs', '1') != '1':
                 assert 'group_reporting' in processed_vals,\
                     group_report_err_msg
@@ -130,6 +169,7 @@
     time = 0
     for _, params in combinations:
         time += int(params.get('ramp_time', 0))
+        time += int(params.get('_ramp_time', 0))
         time += int(params.get('runtime', 0))
     return time
 
@@ -236,6 +276,8 @@
             return (1024 ** 2) * int(sz[:-1])
         if sz[-1] == 'k':
             return 1024 * int(sz[:-1])
+        if sz[-1] == 'g':
+            return (1024 ** 3) * int(sz[:-1])
         raise
 
 
@@ -348,12 +390,17 @@
 def do_run_fio(bconf):
     benchmark_config = format_fio_config(bconf)
     cmd = ["fio", "--output-format=json", "--alloc-size=262144", "-"]
-    p = subprocess.Popen(cmd, stdin=subprocess.PIPE,
+    p = subprocess.Popen(cmd,
+                         stdin=subprocess.PIPE,
                          stdout=subprocess.PIPE,
-                         stderr=subprocess.STDOUT)
+                         stderr=subprocess.PIPE)
 
     # set timeout
-    raw_out, _ = p.communicate(benchmark_config)
+    raw_out, raw_err = p.communicate(benchmark_config)
+
+    if 0 != p.returncode:
+        msg = "Fio failed with code: {0}\nOutput={1}"
+        raise OSError(msg.format(p.returncode, raw_err))
 
     try:
         parsed_out = json.loads(raw_out)["jobs"]
@@ -373,7 +420,14 @@
 MAX_JOBS = 1000
 
 
-def next_test_portion(whole_conf, runcycle):
+def next_test_portion(whole_conf, runcycle, cluster=False):
+    if cluster:
+        for name, sec in whole_conf:
+            if '_ramp_time' in sec:
+                sec['ramp_time'] = sec.pop('_ramp_time')
+            yield [(name, sec)]
+        return
+
     jcount = 0
     runtime = 0
     bconf = []
@@ -458,12 +512,14 @@
     res[jname] = j_res
 
 
-def compile(benchmark_config, params, runcycle=None):
+def compile(benchmark_config, params, skip=0, runcycle=None, cluster=False):
     whole_conf = list(parse_fio_config_full(benchmark_config, params))
+    whole_conf = whole_conf[skip:]
     res = ""
 
-    for bconf in next_test_portion(whole_conf, runcycle):
+    for bconf in next_test_portion(whole_conf, runcycle, cluster=cluster):
         res += format_fio_config(bconf)
+        res += "\n#" + "-" * 50 + "\n\n"
 
     return res
 
@@ -473,7 +529,8 @@
             runcycle=None,
             raw_results_func=None,
             skip_tests=0,
-            fake_fio=False):
+            fake_fio=False,
+            cluster=False):
 
     whole_conf = list(parse_fio_config_full(benchmark_config, params))
     whole_conf = whole_conf[skip_tests:]
@@ -482,7 +539,7 @@
     executed_tests = 0
     ok = True
     try:
-        for bconf in next_test_portion(whole_conf, runcycle):
+        for bconf in next_test_portion(whole_conf, runcycle, cluster=cluster):
 
             if fake_fio:
                 res_cfg_it = do_run_fio_fake(bconf)
@@ -504,7 +561,7 @@
                     continue
 
                 add_job_results(jname, job_output, jconfig, res)
-
+            curr_test_num += 1
             msg_template = "Done {0} tests from {1}. ETA: {2}"
             exec_time = estimate_cfg(benchmark_config, params, curr_test_num)
 
@@ -588,6 +645,8 @@
                         help="Skip NUM tests")
     parser.add_argument("--faked-fio", action='store_true',
                         default=False, help="Emulate fio with 0 test time")
+    parser.add_argument("--cluster", action='store_true',
+                        default=False, help="Apply cluster-test settings")
     parser.add_argument("--params", nargs="*", metavar="PARAM=VAL",
                         default=[],
                         help="Provide set of pairs PARAM=VAL to" +
@@ -620,14 +679,16 @@
         return 0
 
     if argv_obj.num_tests or argv_obj.compile:
-        bconf = list(parse_fio_config_full(job_cfg, params))
-        bconf = bconf[argv_obj.skip_tests:]
-
         if argv_obj.compile:
-            out_fd.write(format_fio_config(bconf))
+            data = compile(job_cfg, params, argv_obj.skip_tests,
+                           cluster=argv_obj.cluster)
+            out_fd.write(data)
             out_fd.write("\n")
 
         if argv_obj.num_tests:
+            bconf = list(parse_fio_config_full(job_cfg, params,
+                                               argv_obj.cluster))
+            bconf = bconf[argv_obj.skip_tests:]
             print len(bconf)
 
         return 0
@@ -653,7 +714,8 @@
                                            argv_obj.runcycle,
                                            rrfunc,
                                            argv_obj.skip_tests,
-                                           argv_obj.faked_fio)
+                                           argv_obj.faked_fio,
+                                           cluster=argv_obj.cluster)
     etime = time.time()
 
     res = {'__meta__': {'raw_cfg': job_cfg, 'params': params}, 'res': job_res}
diff --git a/wally/suits/io/formatter.py b/wally/suits/io/formatter.py
index 529b78a..a1da1c3 100644
--- a/wally/suits/io/formatter.py
+++ b/wally/suits/io/formatter.py
@@ -1,8 +1,8 @@
 import texttable
 
 from wally.utils import ssize_to_b
-from wally.statistic import med_dev
 from wally.suits.io.agent import get_test_summary
+from wally.statistic import med_dev, round_deviation, round_3_digit
 
 
 def key_func(k_data):
@@ -37,14 +37,24 @@
 
         descr = get_test_summary(data)
 
-        iops, _ = med_dev(data['iops'])
-        bw, bwdev = med_dev(data['bw'])
+        iops, _ = round_deviation(med_dev(data['iops']))
+        bw, bwdev = round_deviation(med_dev(data['bw']))
 
         # 3 * sigma
-        dev_perc = int((bwdev * 300) / bw)
+        if 0 == bw:
+            assert 0 == bwdev
+            dev_perc = 0
+        else:
+            dev_perc = int((bwdev * 300) / bw)
 
-        params = (descr, int(iops), int(bw), dev_perc,
-                  int(med_dev(data['lat'])[0]) // 1000)
+        med_lat, _ = round_deviation(med_dev(data['lat']))
+        med_lat = int(med_lat) // 1000
+
+        iops = round_3_digit(iops)
+        bw = round_3_digit(bw)
+        med_lat = round_3_digit(med_lat)
+
+        params = (descr, int(iops), int(bw), dev_perc, med_lat)
         tab.add_row(params)
 
     header = ["Description", "IOPS", "BW KiBps", "Dev * 3 %", "clat ms"]
diff --git a/wally/suits/io/io_scenario_ceph.cfg b/wally/suits/io/io_scenario_ceph.cfg
new file mode 100644
index 0000000..5e793f2
--- /dev/null
+++ b/wally/suits/io/io_scenario_ceph.cfg
@@ -0,0 +1,62 @@
+[defaults]
+wait_for_previous
+group_reporting
+time_based
+buffered=0
+iodepth=1
+softrandommap=1
+filename={FILENAME}
+NUM_ROUNDS=7
+
+size=5G
+ramp_time=20
+runtime=20
+
+# ---------------------------------------------------------------------
+# check different thread count, direct read/write mode. (bw, iops) = func(th_count)
+# also check BW for seq read/write.
+# ---------------------------------------------------------------------
+[ceph_test_{TEST_SUMM} * {NUM_ROUNDS}]
+blocksize=1m
+rw=read
+direct=1
+offset_increment={PER_TH_OFFSET}
+numjobs={% 20, 120 %}
+
+# # ---------------------------------------------------------------------
+# # check different thread count, sync mode. (latency, iops) = func(th_count)
+# # ---------------------------------------------------------------------
+# [hdd_test_{TEST_SUMM} * {NUM_ROUNDS}]
+# blocksize=4k
+# rw=randwrite
+# sync=1
+# numjobs={% 1, 5, 10, 15, 20, 30, 40, 80, 120 %}
+# 
+# # ---------------------------------------------------------------------
+# # check different thread count, direct read mode. (latency, iops) = func(th_count)
+# # also check iops for randread
+# # ---------------------------------------------------------------------
+# [hdd_test_{TEST_SUMM} * {NUM_ROUNDS}]
+# blocksize=4k
+# rw=randread
+# direct=1
+# numjobs={% 1, 5, 10, 15, 20, 30, 40, 80, 120 %}
+# 
+# # ---------------------------------------------------------------------
+# # check different thread count, direct read/write mode. (bw, iops) = func(th_count)
+# # also check BW for seq read/write.
+# # ---------------------------------------------------------------------
+# [hdd_test_{TEST_SUMM} * {NUM_ROUNDS}]
+# blocksize=1m
+# rw={% read, write %}
+# direct=1
+# offset_increment=1073741824 # 1G
+# numjobs={% 1, 5, 10, 15, 20, 30, 40, 80, 120 %}
+# 
+# # ---------------------------------------------------------------------
+# # check IOPS randwrite.
+# # ---------------------------------------------------------------------
+# [hdd_test_{TEST_SUMM} * {NUM_ROUNDS}]
+# blocksize=4k
+# rw=randwrite
+# direct=1
diff --git a/wally/suits/io/io_scenario_hdd.cfg b/wally/suits/io/io_scenario_hdd.cfg
index 5e24009..46191f2 100644
--- a/wally/suits/io/io_scenario_hdd.cfg
+++ b/wally/suits/io/io_scenario_hdd.cfg
@@ -8,8 +8,8 @@
 filename={FILENAME}
 NUM_ROUNDS=7
 
-ramp_time=5
 size=10Gb
+ramp_time=5
 runtime=30
 
 # ---------------------------------------------------------------------
@@ -39,6 +39,7 @@
 blocksize=1m
 rw={% read, write %}
 direct=1
+offset_increment=1073741824 # 1G
 numjobs={% 1, 5, 10, 15, 20, 30, 40, 80, 120 %}
 
 # ---------------------------------------------------------------------
diff --git a/wally/suits/itest.py b/wally/suits/itest.py
index 1e93247..c5615bb 100644
--- a/wally/suits/itest.py
+++ b/wally/suits/itest.py
@@ -137,29 +137,45 @@
         self.files_to_copy = {local_fname: self.io_py_remote}
         copy_paths(conn, self.files_to_copy)
 
-        files = {}
+        if self.options.get('prefill_files', True):
+            files = {}
 
-        for secname, params in self.configs:
-            sz = ssize_to_b(params['size'])
-            msz = msz = sz / (1024 ** 2)
-            if sz % (1024 ** 2) != 0:
-                msz += 1
+            for secname, params in self.configs:
+                sz = ssize_to_b(params['size'])
+                msz = sz / (1024 ** 2)
+                if sz % (1024 ** 2) != 0:
+                    msz += 1
 
-            fname = params['filename']
-            files[fname] = max(files.get(fname, 0), msz)
+                fname = params['filename']
 
-        # logger.warning("dd run DISABLED")
-        # cmd_templ = "dd if=/dev/zero of={0} bs={1} count={2}"
+                # if already has other test with the same file name
+                # take largest size
+                files[fname] = max(files.get(fname, 0), msz)
 
-        cmd_templ = "sudo dd if=/dev/zero of={0} bs={1} count={2}"
-        for fname, sz in files.items():
-            cmd = cmd_templ.format(fname, 1024 ** 2, msz)
-            run_over_ssh(conn, cmd, timeout=msz, node=self.node)
+            # logger.warning("dd run DISABLED")
+            # cmd_templ = "dd if=/dev/zero of={0} bs={1} count={2}"
+
+            cmd_templ = "sudo dd if=/dev/zero of={0} bs={1} count={2}"
+            ssize = 0
+            stime = time.time()
+
+            for fname, curr_sz in files.items():
+                cmd = cmd_templ.format(fname, 1024 ** 2, curr_sz)
+                ssize += curr_sz
+                run_over_ssh(conn, cmd, timeout=curr_sz, node=self.node)
+
+            ddtime = time.time() - stime
+            if ddtime > 1E-3:
+                fill_bw = int(ssize / ddtime)
+                mess = "Initiall dd fill bw is {0} MiBps for this vm"
+                logger.info(mess.format(fill_bw))
+        else:
+            logger.warning("Test files prefill disabled")
 
     def run(self, conn, barrier):
         # logger.warning("No tests runned")
         # return
-        cmd_templ = "sudo env python2 {0} --type {1} {2} --json -"
+        cmd_templ = "sudo env python2 {0} {3} --type {1} {2} --json -"
         # cmd_templ = "env python2 {0} --type {1} {2} --json -"
 
         params = " ".join("{0}={1}".format(k, v)
@@ -168,16 +184,24 @@
         if "" != params:
             params = "--params " + params
 
-        cmd = cmd_templ.format(self.io_py_remote, self.tool, params)
+        if self.options.get('cluster', False):
+            logger.info("Cluster mode is used")
+            cluster_opt = "--cluster"
+        else:
+            logger.info("Non-cluster mode is used")
+            cluster_opt = ""
+
+        cmd = cmd_templ.format(self.io_py_remote, self.tool, params,
+                               cluster_opt)
         logger.debug("Waiting on barrier")
 
         exec_time = io_agent.estimate_cfg(self.raw_cfg, self.config_params)
         exec_time_str = sec_to_str(exec_time)
 
         try:
+            timeout = int(exec_time * 1.2 + 300)
             if barrier.wait():
                 templ = "Test should takes about {0}. Will wait at most {1}"
-                timeout = int(exec_time * 1.1 + 300)
                 logger.info(templ.format(exec_time_str, sec_to_str(timeout)))
 
             out_err = run_over_ssh(conn, cmd,