#    Author: Alex Savatieiev (osavatieiev@mirantis.com; a.savex@gmail.com)
#    Copyright 2019-2022 Mirantis, Inc.
import json
import os
import queue
import signal
import subprocess


from copy import deepcopy
from datetime import datetime, timezone
from platform import system, release, node
from threading import Thread
import threading
from time import sleep


from cfg_checker.common.exception import CheckerException
from cfg_checker.common.other import piped_shell
from cfg_checker.common.log import logger


# strftime/strptime pattern shared by every timestamp this module formats or
# parses (see get_time(), FioProcess.run() and
# FioProcessShellRun.do_scheduledrun())
_datetime_fmt = "%m/%d/%Y, %H:%M:%S%z"
# Options rendered as '--key=value' on every fio command line built by
# FioProcess.run().
#   - FioProcess.__init__ overrides 'filename' with <mount_point>/<filename>
#     and switches 'ioengine' to 'posixaio' when platform.system() reports
#     'Darwin'
#   - 'runtime' and 'ramp_time' are converted to seconds by _get_seconds()
#     inside FioProcess.recalc_times()
fio_options_common = {
    "name": "agent_run",
    "filename": "/cephvol/testfile",
    "status-interval": "500ms",
    "randrepeat": 0,
    "verify": 0,
    "direct": 1,
    "gtod_reduce": 0,
    "bs": "32k",
    "iodepth": 16,
    "size": "10G",
    "readwrite": "randrw",
    "ramp_time": "5s",
    "runtime": "30s",
    "ioengine": "libaio"
}

# Extra options appended by FioProcess.run() only when 'readwrite' is one of
# seq_modes
fio_options_seq = {
    "numjobs": 1,
    "offset_increment": "500M"
}
# Extra options appended by FioProcess.run() only when 'readwrite' is one of
# mix_modes
fio_options_mix = {
    "rwmixread": 50
}

# 'readwrite' values grouped by the extra options FioProcess.run() adds for
# them; rand_modes receive no extra options in the code visible in this file
seq_modes = ['read', 'write']
mix_modes = ['randrw']
rand_modes = ['randread', 'randwrite']


def get_fio_options():
    """Return one merged, independent copy of the module-level fio options.

    The common options come first and are then overlaid with the
    sequential-only and the mixed-only options, in that order. Every source
    dictionary is deep-copied, so the caller may freely modify the result.
    """
    _merged = {}
    for _source in (fio_options_common, fio_options_seq, fio_options_mix):
        _merged.update(deepcopy(_source))
    return _merged


def output_reader(_stdout, outq):
    """Forward every line read from a text stream to a queue.

    Lines are read with ``_stdout.readline()`` and put on ``outq`` unchanged
    (including their line endings). The function returns once ``readline()``
    yields an empty string, i.e. at end of stream.
    """
    while True:
        _chunk = _stdout.readline()
        if _chunk == '':
            break
        outq.put(_chunk)


def _o(option, param, suffix=""):
    """Render one command line switch in the form ``--option=param[suffix]``."""
    _pieces = ("--", format(option), "=", format(param), format(suffix))
    return "".join(_pieces)


def get_time(timestamp=None):
    """Format a point in time using the module-wide timestamp pattern.

    :param timestamp: POSIX timestamp (seconds since the epoch). Any falsy
        value (``None``, ``0``) selects the current time instead.
    :return: string rendered with ``_datetime_fmt``; always expressed in UTC
        and carrying an explicit ``+0000`` offset
    """
    if not timestamp:
        _t = datetime.now(timezone.utc)
    else:
        # Build an aware UTC datetime. A naive local one would render '%z'
        # as an empty string, so the result could neither be parsed back
        # with _datetime_fmt nor compared with the "now" branch above.
        _t = datetime.fromtimestamp(timestamp, timezone.utc)
    return _t.strftime(_datetime_fmt)


def _get_seconds(value):
    """Convert a fio style time value to a whole number of seconds.

    Accepted forms are a plain non-negative number (``30`` or ``"30"``,
    taken as seconds) and a number followed by one of the suffixes ``s``,
    ``m`` or ``h``.

    :param value: time value, e.g. ``"30s"``, ``"5m"``, ``"1h"``, ``"30"``
    :return: number of seconds, or -1 when the value is empty or ends with
        an unsupported suffix
    :raises ValueError: when a supported suffix is present but the part in
        front of it is not an integer (e.g. ``"500ms"``)
    """
    _multipliers = {'s': 1, 'm': 60, 'h': 60 * 60}
    _text = str(value)
    if not _text:
        # nothing to parse; report it the same way as an unknown suffix
        return -1
    if _text.isdecimal():
        # no unit given: the value already is a number of seconds
        return int(_text)
    _factor = _multipliers.get(_text[-1])
    if _factor is None:
        return -1
    return int(_text[:-1]) * _factor


def wait_until(end_datetime):
    """Block the calling thread until ``end_datetime`` is (almost) reached.

    The remaining time is measured against the current UTC time, so
    ``end_datetime`` has to be timezone-aware. Each round sleeps for half of
    the remaining time; the function returns right after the sleep of a
    round that started with at most 0.1 sec left, or immediately when the
    deadline is already in the past.
    """
    _left = (end_datetime - datetime.now(timezone.utc)).total_seconds()
    # A negative value means the deadline has already passed
    while _left >= 0:
        sleep(_left / 2)
        if _left <= 0.1:
            return
        _left = (end_datetime - datetime.now(timezone.utc)).total_seconds()


class ShellThread(object):
    """Run a shell command and stream its combined output into a queue.

    The command is started with ``subprocess.Popen``; a helper thread copies
    every line of stdout (stderr is merged into it) to the queue handed to
    the constructor.

    Attributes:
        cmd: list of command line parts, joined with spaces before running
        queue: queue receiving the output lines
        timeout: seconds to wait for the first output line after start
        output: lines collected so far by get_output()
    """

    def __init__(self, cmd, queue):
        """Store the command and the output queue; nothing is started yet."""
        self.cmd = cmd
        self.queue = queue
        self._p = None
        self._t = None
        self.timeout = 15
        self.output = []

    def run_shell(self):
        """Start the command and wait for its first line of output.

        If no output arrives within ``self.timeout`` seconds the process is
        interrupted via kill_shell().
        """
        # Start
        _cmd = " ".join(self.cmd)
        logger.debug("... {}".format(_cmd))
        # NOTE(review): the command is executed through the shell as one
        # string, so shell metacharacters inside self.cmd are interpreted.
        # Callers must not pass untrusted values without validation.
        # NOTE(review): 'env' replaces the whole environment of the child
        # (PATH included) instead of extending it - confirm this is wanted.
        self._p = subprocess.Popen(
            _cmd,
            shell=True,
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
            env={"PYTHONUNBUFFERED": "1"},
            universal_newlines=True,
            bufsize=1
        )
        self._t = threading.Thread(
            target=output_reader,
            args=(self._p.stdout, self.queue)
        )
        self._t.start()
        if not self.wait_started():
            self.kill_shell()

    def is_alive(self):
        """Return True only while the started process has not exited."""
        # poll() returns None while the process runs and the exit code
        # afterwards; exit code 0 is falsy, hence the explicit None check
        return self._p is not None and self._p.poll() is None

    def wait_started(self):
        """Wait until the first output line is queued.

        :return: True when output arrived, False when ``self.timeout``
            seconds passed without any
        """
        # Count down on a local copy so the configured timeout stays intact
        # for logging and for further run_shell() calls
        _left = self.timeout
        while self.queue.empty():
            if _left <= 0:
                logger.debug(
                    "...timed out after {} sec".format(str(self.timeout))
                )
                return False
            logger.debug("... {} sec".format(_left))
            sleep(1)
            _left -= 1
        logger.debug("... got first fio output")
        return True

    def kill_shell(self):
        """Send SIGINT to the process if it still runs, then drain output."""
        if self.is_alive():
            self._p.send_signal(signal.SIGINT)
        self.get_output()

    def get_output(self):
        """Move all currently queued lines to ``self.output``.

        :return: the accumulated list of output lines (``self.output``)
        """
        while True:
            try:
                line = self.queue.get(block=False)
            except queue.Empty:
                return self.output
            if isinstance(line, bytes):
                # str(bytes) would produce the "b'...'" representation
                line = line.decode('utf-8', errors='replace')
            self.output.append(line)


class FioProcess(Thread):
    """Worker thread that runs one fio benchmark and parses its output.

    The thread builds a fio command line from three option dictionaries
    (common, sequential-only and mixed-only), optionally waits for a
    scheduled start time, launches fio through ShellThread and consumes the
    JSON documents fio prints. The latest parsed values are available as
    ``eta_sec``, ``elapsed_sec``, ``testrun`` and ``timeline``; the outcome
    of the run is stored in ``results`` under the run start time.

    As any ``threading.Thread`` an instance can be started only once.
    """
    # switches present on every fio command line
    _fio_options_list = [
        "--time_based",
        "--output-format=json+",
        "--eta=always"
    ]
    # switches added for sequential modes only
    _fio_options_seq_list = [
        "--thread"
    ]

    # Class-level defaults. __init__ replaces them with per-instance deep
    # copies, so changing options of one instance never alters the defaults
    # or another instance.
    _fio_options_common = fio_options_common
    _fio_options_seq = fio_options_seq
    _fio_options_mix = fio_options_mix

    # init vars for status
    eta_sec = 0
    total_time_sec = 0
    elapsed_sec = 0
    testrun = {}

    mount_point = "/cephvol"
    filename = "testfile"

    # test modes: 'randread', 'randwrite', 'read', 'write', 'randrw'
    mode = "randrw"
    _seq_modes = seq_modes
    _mix_modes = mix_modes
    _rand_modes = rand_modes

    # results; replaced by a per-instance dict in __init__
    results = {}

    def _shell(self, cmd):
        """Run ``cmd`` via piped_shell and remember its code and output.

        :return: True when the exit code is falsy, False otherwise (the
            failure is logged)
        """
        self._code, self._shell_output = piped_shell(cmd, code=True)
        if self._code:
            logger.error(
                "# Shell error for '{}': [{}] {}".format(
                    cmd,
                    self._code,
                    self._shell_output
                )
            )
            return False
        else:
            return True

    def recalc_times(self):
        """Recompute ``total_time_sec`` and ``eta_sec`` from the options.

        :raises CheckerException: when 'runtime' is not a positive time
            value or 'ramp_time' is not a valid, non-negative time value
        """
        _runtime = self._fio_options_common["runtime"]
        _ramp = self._fio_options_common["ramp_time"]
        _rt = self._parse_seconds(_runtime)
        _rup = self._parse_seconds(_ramp)
        # _get_seconds() reports unparsable input as -1, which is truthy,
        # so the values are compared explicitly
        if _rt <= 0:
            raise CheckerException(
                "invalid 'runtime': '{}'".format(_runtime)
            )
        elif _rup < 0:
            raise CheckerException(
                "invalid 'ramp_time': '{}'".format(_ramp)
            )

        self.total_time_sec = _rt + _rup
        self.eta_sec = self.total_time_sec

    @staticmethod
    def _parse_seconds(value):
        """Return ``value`` in seconds or -1 when it can not be parsed."""
        try:
            return _get_seconds(value)
        except (IndexError, TypeError, ValueError):
            return -1

    def __init__(self):
        """Prepare the thread: probe fio, pick the target file, set defaults.

        :raises CheckerException: when 'fio --version' fails or the default
            time options are invalid
        """
        Thread.__init__(self)
        logger.info("fio thread initialized")
        # save system
        self.system = system()
        self.release = release()
        self.hostname = node()
        # create a clear var for last shell output
        self._shell_output = ""
        # Work on private copies: the class attributes reference objects
        # shared by all instances
        self._fio_options_common = deepcopy(self._fio_options_common)
        self._fio_options_seq = deepcopy(self._fio_options_seq)
        self._fio_options_mix = deepcopy(self._fio_options_mix)
        self.results = {}
        self.testrun = {}
        # ShellThread of the current run; assigned in run()
        self.fiorun = None
        # prepare params
        self.recalc_times()
        # prepare the fio
        self.fio_version = "unknown"
        if not self._shell("fio --version"):
            raise CheckerException(
                "Error running fio: '{}'".format(self._shell_output)
            )
        else:
            self.fio_version = self._shell_output
        # all outputs timeline
        self.timeline = {}
        # setup target file
        if not os.path.exists(self.mount_point):
            logger.warning(
                "WARNING: '{}' not exists, using tmp folder".format(
                    self.mount_point
                )
            )
            self.mount_point = "/tmp"
        self._fio_options_common["filename"] = os.path.join(
            self.mount_point,
            self.filename
        )

        if self.system == "Darwin":
            self._fio_options_common["ioengine"] = "posixaio"
        # Thread finish marker
        self.finished = False
        self.testrun_starttime = None
        self.scheduled_datetime = None

    def get_options(self):
        """Return a merged deep copy of all options of this instance."""
        _opts = deepcopy(self._fio_options_common)
        _opts.update(deepcopy(self._fio_options_seq))
        _opts.update(deepcopy(self._fio_options_mix))
        return _opts

    def update_options(self, _dict):
        """Update known options from ``_dict`` and recalculate the timings.

        Only keys are validated, values are stored as they are. Nothing is
        changed when at least one key is unknown.

        :raises CheckerException: on an unknown option or on time options
            that can not be parsed
        """
        _targets = (
            self._fio_options_mix,
            self._fio_options_seq,
            self._fio_options_common
        )
        # validate keys first, so a bad key does not leave a partial update
        for k, v in _dict.items():
            if not any(k in _t for _t in _targets):
                raise CheckerException(
                    "Unknown option: '{}': '{}'".format(k, v)
                )
        for k, v in _dict.items():
            for _t in _targets:
                if k in _t:
                    _t[k] = v
                    break
        # recalc
        self.recalc_times()

    @staticmethod
    def _cut(_list, _s, _e):
        """Split ``_list`` into the slice [_s:_e] and the remaining items."""
        return (_list[_s:_e], _list[:_s] + _list[_e:])

    def _build_cmd(self):
        """Return the fio command line as a list of arguments."""
        _cmd = ["fio"]
        _cmd += self._fio_options_list
        _cmd += [_o(k, v) for k, v in self._fio_options_common.items()]

        _mode = self._fio_options_common["readwrite"]
        if _mode in self._seq_modes:
            _cmd += self._fio_options_seq_list
            _cmd += [_o(k, v) for k, v in self._fio_options_seq.items()]
        elif _mode in self._mix_modes:
            _cmd += [_o(k, v) for k, v in self._fio_options_mix.items()]
        return _cmd

    def _store_status(self, _text):
        """Parse one JSON document printed by fio and save its values.

        Documents that can not be parsed or lack the expected keys are
        logged and skipped, so one bad document does not end the thread.
        """
        try:
            _json = json.loads(_text)
            _timestamp = _json["timestamp"]
            _job = _json["jobs"][0]
            _eta = _job["eta"]
            _elapsed = _job["elapsed"]
        except (TypeError, KeyError, IndexError) as e:
            logger.error("ERROR: {}".format(e))
            return
        except json.decoder.JSONDecodeError as e:
            logger.error("ERROR: {}".format(e))
            return
        self.timeline[_timestamp] = _job
        # save last values
        self.eta_sec = _eta
        self.elapsed_sec = _elapsed
        self.testrun = _json

    def run(self):
        """Thread body: run fio and collect its output until it is done.

        On success ``results[<start time>]`` holds the last parsed document
        ('result') and all per-timestamp job entries ('timeline'). When the
        output does not start with a JSON document, the offending line is
        stored as ``results[<start time>]['error']``, fio is interrupted
        and ``eta_sec`` is set to -1.
        """
        _q = queue.Queue()
        self.fiorun = ShellThread(self._build_cmd(), _q)
        # Check if schedule is set
        _now = datetime.now(timezone.utc)
        if self.scheduled_datetime:
            logger.debug(
                "waiting for '{}', now is '{}', total of {} sec left".format(
                    self.scheduled_datetime.strftime(_datetime_fmt),
                    _now.strftime(_datetime_fmt),
                    (self.scheduled_datetime - _now).total_seconds()
                )
            )
            wait_until(self.scheduled_datetime)
        else:
            self.testrun_starttime = _now.strftime(_datetime_fmt)
        self.fiorun.run_shell()
        _raw = []
        _start = -1
        _end = -1
        while self.fiorun.is_alive() or not _q.empty():
            while not _q.empty():
                # processing
                try:
                    _bb = _q.get(block=False)
                except queue.Empty:
                    # drained concurrently, e.g. by end_fio()
                    break
                if isinstance(_bb, bytes):
                    _line = _bb.decode('utf-8')
                else:
                    _line = _bb
                # Outside of a JSON document every line has to open one,
                # anything else is treated as an error message from fio
                if _start < 0 and _end < 0 and not _line.startswith("{"):
                    self.results[self.testrun_starttime] = {
                        "error": _line
                    }
                    # status() and the shell runner check eta_sec;
                    # 'eta' is kept for readers of the former attribute
                    self.eta_sec = -1
                    self.eta = -1
                    self.fiorun.kill_shell()
                    self.finished = True
                    return
                _raw += _line.splitlines()
                # a document starts with a bare '{' line and ends with a
                # bare '}' line
                for _idx, _item in enumerate(_raw):
                    if _start < 0 and _item == "{":
                        _start = _idx
                    elif _end < 0 and _item == "}":
                        _end = _idx
                # loop until we have full json
                if _end < 0 or _start < 0:
                    continue
                # if start and end found, cut json
                (_json, _raw) = self._cut(_raw, _start, _end+1)
                _start = -1
                _end = -1
                # Try to parse json
                self._store_status("\n".join(_json))
            # fio reports eta 0 once the run is over
            if not self.eta_sec:
                break
            sleep(0.1)
        # Save status to results dictionary
        self.results[self.testrun_starttime] = {
            "result": self.testrun,
            "timeline": self.timeline
        }
        self.finished = True
        self.scheduled_datetime = None
        self.testrun_starttime = None
        return

    def healthcheck(self):
        """Collect fio version, binary path, io engines and host details.

        :return: dict with keys 'ready', 'version', 'path', 'ioengines',
            'system', 'release' and 'hostname'; 'ready' is True when
            version, path and engine list are all non-empty
        """
        _version = self.fio_version
        _binary_path = self._shell_output if self._shell("which fio") else ""
        if self._shell("fio --enghelp"):
            _ioengines = self._shell_output
            _ioengines = _ioengines.replace("\t", "")
            # first line of the output is skipped
            _ioengines = _ioengines.splitlines()[1:]
            self._shell_output = ""
        else:
            _ioengines = []

        return {
            "ready": all((_version, _binary_path, _ioengines)),
            "version": _version,
            "path": _binary_path,
            "ioengines": _ioengines,
            "system": self.system,
            "release": self.release,
            "hostname": self.hostname
        }

    def status(self):
        """Return the state of the thread and the progress of the run.

        :return: dict with 'status' (one of 'idle', 'running', 'scheduled',
            'finished'; later ones take precedence) and 'progress' (percent
            as a string with two decimals)
        """
        _running = self.is_alive() and self.eta_sec >= 0
        _scheduled = False
        _diff = -1
        if self.scheduled_datetime:
            _now = datetime.now(timezone.utc)
            _diff = (self.scheduled_datetime - _now).total_seconds()
            if _diff > 0:
                _scheduled = True
        _s = "running" if _running else "idle"
        _s = "scheduled" if _scheduled else _s
        _s = "finished" if self.finished else _s
        return {
            "status": _s,
            "progress": self.get_progress()
        }

    def end_fio(self):
        """Interrupt fio if a run was started; do nothing otherwise."""
        _run = getattr(self, "fiorun", None)
        if _run:
            _run.kill_shell()

    # Current run
    def percent_done(self):
        """Return elapsed time as a percentage of elapsed + remaining time.

        0.0 is returned when no meaningful total is known (for example
        after an error, when ``eta_sec`` is -1).
        """
        _total = self.elapsed_sec + self.eta_sec
        if _total <= 0:
            return 0.0
        return float(self.elapsed_sec) / float(_total) * 100.0

    def get_progress(self):
        """Return percent_done() formatted with two decimals."""
        return "{:.2f}".format(self.percent_done())

    # latest parsed measurements
    def get_last_measurements(self):
        """Return the timeline entry with the greatest timestamp or {}."""
        if self.timeline:
            # snapshot of the keys: run() may add entries concurrently
            return self.timeline[max(list(self.timeline.keys()))]
        else:
            return {}


class FioProcessShellRun(object):
    """Front end that owns a fio worker thread and the collected results.

    ``init_class`` (FioProcess by default) is instantiated on construction
    and again by fio_reset() once the previous worker is done. ``actions``
    maps action names to the bound methods implementing them. Calling the
    instance starts the worker if needed and prints progress until the run
    is over.
    """
    # class-level defaults; __init__ creates per-instance objects
    stats = {}
    results = {}

    def __init__(self, init_class=FioProcess):
        """Create the first worker instance and run its healthcheck."""
        self.init_class = init_class
        # keep collected data per instance instead of sharing class dicts
        self.stats = {}
        self.results = {}
        self.actions = {
            "do_singlerun": self.do_singlerun,
            "do_scheduledrun": self.do_scheduledrun,
            "get_options": self.get_options,
            "get_result": self.get_result,
            "get_resultlist": self.get_resultlist
        }
        self.fio_reset()

    @staticmethod
    def healthcheck(fio):
        """Run ``fio.healthcheck()`` and render a short text summary.

        :return: tuple of (healthcheck dict, summary string)
        """
        hchk = fio.healthcheck()
        hchk_str = \
            "# fio status: {}\n# {} at {}\n# Engines: {}".format(
                "ready" if hchk["ready"] else "fail",
                hchk["version"],
                hchk["path"],
                ", ".join(hchk["ioengines"])
            )
        return hchk, hchk_str

    def status(self):
        """Return the status dict of the current worker."""
        return self.fio.status()

    def fio_reset(self):
        """Replace the worker with a fresh one when it can not be reused.

        A worker that was never started or is still running is left alone.
        A worker that finished (or whose thread was started and is no
        longer alive) is replaced: its results are merged into
        ``self.results`` and its options are applied to the new instance.
        """
        _old = getattr(self, "fio", None)
        _saved = None
        if _old is not None:
            # a Thread has an ident only after it was started
            _started = getattr(_old, "ident", None) is not None
            _dead = _started and not _old.is_alive()
            if not (getattr(_old, "finished", True) or _dead):
                # No need to reset, fio is either idle or running
                return
            # extract results if they present
            _r = getattr(_old, "results", None)
            if _r:
                self.results.update(_r)
            _saved = self.get_options()
        # re-init
        _fio = self.init_class()
        # Do healthcheck
        self.hchk, self.hchk_str = self.healthcheck(_fio)
        # restore options if they existed
        if _saved:
            _fio.update_options(_saved)
        self.fio = _fio

    def get_options(self):
        """Return a merged deep copy of the options of the current worker."""
        _opts = deepcopy(self.fio._fio_options_common)
        _opts.update(deepcopy(self.fio._fio_options_seq))
        _opts.update(deepcopy(self.fio._fio_options_mix))
        return _opts

    def _ensure_startable(self):
        """Raise when the current worker thread is already running."""
        if self.fio.is_alive():
            raise CheckerException("fio is already running or scheduled")

    def do_singlerun(self, options):
        """Start a run right away using ``options``.

        A 'scheduled_to' entry is ignored with a warning. The dict passed
        by the caller is not modified.

        :return: True once the worker thread was started
        :raises CheckerException: on unknown/invalid options or when a run
            is already in progress
        """
        # Reset thread if it closed
        self.fio_reset()
        self._ensure_startable()
        # Fill options
        _options = dict(options)
        if "scheduled_to" in _options:
            # just ignore it
            _k = "scheduled_to"
            _v = _options.pop(_k)
            logger.warning("Ignoring option: '{}': '{}'".format(_k, _v))
        self.fio.update_options(_options)
        # make sure no schedule is left from a request that failed earlier
        self.fio.scheduled_datetime = None
        self.fio.testrun_starttime = None
        # Start it
        self.fio.start()
        return True

    def do_scheduledrun(self, options):
        """Start a run that waits for the time given in 'scheduled_to'.

        'scheduled_to' has to match ``_datetime_fmt``; it is also used as
        the key of the run in the results. The dict passed by the caller
        is not modified.

        :return: True once the worker thread was started
        :raises CheckerException: when 'scheduled_to' is missing, options
            are unknown/invalid or a run is already in progress
        :raises ValueError: when 'scheduled_to' does not match the format
        """
        # Reset thread if it closed
        self.fio_reset()
        self._ensure_startable()
        # Handle scheduled time
        if "scheduled_to" not in options:
            # required parameter not set
            raise CheckerException("Parameter missing: 'scheduled_to'")
        _options = dict(options)
        _scheduled_to = _options.pop("scheduled_to")
        _scheduled_datetime = datetime.strptime(
            _scheduled_to,
            _datetime_fmt
        )
        # Fill options; the schedule is stored only after they validated
        self.fio.update_options(_options)
        self.fio.testrun_starttime = _scheduled_to
        self.fio.scheduled_datetime = _scheduled_datetime
        # Start it
        self.fio.start()
        return True

    def _get_result_object(self, obj_name, time):
        """Return ``obj_name`` of the run stored under ``time``.

        When the run has no such object but has an 'error' entry, the error
        value itself is returned; in all other failure cases a dict with an
        'error' message is returned.
        """
        if time in self.results:
            if obj_name in self.results[time]:
                return self.results[time][obj_name]
            elif "error" in self.results[time]:
                return self.results[time]["error"]
            else:
                return {
                    "error": "Empty {} for '{}'".format(obj_name, time)
                }
        else:
            return {
                "error": "Result not found for '{}'".format(time)
            }

    def _update_results(self):
        """Move results of a finished worker that are not stored yet."""
        # Update only in case of completed thread
        if self.fio.finished:
            _r_local = list(self.results.keys())
            # iterate over a copy of the keys: entries are popped below
            _r_fio = list(self.fio.results.keys())
            for _r in _r_fio:
                if _r not in _r_local:
                    self.results[_r] = self.fio.results.pop(_r)

    def get_result(self, time):
        """Return the 'result' object of the run stored under ``time``."""
        self._update_results()
        return self._get_result_object('result', time)

    def get_result_timeline(self, time):
        """Return the 'timeline' object of the run stored under ``time``."""
        self._update_results()
        return self._get_result_object('timeline', time)

    # reporting
    def get_resultlist(self):
        """Return the keys (run start times) of all stored results."""
        self._update_results()
        return list(self.results.keys())

    def __call__(self):
        """Start the worker if needed and print progress until it ends.

        While the worker waits for its scheduled time a countdown line is
        printed, afterwards a line with progress and the latest read/write
        bandwidth and IOPS (-1 based values until measurements exist).
        fio is interrupted at the end in case it is still running.
        """
        if not self.fio.is_alive() and not self.fio.finished:
            self.fio.start()

        while self.fio.is_alive() and self.fio.eta_sec >= 0:
            sleep(0.2)
            self.stats = self.fio.get_last_measurements()

            _r = self.stats.get('read', {})
            _w = self.stats.get('write', {})

            _r_bw = _r.get('bw_bytes', -1)
            _r_iops = _r.get('iops', -1)
            _w_bw = _w.get('bw_bytes', -1)
            _w_iops = _w.get('iops', -1)
            _s = self.fio.status()
            if _s["status"] == "scheduled":
                _t = self.fio.scheduled_datetime
                _n = datetime.now(timezone.utc)
                _delta = (_t - _n).total_seconds()
                print(
                    "{}: waiting for '{}'; now '{}'; {} sec left".format(
                        _s["status"],
                        _t.strftime(_datetime_fmt),
                        _n.strftime(_datetime_fmt),
                        _delta
                    )
                )
            else:
                stats = "{}: {:>7}% ({}/{}) " \
                        "(BW/IOPS: " \
                        "Read {:>9.2f} MB/{:>9.2f} " \
                        "Write {:>9.2f} MB/{:>9.2f})".format(
                            _s["status"],
                            _s["progress"],
                            self.fio.elapsed_sec,
                            self.fio.eta_sec,
                            _r_bw / 1024 / 1024,
                            _r_iops,
                            _w_bw / 1024 / 1024,
                            _w_iops
                        )
                print(stats)
        self.fio.end_fio()


if __name__ == '__main__':
    # Debug entry point: exercises FioProcessShellRun with two short
    # sequential read runs and prints the list of stored results after each
    _runner = FioProcessShellRun()

    # Run 1: scheduled run; the start time below lies in the past
    _params = _runner.get_options()
    _params.update({
        "readwrite": "read",
        "ramp_time": "1s",
        "runtime": "5s",
        "scheduled_to": "11/23/2021, 21:48:20+0000"
    })
    _runner.do_scheduledrun(_params)
    _runner()
    _found = _runner.get_resultlist()
    print("# results:\n{}".format("\n".join(_found)))

    # Run 2: immediate run on a freshly reset worker
    _runner.fio_reset()
    _params = _runner.get_options()
    _params.update({
        "readwrite": "read",
        "ramp_time": "1s",
        "runtime": "10s"
    })
    _runner.do_singlerun(_params)
    _runner()
    _found = _runner.get_resultlist()
    print("# results:\n{}".format("\n".join(_found)))
