import csv
import os
import json
from datetime import datetime, timedelta
from time import sleep
from cfg_checker.common import logger_cli
# from cfg_checker.common.exception import InvalidReturnException
# from cfg_checker.common.exception import ConfigException
# from cfg_checker.common.exception import KubeException
from cfg_checker.nodes import KubeNodes
from cfg_checker.agent.fio_runner import _get_seconds, _datetime_fmt


class CephBench(object):
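    """Base class for benchmark runners; keeps the shared env config."""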
    _agent_template = "cfgagent-template.yaml"

    def __init__(self, config):
        self.env_config = config
        return


class SaltCephBench(CephBench):
    def __init__(
        self,
        config
    ):
        logger_cli.error("ERROR: Not implemented for Salt environment!")
        # self.master = SaltNodes(config)
        super(SaltCephBench, self).__init__(config)
        return


class KubeCephBench(CephBench):
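    """Benchmark runner for Kubernetes environments.

    Creates fio agent pods, exposes them as services and drives
    benchmark runs over the agents' HTTP API.
    """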
def __init__(self, config):
self.agent_count = config.bench_agent_count
self.master = KubeNodes(config)
super(KubeCephBench, self).__init__(config)
self.storage_class = config.bench_storage_class
self.agent_pods = []
self.services = []
self.api_urls = []
self.mode = config.bench_mode
if config.bench_mode == "tasks":
self.load_tasks(config.bench_task_file)

    def load_tasks(self, taskfile):
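        """Load benchmark tasks from a csv file, one fio run per row."""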
        # Load csv file
        logger_cli.info("-> Loading taskfile '{}'".format(taskfile))
        self.tasks = []
        with open(taskfile) as f:
            _reader = csv.reader(f, delimiter=',')
            # load tasks; each row is expected to hold
            # readwrite, rwmixread, bs, iodepth, size (in that order),
            # e.g. 'randrw,75,4k,64,1G'
for row in _reader:
self.tasks.append({
"readwrite": row[0],
"rwmixread": row[1],
"bs": row[2],
"iodepth": row[3],
"size": row[4]
})

    def prepare_agents(self, options):
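        """Create a pv/pvc and an agent pod per agent, expose each pod
        as a service and collect the agent API urls."""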
logger_cli.info("# Preparing {} agents".format(self.agent_count))
for idx in range(self.agent_count):
# create pvc/pv and pod
logger_cli.info("-> creating agent '{:02}'".format(idx))
_agent, _pv, _pvc = self.master.prepare_benchmark_agent(
idx,
os.path.split(options["filename"])[0],
self.storage_class,
                # append 'i': fio size '10G' -> k8s quantity '10Gi'
                options['size'] + 'i',
self._agent_template
)
# save it to list
self.agent_pods.append(_agent)
# expose it
_svc = self.master.expose_benchmark_agent(_agent)
# Save service
self.services.append(_svc)
        # Build urls for agents (8765 is the agent API port)
for svc in self.services:
self.api_urls.append(
"http://{}:{}/api/".format(
svc.spec.cluster_ip,
8765
)
)
logger_cli.info("-> Done creating agents")
return
def _poke_agent(self, url, body, action="GET"):
_datafile = "/tmp/data"
_data = [
"--data-binary",
"@" + _datafile,
]
_cmd = [
"curl",
"-s",
"-H",
"'Content-Type: application/json'",
"-X",
action,
url
]
        if body:
            _cmd += _data
            # write the JSON body to _datafile inside the pod first,
            # so curl can send it with '--data-binary @' + _datafile
_ret = self.master.prepare_json_in_pod(
self.agent_pods[0].metadata.name,
self.master._namespace,
body,
_datafile
)
_ret = self.master.exec_cmd_on_target_pod(
self.agent_pods[0].metadata.name,
self.master._namespace,
" ".join(_cmd)
)
        try:
            return json.loads(_ret)
        except (TypeError, json.decoder.JSONDecodeError) as e:
            logger_cli.error(
                "ERROR: Status not decoded: {}\n{}".format(e, _ret)
            )
        return None

    def run_benchmark(self, options):
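        """Verify that all agents are idle and ready, then schedule
        and run a benchmark according to the configured mode."""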
def get_status():
return [self._poke_agent(_u + "fio", {}) for _u in self.api_urls]
logger_cli.info("# Starting '{}' benchmark".format(self.mode))
logger_cli.info("# Checking agents")
# make sure agents idle
_tt = []
_rr = []
for _s in get_status():
if _s is None:
logger_cli.error("ERROR: Agent status not available")
return False
_h = _s["healthcheck"]["hostname"]
_t = _s['status']
_r = _s["healthcheck"]["ready"]
if _t != "idle":
logger_cli.error("Agent status invalid {}:{}".format(_h, _t))
_tt += [False]
else:
_tt += [True]
if not _r:
logger_cli.error("Agent is not ready {}".format(_h))
_rr += [False]
else:
_rr += [True]
        # every agent must be idle and ready before a run starts
        if not all(_tt) or not all(_rr):
            return False
# Make sure that Ceph is at low load
# TODO: Ceph status check
# Do benchmark according to mode
if self.mode == "tasks":
# TODO: Implement 'tasks' mode
# take next task
# update options
# init time to schedule
# send next task to agent
pass
# wait for agents to finish
elif self.mode == "single":
logger_cli.info("# Running benchmark")
# init time to schedule
_time = datetime.now() + timedelta(seconds=10)
_str_time = _time.strftime(_datetime_fmt)
options["scheduled_to"] = _str_time
logger_cli.info(
"-> next benchmark scheduled to '{}'".format(_str_time)
)
# send single to agent
_task = {
"module": "fio",
"action": "do_singlerun",
"options": options
}
for _u in self.api_urls:
logger_cli.info("-> sending task to '{}'".format(_u))
_ret = self._poke_agent(_u, _task, action="POST")
if 'error' in _ret:
logger_cli.error(
"ERROR: Agent returned: '{}'".format(_ret['error'])
)
_runtime = _get_seconds(options["runtime"])
_ramptime = _get_seconds(options["ramp_time"])
_timeout = _runtime + _ramptime + 5
while _timeout > 0:
_sts = get_status()
_str = ""
                for _s in _sts:
                    if _s is None:
                        continue
                    _str += "{}: {} ({}); ".format(
                        _s["healthcheck"]["hostname"],
                        _s["status"],
                        _s["progress"]
                    )
logger_cli.debug("... {}".format(_str))
sleep(1)
_timeout -= 1
return True

    def cleanup(self):
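        # TODO: dispose of the created pods, services and pv/pvc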
return

    # Create report
    def create_report(self):
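        # TODO: Implement report creation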
return