Refactor test running, VM reuse and OS credential handling

Extract the per-run logic from run_tests() into run_single_test(); add a
'node_limit' test option that repeats a test on subsets of the test nodes
while pausing the unused VMs; move --reuse-vms from the command line into
the clouds/openstack/vms config section; load OS credentials from an
OPENRC file, the shell environment, Fuel, or inline config values; gather
sensor data in a dedicated stage; store the text report during console
reporting; register faulthandler on SIGUSR1 for hang debugging.
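
A minimal sketch of the config shape these changes assume (key names come
from this patch; the ssh template value is illustrative only):

    clouds:
      openstack:
        OPENRC: /home/wally/openrc   # or ENV, or inline OS_* values
        vms:
          # "<vm name pattern>,<conn pattern>" - replaces --reuse-vms
          - "wally-*,ssh://root@{ip}"
    tests:
      - io:
          node_limit: [1, 2, 'all']  # one run per entry
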
diff --git a/wally/run_test.py b/wally/run_test.py
index dc6637f..5322432 100755
--- a/wally/run_test.py
+++ b/wally/run_test.py
@@ -5,6 +5,7 @@
 import time
 import Queue
 import pprint
+import signal
 import logging
 import argparse
 import functools
@@ -14,6 +15,7 @@
 import collections
 
 import yaml
+import faulthandler
 from concurrent.futures import ThreadPoolExecutor
 
 from wally import pretty_yaml
@@ -21,9 +23,9 @@
 from wally.discover import discover, Node
 from wally.timeseries import SensorDatastore
 from wally import utils, report, ssh_utils, start_vms
-from wally.suits.itest import IOPerfTest, PgBenchTest, MysqlTest
-from wally.sensors_utils import deploy_sensors_stage
 from wally.config import cfg_dict, load_config, setup_loggers
+from wally.suits.itest import IOPerfTest, PgBenchTest, MysqlTest
+from wally.sensors_utils import deploy_sensors_stage, gather_sensors_stage
 
 
 try:
@@ -143,6 +145,78 @@
         res_q.put(exc)
 
 
+def run_single_test(test_nodes, name, test_cls, params,
+                    test_local_folder, run_uuid, counter=[0]):
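+    # NB: the mutable default 'counter' is deliberate: it persists across
+    # calls, so every test run gets a unique result-directory name.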
+    logger.info("Starting {0} tests".format(name))
+    res_q = Queue.Queue()
+    threads = []
+    coord_q = Queue.Queue()
+    rem_folder = test_local_folder.format(name=name)
+
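+    # one barrier slot per test node, so the worker threads can line up
+    # and start the actual load at (roughly) the same moment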
+    barrier = utils.Barrier(len(test_nodes))
+    for idx, node in enumerate(test_nodes):
+        msg = "Starting {0} test on {1} node"
+        logger.debug(msg.format(name, node.conn_url))
+
+        dname = "{0}_{1}_{2}".format(name, counter[0], node.get_ip())
+        counter[0] += 1
+        dr = os.path.join(cfg_dict['test_log_directory'], dname)
+
+        if not os.path.exists(dr):
+            os.makedirs(dr)
+
+        params = params.copy()
+        params['testnodes_count'] = len(test_nodes)
+        test = test_cls(options=params,
+                        is_primary=(idx == 0),
+                        on_result_cb=res_q.put,
+                        test_uuid=run_uuid,
+                        node=node,
+                        remote_dir=rem_folder,
+                        log_directory=dr,
+                        coordination_queue=coord_q)
+        th = threading.Thread(None, test_thread, None,
+                              (test, node, barrier, res_q))
+        threads.append(th)
+        th.daemon = True
+        th.start()
+
+    th = threading.Thread(None, test_cls.coordination_th, None,
+                          (coord_q, barrier, len(threads)))
+    threads.append(th)
+    th.daemon = True
+    th.start()
+
+    results = []
+    coord_q.put(None)
+
+    while len(threads) != 0:
+        nthreads = []
+
+        for th in threads:
+            if not th.is_alive():
+                th.join()
+            else:
+                nthreads.append(th)
+
+        threads = nthreads
+
+        while not res_q.empty():
+            val = res_q.get()
+
+            if isinstance(val, utils.StopTestError):
+                raise val
+
+            if isinstance(val, Exception):
+                msg = "Exception during test execution: {0!s}"
+                raise ValueError(msg.format(val))
+
+            results.append(val)
+
+        # short sleep so this loop doesn't busy-wait while tests run
+        time.sleep(0.1)
+
+    results = test_cls.merge_results(results)
+    return results
+
+
 def run_tests(cfg, test_block, nodes):
     tool_type_mapper = {
         "io": IOPerfTest,
@@ -156,87 +230,65 @@
         logger.error("No test nodes found")
         return
 
-    test_number_per_type = {}
-    res_q = Queue.Queue()
-
     for name, params in test_block.items():
-        logger.info("Starting {0} tests".format(name))
-        test_num = test_number_per_type.get(name, 0)
-        test_number_per_type[name] = test_num + 1
-        threads = []
-        barrier = utils.Barrier(len(test_nodes))
-        coord_q = Queue.Queue()
-        test_cls = tool_type_mapper[name]
-        rem_folder = cfg['default_test_local_folder'].format(name=name)
-
-        for idx, node in enumerate(test_nodes):
-            msg = "Starting {0} test on {1} node"
-            logger.debug(msg.format(name, node.conn_url))
-
-            dr = os.path.join(
-                    cfg_dict['test_log_directory'],
-                    "{0}_{1}_{2}".format(name, test_num, node.get_ip())
-                )
-
-            if not os.path.exists(dr):
-                os.makedirs(dr)
-
-            test = test_cls(options=params,
-                            is_primary=(idx == 0),
-                            on_result_cb=res_q.put,
-                            test_uuid=cfg['run_uuid'],
-                            node=node,
-                            remote_dir=rem_folder,
-                            log_directory=dr,
-                            coordination_queue=coord_q)
-            th = threading.Thread(None, test_thread, None,
-                                  (test, node, barrier, res_q))
-            threads.append(th)
-            th.daemon = True
-            th.start()
-
-        th = threading.Thread(None, test_cls.coordination_th, None,
-                              (coord_q, barrier, len(threads)))
-        threads.append(th)
-        th.daemon = True
-        th.start()
-
-        def gather_results(res_q, results):
-            while not res_q.empty():
-                val = res_q.get()
-
-                if isinstance(val, utils.StopTestError):
-                    raise val
-
-                if isinstance(val, Exception):
-                    msg = "Exception during test execution: {0!s}"
-                    raise ValueError(msg.format(val))
-
-                results.append(val)
-
         results = []
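+        # 'node_limit' may be an int, a list of counts (a list entry may
+        # also be the string 'all'), or absent, which means use every
+        # test node; each entry triggers one run of this test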
+        limit = params.get('node_limit')
+        if isinstance(limit, (int, long)):
+            vm_limits = [limit]
+        elif limit is None:
+            vm_limits = [len(test_nodes)]
+        else:
+            vm_limits = limit
 
-        # MAX_WAIT_TIME = 10
-        # end_time = time.time() + MAX_WAIT_TIME
+        for vm_count in vm_limits:
+            if vm_count == 'all':
+                curr_test_nodes = test_nodes
+                unused_nodes = []
+            else:
+                curr_test_nodes = test_nodes[:vm_count]
+                unused_nodes = test_nodes[vm_count:]
 
-        # while time.time() < end_time:
-        while True:
-            for th in threads:
-                th.join(1)
-                gather_results(res_q, results)
-                # if time.time() > end_time:
-                #     break
+            if 0 == len(curr_test_nodes):
+                continue
 
-            if all(not th.is_alive() for th in threads):
-                break
+            if cfg.get('suspend_unused_vms', True):
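+                # only OpenStack-managed nodes (those with an os_vm_id)
+                # can be paused; any others are left running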
+                pausable_nodes_ids = [node.os_vm_id for node in unused_nodes
+                                      if node.os_vm_id is not None]
+                non_pausable = len(unused_nodes) - len(pausable_nodes_ids)
 
-        # if any(th.is_alive() for th in threads):
-        #     logger.warning("Some test threads still running")
+                if 0 != non_pausable:
+                    logger.warning("Can't pause {0} nodes".format(
+                                   non_pausable))
 
-        gather_results(res_q, results)
-        result = test_cls.merge_results(results)
-        result['__test_meta__'] = {'testnodes_count': len(test_nodes)}
-        yield name, result
+                if len(pausable_nodes_ids) != 0:
+                    logger.debug("Pausing {0} unused nodes".format(
+                                 len(pausable_nodes_ids)))
+                    start_vms.pause(pausable_nodes_ids)
+
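+            # nodes selected for this run may have been paused by an
+            # earlier iteration - make sure they are unpaused now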
+            resumable_nodes_ids = [node.os_vm_id for node in curr_test_nodes
+                                   if node.os_vm_id is not None]
+
+            if len(resumable_nodes_ids) != 0:
+                logger.debug("Check and unpause {0} nodes".format(
+                             len(resumable_nodes_ids)))
+                start_vms.unpause(resumable_nodes_ids)
+
+            test_cls = tool_type_mapper[name]
+            try:
+                res = run_single_test(curr_test_nodes, name, test_cls,
+                                      params,
+                                      cfg['default_test_local_folder'],
+                                      cfg['run_uuid'])
+            finally:
+                if cfg.get('suspend_unused_vms', True):
+                    if len(pausable_nodes_ids) != 0:
+                        logger.debug("Unpausing {0} nodes".format(
+                                     len(pausable_nodes_ids)))
+                        start_vms.unpause(pausable_nodes_ids)
+
+            results.append(res)
+
+        yield name, results
 
 
 def log_nodes_statistic(_, ctx):
@@ -306,13 +358,22 @@
         fd.write(pretty_yaml.dumps(cluster))
 
 
-def reuse_vms_stage(vm_name_pattern, conn_pattern):
-    def reuse_vms(cfg, ctx):
+def reuse_vms_stage(cfg, ctx):
+    vms_cfg = cfg.get('clouds', {}).get('openstack', {}).get('vms', [])
+
+    for creds in vms_cfg:
+        vm_name_pattern, conn_pattern = creds.split(",")
         try:
             msg = "Looking for vm with name like {0}".format(vm_name_pattern)
             logger.debug(msg)
 
-            os_creds = get_OS_credentials(cfg, ctx, "clouds")
+            if not start_vms.is_connected():
+                os_creds = get_OS_credentials(cfg, ctx)
+            else:
+                os_creds = {}
+
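+            # with empty creds nova_connect() is expected to reuse the
+            # already-established connection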
             conn = start_vms.nova_connect(**os_creds)
             for ip, vm_id in start_vms.find_vms(conn, vm_name_pattern):
                 node = Node(conn_pattern.format(ip=ip), ['testnode'])
@@ -325,56 +386,58 @@
             logger.exception(msg)
             raise utils.StopTestError(msg, exc)
 
-    return reuse_vms
+
+def get_creds_openrc(path):
+    fc = open(path).read()
+
+    echo = 'echo "$OS_TENANT_NAME:$OS_USERNAME:$OS_PASSWORD@$OS_AUTH_URL"'
+
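+    # feed the openrc file to a throw-away bash and print the exported
+    # variables as a single line that is easy to parse back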
+    try:
+        data = utils.run_locally(['/bin/bash'],
+                                 input_data=fc + "\n" + echo)
+    except subprocess.CalledProcessError as exc:
+        msg = "Failed to get creds from openrc file " + path
+        logger.exception(msg)
+        raise utils.StopTestError(msg, exc)
+
+    try:
+        data = data.strip()
+        tenant, user, passwd_auth_url = data.split(':', 2)
+        passwd, auth_url = passwd_auth_url.rsplit("@", 1)
+        assert (auth_url.startswith("https://") or
+                auth_url.startswith("http://"))
+    except Exception as exc:
+        msg = "Failed to get creds from openrc file: " + data
+        logger.exception(msg)
+        raise utils.StopTestError(msg, exc)
+    return user, passwd, tenant, auth_url
 
 
-def get_OS_credentials(cfg, ctx, creds_type):
+def get_OS_credentials(cfg, ctx):
     creds = None
-
-    if creds_type == 'clouds':
-        logger.info("Using OS credentials from 'cloud' section")
-        if 'openstack' in cfg['clouds']:
-            os_cfg = cfg['clouds']['openstack']
-
+    if 'openstack' in cfg['clouds']:
+        os_cfg = cfg['clouds']['openstack']
+        if 'OPENRC' in os_cfg:
+            logger.info("Using OS credentials from " + os_cfg['OPENRC'])
+            user, passwd, tenant, auth_url = \
+                get_creds_openrc(os_cfg['OPENRC'])
+        elif 'ENV' in os_cfg:
+            logger.info("Using OS credentials from shell environment")
+            user, passwd, tenant, auth_url = start_vms.ostack_get_creds()
+        else:
+            logger.info("Using predefined credentials")
             tenant = os_cfg['OS_TENANT_NAME'].strip()
             user = os_cfg['OS_USERNAME'].strip()
             passwd = os_cfg['OS_PASSWORD'].strip()
             auth_url = os_cfg['OS_AUTH_URL'].strip()
 
-        elif 'fuel' in cfg['clouds'] and \
-             'openstack_env' in cfg['clouds']['fuel']:
-            creds = ctx.fuel_openstack_creds
-
-    elif creds_type == 'ENV':
-        logger.info("Using OS credentials from shell environment")
-        user, passwd, tenant, auth_url = start_vms.ostack_get_creds()
-    elif os.path.isfile(creds_type):
-        logger.info("Using OS credentials from " + creds_type)
-        fc = open(creds_type).read()
-
-        echo = 'echo "$OS_TENANT_NAME:$OS_USERNAME:$OS_PASSWORD@$OS_AUTH_URL"'
-
-        try:
-            data = utils.run_locally(['/bin/bash'], input=fc + "\n" + echo)
-        except subprocess.CalledProcessError as exc:
-            msg = "Failed to get creads from openrc file: " + data
-            logger.exception(msg)
-            raise utils.StopTestError(msg, exc)
-
-        try:
-            user, tenant, passwd_auth_url = data.split(':', 2)
-            passwd, auth_url = passwd_auth_url.rsplit("@", 1)
-            assert (auth_url.startswith("https://") or
-                    auth_url.startswith("http://"))
-        except Exception as exc:
-            msg = "Failed to get creads from openrc file: " + data
-            logger.exception(msg)
-            raise utils.StopTestError(msg, exc)
-
+    elif 'fuel' in cfg['clouds'] and \
+         'openstack_env' in cfg['clouds']['fuel']:
+        logger.info("Using fuel creds")
+        creds = ctx.fuel_openstack_creds
     else:
-        msg = "Creds {0!r} isn't supported".format(creds_type)
-        logger.error(msg)
-        raise utils.StopTestError(msg, None)
+        logger.error("Can't find OS credentials")
+        raise utils.StopTestError("Can't find OS credentials", None)
 
     if creds is None:
         creds = {'name': user,
@@ -392,16 +455,18 @@
     params = cfg['vm_configs'][config['cfg_name']].copy()
     os_nodes_ids = []
 
-    os_creds_type = config['creds']
-    os_creds = get_OS_credentials(cfg, ctx, os_creds_type)
+    if not start_vms.is_connected():
+        os_creds = get_OS_credentials(cfg, ctx)
+    else:
+        os_creds = {}
     start_vms.nova_connect(**os_creds)
 
-    logger.info("Preparing openstack")
     params.update(config)
     params['keypair_file_private'] = params['keypair_name'] + ".pem"
     params['group_name'] = cfg_dict['run_uuid']
 
     if not config.get('skip_preparation', False):
+        logger.info("Preparing openstack")
         start_vms.prepare_os_subpr(params=params, **os_creds)
 
     new_nodes = []
@@ -446,26 +511,25 @@
             vm_ctx = create_vms_ctx(ctx, cfg, config['openstack'],
                                     num_test_nodes)
             with vm_ctx as new_nodes:
-                connect_all(new_nodes, True)
+                if len(new_nodes) != 0:
+                    connect_all(new_nodes, True)
 
-                for node in new_nodes:
-                    if node.connection is None:
-                        msg = "Failed to connect to vm {0}"
-                        raise RuntimeError(msg.format(node.get_conn_id()))
+                    for node in new_nodes:
+                        if node.connection is None:
+                            msg = "Failed to connect to vm {0}"
+                            raise RuntimeError(msg.format(node.get_conn_id()))
 
-                deploy_sensors_stage(cfg_dict,
-                                     ctx,
-                                     nodes=new_nodes,
-                                     undeploy=False)
+                    deploy_sensors_stage(cfg_dict,
+                                         ctx,
+                                         nodes=new_nodes,
+                                         undeploy=False)
 
-                if not cfg['no_tests']:
-                    for test_group in config.get('tests', []):
-                        test_res = run_tests(cfg, test_group, ctx.nodes)
-                        ctx.results.extend(test_res)
+                for test_group in config.get('tests', []):
+                    test_res = run_tests(cfg, test_group, ctx.nodes)
+                    ctx.results.extend(test_res)
         else:
-            if not cfg['no_tests']:
-                test_res = run_tests(cfg, group, ctx.nodes)
-                ctx.results.extend(test_res)
+            test_res = run_tests(cfg, group, ctx.nodes)
+            ctx.results.extend(test_res)
 
 
 def shut_down_vms_stage(cfg, ctx):
@@ -522,9 +586,19 @@
     for tp, data in ctx.results:
         if 'io' == tp and data is not None:
             dinfo = report.process_disk_info(data)
+            text_rep_fname = cfg['text_report_file']
+            rep = IOPerfTest.format_for_console(data, dinfo)
+
+            with open(text_rep_fname, "w") as fd:
+                fd.write(rep)
+                fd.write("\n")
+                fd.flush()
+
+            logger.info("Text report was stored in " + text_rep_fname)
             print("\n")
-            print(IOPerfTest.format_for_console(data, dinfo))
+            print(rep)
             print("\n")
+
         if tp in ['mysql', 'pgbench'] and data is not None:
             print("\n")
             print(MysqlTest.format_for_console(data))
@@ -547,14 +621,6 @@
                                   cfg['charts_img_path'],
                                   lab_info=ctx.hw_info)
 
-            text_rep_fname = cfg_dict['text_report_file']
-            with open(text_rep_fname, "w") as fd:
-                fd.write(IOPerfTest.format_for_console(data, dinfo))
-                fd.write("\n")
-                fd.flush()
-
-            logger.info("Text report were stored in " + text_rep_fname)
-
 
 def complete_log_nodes_statistic(cfg, ctx):
     nodes = ctx.nodes
@@ -610,13 +676,19 @@
                         help="Skip html report", default=False)
     parser.add_argument("--params", metavar="testname.paramname",
                         help="Test params", default=[])
-    parser.add_argument("--reuse-vms", default=[], nargs='*')
     parser.add_argument("config_file")
 
     return parser.parse_args(argv[1:])
 
 
+# from plop.collector import Collector
+
+
 def main(argv):
+    # collector = Collector()
+    # collector.start()
+
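+    # 'kill -USR1 <pid>' dumps a traceback of every thread; useful for
+    # debugging hangs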
+    faulthandler.register(signal.SIGUSR1, all_threads=True)
     opts = parse_args(argv)
     load_config(opts.config_file, opts.post_process_only)
 
@@ -629,11 +701,8 @@
             discover_stage
         ]
 
-        for reuse_param in opts.reuse_vms:
-            pref, ssh_templ = reuse_param.split(',', 1)
-            stages.append(reuse_vms_stage(pref, ssh_templ))
-
         stages.extend([
+            reuse_vms_stage,
             log_nodes_statistic,
             save_nodes_stage,
             connect_stage])
@@ -644,7 +713,8 @@
         stages.extend([
             deploy_sensors_stage,
             run_tests_stage,
-            store_raw_results_stage
+            store_raw_results_stage,
+            gather_sensors_stage
         ])
 
     report_stages = [
@@ -683,7 +753,10 @@
 
     try:
         for stage in stages:
-            logger.info("Start {0.__name__} stage".format(stage))
+            if stage.__name__.endswith("stage"):
+                logger.info("Start {0.__name__}".format(stage))
+            else:
+                logger.info("Start {0.__name__} stage".format(stage))
             stage(cfg_dict, ctx)
     except utils.StopTestError as exc:
         logger.error(msg_templ.format(stage, exc))
@@ -693,7 +766,10 @@
         exc, cls, tb = sys.exc_info()
         for stage in ctx.clear_calls_stack[::-1]:
             try:
-                logger.info("Start {0.__name__} stage".format(stage))
+                if stage.__name__.endswith("stage"):
+                    logger.info("Start {0.__name__}".format(stage))
+                else:
+                    logger.info("Start {0.__name__} stage".format(stage))
                 stage(cfg_dict, ctx)
             except utils.StopTestError as cleanup_exc:
                 logger.error(msg_templ.format(stage, cleanup_exc))
@@ -718,6 +794,9 @@
     if cfg_dict.get('run_web_ui', False):
         stop_web_ui(cfg_dict, ctx)
 
+    # collector.stop()
+    # open("plop.out", "w").write(repr(dict(collector.stack_counts)))
+
     if exc is None:
         logger.info("Tests finished successfully")
         return 0