import re
import time
import json
import stat
import random
import shutil
import os.path
import logging
import datetime
import functools
import subprocess
import collections

import yaml
import paramiko
import texttable
from paramiko.ssh_exception import SSHException
from concurrent.futures import ThreadPoolExecutor, wait

import wally
from wally.pretty_yaml import dumps
from wally.statistic import round_3_digit, data_property, average
from wally.utils import ssize2b, sec_to_str, StopTestError, Barrier, get_os
from wally.ssh_utils import (save_to_remote, read_from_remote, BGSSHTask, reconnect)

from .fio_task_parser import (execution_time, fio_cfg_compile,
                              get_test_summary, get_test_summary_tuple,
                              get_test_sync_mode, FioJobSection)

from ..itest import (TimeSeriesValue, PerfTest, TestResults,
                     run_on_node, TestConfig, MeasurementMatrix)

logger = logging.getLogger("wally")


# Results folder structure
# results/
#     {loadtype}_{num}/
#         config.yaml
#         ......


class NoData(object):
    pass


def cached_prop(func):
    @property
    @functools.wraps(func)
    def closure(self):
        val = getattr(self, "_" + func.__name__)
        if val is NoData:
            val = func(self)
            setattr(self, "_" + func.__name__, val)
        return val
    return closure


def load_fio_log_file(fname):
    with open(fname) as fd:
        it = [ln.split(',')[:2] for ln in fd]

    vals = [(float(off) / 1000,  # convert us to ms
             float(val.strip()) + 0.5)  # add 0.5 to compemsate average value
                                        # as fio trimm all values in log to integer
            for off, val in it]

    return TimeSeriesValue(vals)


READ_IOPS_DISCSTAT_POS = 3
WRITE_IOPS_DISCSTAT_POS = 7


def load_sys_log_file(ftype, fname):
    assert ftype == 'iops'
    pval = None
    with open(fname) as fd:
        iops = []
        for ln in fd:
            params = ln.split()
            cval = int(params[WRITE_IOPS_DISCSTAT_POS]) + \
                int(params[READ_IOPS_DISCSTAT_POS])
            if pval is not None:
                iops.append(cval - pval)
            pval = cval

    vals = [(idx * 1000, val) for idx, val in enumerate(iops)]
    return TimeSeriesValue(vals)


def load_test_results(folder, run_num):
    res = {}
    params = None

    fn = os.path.join(folder, str(run_num) + '_params.yaml')
    params = yaml.load(open(fn).read())

    conn_ids_set = set()
    rr = r"{0}_(?P<conn_id>.*?)_(?P<type>[^_.]*)\.\d+\.log$".format(run_num)
    for fname in os.listdir(folder):
        rm = re.match(rr, fname)
        if rm is None:
            continue

        conn_id_s = rm.group('conn_id')
        conn_id = conn_id_s.replace('_', ':')
        ftype = rm.group('type')

        if ftype not in ('iops', 'bw', 'lat'):
            continue

        ts = load_fio_log_file(os.path.join(folder, fname))
        res.setdefault(ftype, {}).setdefault(conn_id, []).append(ts)

        conn_ids_set.add(conn_id)

    rr = r"{0}_(?P<conn_id>.*?)_(?P<type>[^_.]*)\.sys\.log$".format(run_num)
    for fname in os.listdir(folder):
        rm = re.match(rr, fname)
        if rm is None:
            continue

        conn_id_s = rm.group('conn_id')
        conn_id = conn_id_s.replace('_', ':')
        ftype = rm.group('type')

        if ftype not in ('iops', 'bw', 'lat'):
            continue

        ts = load_sys_log_file(ftype, os.path.join(folder, fname))
        res.setdefault(ftype + ":sys", {}).setdefault(conn_id, []).append(ts)

        conn_ids_set.add(conn_id)

    mm_res = {}

    if len(res) == 0:
        raise ValueError("No data was found")

    for key, data in res.items():
        conn_ids = sorted(conn_ids_set)
        awail_ids = [conn_id for conn_id in conn_ids if conn_id in data]
        matr = [data[conn_id] for conn_id in awail_ids]
        mm_res[key] = MeasurementMatrix(matr, awail_ids)

    raw_res = {}
    for conn_id in conn_ids:
        fn = os.path.join(folder, "{0}_{1}_rawres.json".format(run_num, conn_id_s))

        # remove message hack
        fc = "{" + open(fn).read().split('{', 1)[1]
        raw_res[conn_id] = json.loads(fc)

    fio_task = FioJobSection(params['name'])
    fio_task.vals.update(params['vals'])

    config = TestConfig('io', params, None, params['nodes'], folder, None)
    return FioRunResult(config, fio_task, mm_res, raw_res, params['intervals'], run_num)


class Attrmapper(object):
    def __init__(self, dct):
        self.__dct = dct

    def __getattr__(self, name):
        try:
            return self.__dct[name]
        except KeyError:
            raise AttributeError(name)


class DiskPerfInfo(object):
    def __init__(self, name, summary, params, testnodes_count):
        self.name = name
        self.bw = None
        self.iops = None
        self.lat = None
        self.lat_50 = None
        self.lat_95 = None
        self.lat_avg = None

        self.raw_bw = []
        self.raw_iops = []
        self.raw_lat = []

        self.params = params
        self.testnodes_count = testnodes_count
        self.summary = summary
        self.p = Attrmapper(self.params['vals'])

        self.sync_mode = get_test_sync_mode(self.params['vals'])
        self.concurence = self.params['vals'].get('numjobs', 1)


def get_lat_perc_50_95(lat_mks):
    curr_perc = 0
    perc_50 = None
    perc_95 = None
    pkey = None
    for key, val in sorted(lat_mks.items()):
        if curr_perc + val >= 50 and perc_50 is None:
            if pkey is None or val < 1.:
                perc_50 = key
            else:
                perc_50 = (50. - curr_perc) / val * (key - pkey) + pkey

        if curr_perc + val >= 95:
            if pkey is None or val < 1.:
                perc_95 = key
            else:
                perc_95 = (95. - curr_perc) / val * (key - pkey) + pkey
            break

        pkey = key
        curr_perc += val

    # for k, v in sorted(lat_mks.items()):
    #     if k / 1000 > 0:
    #         print "{0:>4}".format(k / 1000), v

    # print perc_50 / 1000., perc_95 / 1000.
    # exit(1)
    return perc_50 / 1000., perc_95 / 1000.


class IOTestResults(object):
    def __init__(self, suite_name, fio_results, log_directory):
        self.suite_name = suite_name
        self.fio_results = fio_results
        self.log_directory = log_directory

    def __iter__(self):
        return iter(self.fio_results)

    def __len__(self):
        return len(self.fio_results)

    def get_yamable(self):
        items = [(fio_res.summary(), fio_res.idx) for fio_res in self]
        return {self.suite_name: [self.log_directory] + items}


class FioRunResult(TestResults):
    """
    Fio run results
    config: TestConfig
    fio_task: FioJobSection
    ts_results: {str: MeasurementMatrix[TimeSeriesValue]}
    raw_result: ????
    run_interval:(float, float) - test tun time, used for sensors
    """
    def __init__(self, config, fio_task, ts_results, raw_result, run_interval, idx):

        self.name = fio_task.name.rsplit("_", 1)[0]
        self.fio_task = fio_task
        self.idx = idx

        self.bw = ts_results['bw']
        self.lat = ts_results['lat']
        self.iops = ts_results['iops']

        if 'iops:sys' in ts_results:
            self.iops_sys = ts_results['iops:sys']
        else:
            self.iops_sys = None

        res = {"bw": self.bw,
               "lat": self.lat,
               "iops": self.iops,
               "iops:sys": self.iops_sys}

        self.sensors_data = None
        self._pinfo = None
        TestResults.__init__(self, config, res, raw_result, run_interval)

    def get_params_from_fio_report(self):
        nodes = self.bw.connections_ids

        iops = [self.raw_result[node]['jobs'][0]['mixed']['iops'] for node in nodes]
        total_ios = [self.raw_result[node]['jobs'][0]['mixed']['total_ios'] for node in nodes]
        runtime = [self.raw_result[node]['jobs'][0]['mixed']['runtime'] / 1000 for node in nodes]
        flt_iops = [float(ios) / rtime for ios, rtime in zip(total_ios, runtime)]

        bw = [self.raw_result[node]['jobs'][0]['mixed']['bw'] for node in nodes]
        total_bytes = [self.raw_result[node]['jobs'][0]['mixed']['io_bytes'] for node in nodes]
        flt_bw = [float(tbytes) / rtime for tbytes, rtime in zip(total_bytes, runtime)]

        return {'iops': iops,
                'flt_iops': flt_iops,
                'bw': bw,
                'flt_bw': flt_bw}

    def summary(self):
        return get_test_summary(self.fio_task, len(self.config.nodes))

    def summary_tpl(self):
        return get_test_summary_tuple(self.fio_task, len(self.config.nodes))

    def get_lat_perc_50_95_multy(self):
        lat_mks = collections.defaultdict(lambda: 0)
        num_res = 0

        for result in self.raw_result.values():
            num_res += len(result['jobs'])
            for job_info in result['jobs']:
                for k, v in job_info['latency_ms'].items():
                    if isinstance(k, basestring) and k.startswith('>='):
                        lat_mks[int(k[2:]) * 1000] += v
                    else:
                        lat_mks[int(k) * 1000] += v

                for k, v in job_info['latency_us'].items():
                    lat_mks[int(k)] += v

        for k, v in lat_mks.items():
            lat_mks[k] = float(v) / num_res
        return get_lat_perc_50_95(lat_mks)

    def disk_perf_info(self, avg_interval=2.0):

        if self._pinfo is not None:
            return self._pinfo

        testnodes_count = len(self.config.nodes)

        pinfo = DiskPerfInfo(self.name,
                             self.summary(),
                             self.params,
                             testnodes_count)

        def prepare(data, drop=1):
            if data is None:
                return data

            res = []
            for ts_data in data:
                if ts_data.average_interval() < avg_interval:
                    ts_data = ts_data.derived(avg_interval)

                # drop last value on bounds
                # as they may contains ranges without activities
                assert len(ts_data.values) >= drop + 1, str(drop) + " " + str(ts_data.values)

                if drop > 0:
                    res.append(ts_data.values[:-drop])
                else:
                    res.append(ts_data.values)

            return res

        def agg_data(matr):
            arr = sum(matr, [])
            min_len = min(map(len, arr))
            res = []
            for idx in range(min_len):
                res.append(sum(dt[idx] for dt in arr))
            return res

        pinfo.raw_lat = map(prepare, self.lat.per_vm())
        num_th = sum(map(len, pinfo.raw_lat))
        lat_avg = [val / num_th for val in agg_data(pinfo.raw_lat)]
        pinfo.lat_avg = data_property(lat_avg).average / 1000  # us to ms

        pinfo.lat_50, pinfo.lat_95 = self.get_lat_perc_50_95_multy()
        pinfo.lat = pinfo.lat_50

        pinfo.raw_bw = map(prepare, self.bw.per_vm())
        pinfo.raw_iops = map(prepare, self.iops.per_vm())

        if self.iops_sys is not None:
            pinfo.raw_iops_sys = map(prepare, self.iops_sys.per_vm())
            pinfo.iops_sys = data_property(agg_data(pinfo.raw_iops_sys))
        else:
            pinfo.raw_iops_sys = None
            pinfo.iops_sys = None

        fparams = self.get_params_from_fio_report()
        fio_report_bw = sum(fparams['flt_bw'])
        fio_report_iops = sum(fparams['flt_iops'])

        agg_bw = agg_data(pinfo.raw_bw)
        agg_iops = agg_data(pinfo.raw_iops)

        log_bw_avg = average(agg_bw)
        log_iops_avg = average(agg_iops)

        # update values to match average from fio report
        coef_iops = fio_report_iops / float(log_iops_avg)
        coef_bw = fio_report_bw / float(log_bw_avg)

        bw_log = data_property([val * coef_bw for val in agg_bw])
        iops_log = data_property([val * coef_iops for val in agg_iops])

        bw_report = data_property([fio_report_bw])
        iops_report = data_property([fio_report_iops])

        # When IOPS/BW per thread is too low
        # data from logs is rounded to match
        iops_per_th = sum(sum(pinfo.raw_iops, []), [])
        if average(iops_per_th) > 10:
            pinfo.iops = iops_log
            pinfo.iops2 = iops_report
        else:
            pinfo.iops = iops_report
            pinfo.iops2 = iops_log

        bw_per_th = sum(sum(pinfo.raw_bw, []), [])
        if average(bw_per_th) > 10:
            pinfo.bw = bw_log
            pinfo.bw2 = bw_report
        else:
            pinfo.bw = bw_report
            pinfo.bw2 = bw_log

        self._pinfo = pinfo

        return pinfo


class IOPerfTest(PerfTest):
    tcp_conn_timeout = 30
    max_pig_timeout = 5
    soft_runcycle = 5 * 60
    retry_time = 30

    def __init__(self, config):
        PerfTest.__init__(self, config)

        get = self.config.params.get
        do_get = self.config.params.__getitem__

        self.config_fname = do_get('cfg')

        if '/' not in self.config_fname and '.' not in self.config_fname:
            cfgs_dir = os.path.dirname(__file__)
            self.config_fname = os.path.join(cfgs_dir,
                                             self.config_fname + '.cfg')

        self.alive_check_interval = get('alive_check_interval')
        self.use_system_fio = get('use_system_fio', False)

        if get('prefill_files') is not None:
            logger.warning("prefill_files option is depricated. Use force_prefill instead")

        self.force_prefill = get('force_prefill', False)
        self.config_params = get('params', {}).copy()

        self.io_py_remote = self.join_remote("agent.py")
        self.results_file = self.join_remote("results.json")
        self.pid_file = self.join_remote("pid")
        self.task_file = self.join_remote("task.cfg")
        self.sh_file = self.join_remote("cmd.sh")
        self.err_out_file = self.join_remote("fio_err_out")
        self.io_log_file = self.join_remote("io_log.txt")
        self.exit_code_file = self.join_remote("exit_code")

        self.max_latency = get("max_lat", None)
        self.min_bw_per_thread = get("min_bw", None)

        self.use_sudo = get("use_sudo", True)

        self.raw_cfg = open(self.config_fname).read()
        self.fio_configs = None

    @classmethod
    def load(cls, suite_name, folder):
        res = []
        for fname in os.listdir(folder):
            if re.match("\d+_params.yaml$", fname):
                num = int(fname.split('_')[0])
                res.append(load_test_results(folder, num))
        return IOTestResults(suite_name, res, folder)

    def cleanup(self):
        # delete_file(conn, self.io_py_remote)
        # Need to remove tempo files, used for testing
        pass

    # size is megabytes
    def check_prefill_required(self, rossh, fname, size, num_blocks=16):
        try:
            with rossh.connection.open_sftp() as sftp:
                fstats = sftp.stat(fname)

            if stat.S_ISREG(fstats.st_mode) and fstats.st_size < size * 1024 ** 2:
                return True
        except EnvironmentError:
            return True

        cmd = 'python -c "' + \
              "import sys;" + \
              "fd = open('{0}', 'rb');" + \
              "fd.seek({1});" + \
              "data = fd.read(1024); " + \
              "sys.stdout.write(data + ' ' * ( 1024 - len(data)))\" | md5sum"

        if self.use_sudo:
            cmd = "sudo " + cmd

        zero_md5 = '0f343b0931126a20f133d67c2b018a3b'
        bsize = size * (1024 ** 2)
        offsets = [random.randrange(bsize - 1024) for _ in range(num_blocks)]
        offsets.append(bsize - 1024)
        offsets.append(0)

        for offset in offsets:
            data = rossh(cmd.format(fname, offset), nolog=True)

            md = ""
            for line in data.split("\n"):
                if "unable to resolve" not in line:
                    md = line.split()[0].strip()
                    break

            if len(md) != 32:
                logger.error("File data check is failed - " + data)
                return True

            if zero_md5 == md:
                return True

        return False

    def prefill_test_files(self, rossh, files, force=False):
        if self.use_system_fio:
            cmd_templ = "fio "
        else:
            cmd_templ = "{0}/fio ".format(self.config.remote_dir)

        if self.use_sudo:
            cmd_templ = "sudo " + cmd_templ

        cmd_templ += "--name=xxx --filename={0} --direct=1" + \
                     " --bs=4m --size={1}m --rw=write"

        ssize = 0

        if force:
            logger.info("File prefilling is forced")

        ddtime = 0
        for fname, curr_sz in files.items():
            if not force:
                if not self.check_prefill_required(rossh, fname, curr_sz):
                    logger.debug("prefill is skipped")
                    continue

            logger.info("Prefilling file {0}".format(fname))
            cmd = cmd_templ.format(fname, curr_sz)
            ssize += curr_sz

            stime = time.time()
            rossh(cmd, timeout=curr_sz)
            ddtime += time.time() - stime

        if ddtime > 1.0:
            fill_bw = int(ssize / ddtime)
            mess = "Initiall fio fill bw is {0} MiBps for this vm"
            logger.info(mess.format(fill_bw))

    def install_utils(self, node, rossh, max_retry=3, timeout=5):
        need_install = []
        packs = [('screen', 'screen')]
        os_info = get_os(rossh)

        if self.use_system_fio:
            packs.append(('fio', 'fio'))
        else:
            packs.append(('bzip2', 'bzip2'))

        for bin_name, package in packs:
            if bin_name is None:
                need_install.append(package)
                continue

            try:
                rossh('which ' + bin_name, nolog=True)
            except OSError:
                need_install.append(package)

        if len(need_install) != 0:
            if 'redhat' == os_info.distro:
                cmd = "sudo yum -y install " + " ".join(need_install)
            else:
                cmd = "sudo apt-get -y install " + " ".join(need_install)

            for _ in range(max_retry):
                try:
                    rossh(cmd)
                    break
                except OSError as err:
                    time.sleep(timeout)
            else:
                raise OSError("Can't install - " + str(err))

        if not self.use_system_fio:
            fio_dir = os.path.dirname(os.path.dirname(wally.__file__))
            fio_dir = os.path.join(os.getcwd(), fio_dir)
            fio_dir = os.path.join(fio_dir, 'fio_binaries')
            fname = 'fio_{0.release}_{0.arch}.bz2'.format(os_info)
            fio_path = os.path.join(fio_dir, fname)

            if not os.path.exists(fio_path):
                raise RuntimeError("No prebuild fio available for {0}".format(os_info))

            bz_dest = self.join_remote('fio.bz2')
            with node.connection.open_sftp() as sftp:
                sftp.put(fio_path, bz_dest)

            rossh("bzip2 --decompress " + bz_dest, nolog=True)
            rossh("chmod a+x " + self.join_remote("fio"), nolog=True)

    def pre_run(self):
        if 'FILESIZE' not in self.config_params:
            # need to detect file size
            pass

        self.fio_configs = fio_cfg_compile(self.raw_cfg,
                                           self.config_fname,
                                           self.config_params)
        self.fio_configs = list(self.fio_configs)

        files = {}
        for section in self.fio_configs:
            sz = ssize2b(section.vals['size'])
            msz = sz / (1024 ** 2)

            if sz % (1024 ** 2) != 0:
                msz += 1

            fname = section.vals['filename']

            # if already has other test with the same file name
            # take largest size
            files[fname] = max(files.get(fname, 0), msz)

        with ThreadPoolExecutor(len(self.config.nodes)) as pool:
            fc = functools.partial(self.pre_run_th,
                                   files=files,
                                   force=self.force_prefill)
            list(pool.map(fc, self.config.nodes))

    def pre_run_th(self, node, files, force):
        try:
            # fill files with pseudo-random data
            rossh = run_on_node(node)
            rossh.connection = node.connection

            try:
                cmd = 'mkdir -p "{0}"'.format(self.config.remote_dir)
                if self.use_sudo:
                    cmd = "sudo " + cmd
                    cmd += " ; sudo chown {0} {1}".format(node.get_user(),
                                                          self.config.remote_dir)
                rossh(cmd, nolog=True)

                assert self.config.remote_dir != "" and self.config.remote_dir != "/"
                rossh("rm -rf {0}/*".format(self.config.remote_dir), nolog=True)

            except Exception as exc:
                msg = "Failed to create folder {0} on remote {1}. Error: {2!s}"
                msg = msg.format(self.config.remote_dir, node.get_conn_id(), exc)
                logger.exception(msg)
                raise StopTestError(msg, exc)

            self.install_utils(node, rossh)
            self.prefill_test_files(rossh, files, force)
        except:
            logger.exception("XXXX")
            raise

    def show_test_execution_time(self):
        if len(self.fio_configs) > 1:
            # +10% - is a rough estimation for additional operations
            # like sftp, etc
            exec_time = int(sum(map(execution_time, self.fio_configs)) * 1.1)
            exec_time_s = sec_to_str(exec_time)
            now_dt = datetime.datetime.now()
            end_dt = now_dt + datetime.timedelta(0, exec_time)
            msg = "Entire test should takes aroud: {0} and finished at {1}"
            logger.info(msg.format(exec_time_s,
                                   end_dt.strftime("%H:%M:%S")))

    def run(self):
        logger.debug("Run preparation")
        self.pre_run()
        self.show_test_execution_time()

        tname = os.path.basename(self.config_fname)
        if tname.endswith('.cfg'):
            tname = tname[:-4]

        barrier = Barrier(len(self.config.nodes))
        results = []

        # set of Operation_Mode_BlockSize str's
        # which should not be tested anymore, as
        # they already too slow with previous thread count
        lat_bw_limit_reached = set()

        with ThreadPoolExecutor(len(self.config.nodes)) as pool:
            for pos, fio_cfg in enumerate(self.fio_configs):
                test_descr = get_test_summary(fio_cfg.vals).split("th")[0]
                if test_descr in lat_bw_limit_reached:
                    continue
                else:
                    logger.info("Will run {0} test".format(fio_cfg.name))

                templ = "Test should takes about {0}." + \
                        " Should finish at {1}," + \
                        " will wait at most till {2}"
                exec_time = execution_time(fio_cfg)
                exec_time_str = sec_to_str(exec_time)
                timeout = int(exec_time + max(300, exec_time))

                now_dt = datetime.datetime.now()
                end_dt = now_dt + datetime.timedelta(0, exec_time)
                wait_till = now_dt + datetime.timedelta(0, timeout)

                logger.info(templ.format(exec_time_str,
                                         end_dt.strftime("%H:%M:%S"),
                                         wait_till.strftime("%H:%M:%S")))

                func = functools.partial(self.do_run,
                                         barrier=barrier,
                                         fio_cfg=fio_cfg,
                                         pos=pos)

                max_retr = 3
                for idx in range(max_retr):
                    try:
                        intervals = list(pool.map(func, self.config.nodes))
                        if None not in intervals:
                            break
                    except (EnvironmentError, SSHException) as exc:
                        logger.exception("During fio run")
                        if idx == max_retr - 1:
                            raise StopTestError("Fio failed", exc)

                    logger.info("Reconnectiong, sleeping %ss and retrying", self.retry_time)

                    wait([pool.submit(node.connection.close)
                          for node in self.config.nodes])

                    time.sleep(self.retry_time)

                    wait([pool.submit(reconnect, node.connection, node.conn_url)
                             for node in self.config.nodes])

                fname = "{0}_task.fio".format(pos)
                with open(os.path.join(self.config.log_directory, fname), "w") as fd:
                    fd.write(str(fio_cfg))

                params = {'vm_count': len(self.config.nodes)}
                params['name'] = fio_cfg.name
                params['vals'] = dict(fio_cfg.vals.items())
                params['intervals'] = intervals
                params['nodes'] = [node.get_conn_id() for node in self.config.nodes]

                fname = "{0}_params.yaml".format(pos)
                with open(os.path.join(self.config.log_directory, fname), "w") as fd:
                    fd.write(dumps(params))

                res = load_test_results(self.config.log_directory, pos)
                results.append(res)

                if self.max_latency is not None:
                    lat_50, _ = res.get_lat_perc_50_95_multy()

                    # conver us to ms
                    if self.max_latency < lat_50:
                        logger.info(("Will skip all subsequent tests of {0} " +
                                     "due to lat/bw limits").format(fio_cfg.name))
                        lat_bw_limit_reached.add(test_descr)

                test_res = res.get_params_from_fio_report()
                if self.min_bw_per_thread is not None:
                    if self.min_bw_per_thread > average(test_res['bw']):
                        lat_bw_limit_reached.add(test_descr)

        return IOTestResults(self.config.params['cfg'],
                             results, self.config.log_directory)

    def do_run(self, node, barrier, fio_cfg, pos, nolog=False):
        if self.use_sudo:
            sudo = "sudo "
        else:
            sudo = ""

        bash_file = """
#!/bin/bash

function get_dev() {{
    if [ -b "$1" ] ; then
        echo $1
    else
        echo $(df "$1" | tail -1 | awk '{{print $1}}')
    fi
}}

function log_io_activiti(){{
    local dest="$1"
    local dev=$(get_dev "$2")
    local sleep_time="$3"
    dev=$(basename "$dev")

    echo $dev

    for (( ; ; )) ; do
        grep -E "\\b$dev\\b" /proc/diskstats >> "$dest"
        sleep $sleep_time
    done
}}

sync
cd {exec_folder}

log_io_activiti {io_log_file} {test_file} 1 &
local pid="$!"

{fio_path}fio --output-format=json --output={out_file} --alloc-size=262144 {job_file} >{err_out_file} 2>&1
echo $? >{res_code_file}
kill -9 $pid

"""

        exec_folder = self.config.remote_dir

        if self.use_system_fio:
            fio_path = ""
        else:
            if not exec_folder.endswith("/"):
                fio_path = exec_folder + "/"
            else:
                fio_path = exec_folder

        bash_file = bash_file.format(out_file=self.results_file,
                                     job_file=self.task_file,
                                     err_out_file=self.err_out_file,
                                     res_code_file=self.exit_code_file,
                                     exec_folder=exec_folder,
                                     fio_path=fio_path,
                                     test_file=self.config_params['FILENAME'],
                                     io_log_file=self.io_log_file).strip()

        with node.connection.open_sftp() as sftp:
            save_to_remote(sftp, self.task_file, str(fio_cfg))
            save_to_remote(sftp, self.sh_file, bash_file)

        exec_time = execution_time(fio_cfg)

        timeout = int(exec_time + max(300, exec_time))
        soft_tout = exec_time

        begin = time.time()

        fnames_before = run_on_node(node)("ls -1 " + exec_folder, nolog=True)

        barrier.wait()

        task = BGSSHTask(node, self.use_sudo)
        task.start(sudo + "bash " + self.sh_file)

        while True:
            try:
                task.wait(soft_tout, timeout)
                break
            except paramiko.SSHException:
                pass

            try:
                node.connection.close()
            except:
                pass

            reconnect(node.connection, node.conn_url)

        end = time.time()
        rossh = run_on_node(node)
        fnames_after = rossh("ls -1 " + exec_folder, nolog=True)

        conn_id = node.get_conn_id().replace(":", "_")
        if not nolog:
            logger.debug("Test on node {0} is finished".format(conn_id))

        log_files_pref = []
        if 'write_lat_log' in fio_cfg.vals:
            fname = fio_cfg.vals['write_lat_log']
            log_files_pref.append(fname + '_clat')
            log_files_pref.append(fname + '_lat')
            log_files_pref.append(fname + '_slat')

        if 'write_iops_log' in fio_cfg.vals:
            fname = fio_cfg.vals['write_iops_log']
            log_files_pref.append(fname + '_iops')

        if 'write_bw_log' in fio_cfg.vals:
            fname = fio_cfg.vals['write_bw_log']
            log_files_pref.append(fname + '_bw')

        files = collections.defaultdict(lambda: [])
        all_files = [os.path.basename(self.results_file)]
        new_files = set(fnames_after.split()) - set(fnames_before.split())

        for fname in new_files:
            if fname.endswith('.log') and fname.split('.')[0] in log_files_pref:
                name, _ = os.path.splitext(fname)
                if fname.count('.') == 1:
                    tp = name.split("_")[-1]
                    cnt = 0
                else:
                    tp_cnt = name.split("_")[-1]
                    tp, cnt = tp_cnt.split('.')
                files[tp].append((int(cnt), fname))
                all_files.append(fname)
            elif fname == os.path.basename(self.io_log_file):
                files['iops'].append(('sys', fname))
                all_files.append(fname)

        arch_name = self.join_remote('wally_result.tar.gz')
        tmp_dir = os.path.join(self.config.log_directory, 'tmp_' + conn_id)

        if os.path.exists(tmp_dir):
            shutil.rmtree(tmp_dir)

        os.mkdir(tmp_dir)
        loc_arch_name = os.path.join(tmp_dir, 'wally_result.{0}.tar.gz'.format(conn_id))
        file_full_names = " ".join(all_files)

        try:
            os.unlink(loc_arch_name)
        except:
            pass

        with node.connection.open_sftp() as sftp:
            try:
                exit_code = read_from_remote(sftp, self.exit_code_file)
            except IOError:
                logger.error("No exit code file found on %s. Looks like process failed to start",
                             conn_id)
                return None

            err_out = read_from_remote(sftp, self.err_out_file)
            exit_code = exit_code.strip()

            if exit_code != '0':
                msg = "fio exit with code {0}: {1}".format(exit_code, err_out)
                logger.critical(msg.strip())
                raise StopTestError("fio failed")

            rossh("rm -f {0}".format(arch_name), nolog=True)
            pack_files_cmd = "cd {0} ; tar zcvf {1} {2}".format(exec_folder, arch_name, file_full_names)
            rossh(pack_files_cmd, nolog=True)
            sftp.get(arch_name, loc_arch_name)

        unpack_files_cmd = "cd {0} ; tar xvzf {1} >/dev/null".format(tmp_dir, loc_arch_name)
        subprocess.check_call(unpack_files_cmd, shell=True)
        os.unlink(loc_arch_name)

        for ftype, fls in files.items():
            for idx, fname in fls:
                cname = os.path.join(tmp_dir, fname)
                loc_fname = "{0}_{1}_{2}.{3}.log".format(pos, conn_id, ftype, idx)
                loc_path = os.path.join(self.config.log_directory, loc_fname)
                os.rename(cname, loc_path)

        cname = os.path.join(tmp_dir,
                             os.path.basename(self.results_file))
        loc_fname = "{0}_{1}_rawres.json".format(pos, conn_id)
        loc_path = os.path.join(self.config.log_directory, loc_fname)
        os.rename(cname, loc_path)
        os.rmdir(tmp_dir)

        remove_remote_res_files_cmd = "cd {0} ; rm -f {1} {2}".format(exec_folder,
                                                                      arch_name,
                                                                      file_full_names)
        rossh(remove_remote_res_files_cmd, nolog=True)
        return begin, end

    @classmethod
    def prepare_data(cls, results):
        """
        create a table with io performance report
        for console
        """

        def key_func(data):
            tpl = data.summary_tpl()
            return (data.name,
                    tpl.oper,
                    tpl.mode,
                    ssize2b(tpl.bsize),
                    int(tpl.th_count) * int(tpl.vm_count))
        res = []

        for item in sorted(results, key=key_func):
            test_dinfo = item.disk_perf_info()
            testnodes_count = len(item.config.nodes)

            iops, _ = test_dinfo.iops.rounded_average_conf()

            if test_dinfo.iops_sys is not None:
                iops_sys, iops_sys_conf = test_dinfo.iops_sys.rounded_average_conf()
                _, iops_sys_dev = test_dinfo.iops_sys.rounded_average_dev()
                iops_sys_per_vm = round_3_digit(iops_sys / testnodes_count)
                iops_sys = round_3_digit(iops_sys)
            else:
                iops_sys = None
                iops_sys_per_vm = None
                iops_sys_dev = None
                iops_sys_conf = None

            bw, bw_conf = test_dinfo.bw.rounded_average_conf()
            _, bw_dev = test_dinfo.bw.rounded_average_dev()
            conf_perc = int(round(bw_conf * 100 / bw))
            dev_perc = int(round(bw_dev * 100 / bw))

            lat_50 = round_3_digit(int(test_dinfo.lat_50))
            lat_95 = round_3_digit(int(test_dinfo.lat_95))
            lat_avg = round_3_digit(int(test_dinfo.lat_avg))

            iops_per_vm = round_3_digit(iops / testnodes_count)
            bw_per_vm = round_3_digit(bw / testnodes_count)

            iops = round_3_digit(iops)
            bw = round_3_digit(bw)

            summ = "{0.oper}{0.mode} {0.bsize:>4} {0.th_count:>3}th {0.vm_count:>2}vm".format(item.summary_tpl())

            res.append({"name": key_func(item)[0],
                        "key": key_func(item)[:4],
                        "summ": summ,
                        "iops": int(iops),
                        "bw": int(bw),
                        "conf": str(conf_perc),
                        "dev": str(dev_perc),
                        "iops_per_vm": int(iops_per_vm),
                        "bw_per_vm": int(bw_per_vm),
                        "lat_50": lat_50,
                        "lat_95": lat_95,
                        "lat_avg": lat_avg,

                        "iops_sys": iops_sys,
                        "iops_sys_per_vm": iops_sys_per_vm,
                        "sys_conf": iops_sys_conf,
                        "sys_dev": iops_sys_dev})

        return res

    Field = collections.namedtuple("Field", ("header", "attr", "allign", "size"))
    fiels_and_header = [
        Field("Name",           "name",        "l",  7),
        Field("Description",    "summ",        "l", 19),
        Field("IOPS\ncum",      "iops",        "r",  3),
        # Field("IOPS_sys\ncum",  "iops_sys",    "r",  3),
        Field("KiBps\ncum",     "bw",          "r",  6),
        Field("Cnf %\n95%",     "conf",        "r",  3),
        Field("Dev%",           "dev",         "r",  3),
        Field("iops\n/vm",      "iops_per_vm", "r",  3),
        Field("KiBps\n/vm",     "bw_per_vm",   "r",  6),
        Field("lat ms\nmedian", "lat_50",      "r",  3),
        Field("lat ms\n95%",    "lat_95",      "r",  3),
        Field("lat\navg",       "lat_avg",     "r",  3),
    ]

    fiels_and_header_dct = dict((item.attr, item) for item in fiels_and_header)

    @classmethod
    def format_for_console(cls, results):
        """
        create a table with io performance report
        for console
        """

        tab = texttable.Texttable(max_width=120)
        tab.set_deco(tab.HEADER | tab.VLINES | tab.BORDER)
        tab.set_cols_align([f.allign for f in cls.fiels_and_header])
        sep = ["-" * f.size for f in cls.fiels_and_header]
        tab.header([f.header for f in cls.fiels_and_header])
        prev_k = None
        for item in cls.prepare_data(results):
            if prev_k is not None:
                if prev_k != item["key"]:
                    tab.add_row(sep)

            prev_k = item["key"]
            tab.add_row([item[f.attr] for f in cls.fiels_and_header])

        return tab.draw()

    @classmethod
    def format_diff_for_console(cls, list_of_results):
        """
        create a table with io performance report
        for console
        """

        tab = texttable.Texttable(max_width=200)
        tab.set_deco(tab.HEADER | tab.VLINES | tab.BORDER)

        header = [
            cls.fiels_and_header_dct["name"].header,
            cls.fiels_and_header_dct["summ"].header,
        ]
        allign = ["l", "l"]

        header.append("IOPS ~ Cnf% ~ Dev%")
        allign.extend(["r"] * len(list_of_results))
        header.extend(
            "IOPS_{0} %".format(i + 2) for i in range(len(list_of_results[1:]))
        )

        header.append("BW")
        allign.extend(["r"] * len(list_of_results))
        header.extend(
            "BW_{0} %".format(i + 2) for i in range(len(list_of_results[1:]))
        )

        header.append("LAT")
        allign.extend(["r"] * len(list_of_results))
        header.extend(
            "LAT_{0}".format(i + 2) for i in range(len(list_of_results[1:]))
        )

        tab.header(header)
        sep = ["-" * 3] * len(header)
        processed_results = map(cls.prepare_data, list_of_results)

        key2results = []
        for res in processed_results:
            key2results.append(dict(
                ((item["name"], item["summ"]), item) for item in res
            ))

        prev_k = None
        iops_frmt = "{0[iops]} ~ {0[conf]:>2} ~ {0[dev]:>2}"
        for item in processed_results[0]:
            if prev_k is not None:
                if prev_k != item["key"]:
                    tab.add_row(sep)

            prev_k = item["key"]

            key = (item['name'], item['summ'])
            line = list(key)
            base = key2results[0][key]

            line.append(iops_frmt.format(base))

            for test_results in key2results[1:]:
                val = test_results.get(key)
                if val is None:
                    line.append("-")
                elif base['iops'] == 0:
                    line.append("Nan")
                else:
                    prc_val = {'dev': val['dev'], 'conf': val['conf']}
                    prc_val['iops'] = int(100 * val['iops'] / base['iops'])
                    line.append(iops_frmt.format(prc_val))

            line.append(base['bw'])

            for test_results in key2results[1:]:
                val = test_results.get(key)
                if val is None:
                    line.append("-")
                elif base['bw'] == 0:
                    line.append("Nan")
                else:
                    line.append(int(100 * val['bw'] / base['bw']))

            for test_results in key2results:
                val = test_results.get(key)
                if val is None:
                    line.append("-")
                else:
                    line.append("{0[lat_50]} - {0[lat_95]}".format(val))

            tab.add_row(line)

        tab.set_cols_align(allign)
        return tab.draw()
