cfg-checker bench part 1
- first single test debug portion
- updated fio option extraction
- updated date interaction
- fixed agent error showing and handling
Related-PROD: PROD-36669
Change-Id: I7c1014c01b5b84429f112bff8db5ad34944c4644
diff --git a/cfg_checker/agent/fio_runner.py b/cfg_checker/agent/fio_runner.py
index f1cecab..69ec661 100644
--- a/cfg_checker/agent/fio_runner.py
+++ b/cfg_checker/agent/fio_runner.py
@@ -19,6 +19,38 @@
_datetime_fmt = "%m/%d/%Y, %H:%M:%S"
+# Default fio job options. They live at module level so that
+# get_fio_options() can hand out merged copies to other modules.
+# NOTE: the runner class binds these very dict objects as its class
+# attributes and rewrites "filename" on init, so they can change in place.
+fio_options_common = {
+ "name": "agent_run",
+ "filename": "/cephvol/testfile",
+ "status-interval": "500ms",
+ "randrepeat": 0,
+ "verify": 0,
+ "direct": 1,
+ "gtod_reduce": 0,
+ "bs": "32k",
+ "iodepth": 16,
+ "size": "10G",
+ "readwrite": "randrw",
+ "ramp_time": "5s",
+ "runtime": "30s",
+ "ioengine": "libaio"
+}
+
+# presumably extra options for sequential modes - confirm against the runner
+fio_options_seq = {
+ "numjobs": 1,
+ "offset_increment": "500M"
+}
+# presumably extra options for mixed read/write modes - confirm as well
+fio_options_mix = {
+ "rwmixread": 50
+}
+
+
+def get_fio_options():
+    """Return a new dict with all default fio options merged together.
+
+    Each section is deep-copied, so the result can be modified without
+    touching the module-level defaults. On equal keys later sections
+    win: common first, then seq, then mix.
+    """
+    _merged = {}
+    for _section in (fio_options_common, fio_options_seq, fio_options_mix):
+        _merged.update(deepcopy(_section))
+    return _merged
def output_reader(_stdout, outq):
@@ -141,37 +173,16 @@
"--thread"
]
- _fio_options_common = {
- "name": "agent_run",
- "filename": "/tmp/testfile",
- "status-interval": "500ms",
- "randrepeat": 0,
- "verify": 0,
- "direct": 1,
- "gtod_reduce": 0,
- "bs": "32k",
- "iodepth": 16,
- "size": "10G",
- "readwrite": "randrw",
- "ramp_time": "5s",
- "runtime": "30s",
- "ioengine": "libaio"
- }
-
- _fio_options_seq = {
- "numjobs": 1,
- "offset_increment": "500M"
- }
- _fio_options_mix = {
- "rwmixread": 50
- }
+ _fio_options_common = fio_options_common
+ _fio_options_seq = fio_options_seq
+ _fio_options_mix = fio_options_mix
eta_sec = 0
total_time_sec = 0
elapsed_sec = 0
testrun = {}
- mount_point = "/tmp"
+ mount_point = "/cephvol"
filename = "testfile"
# test modes: 'randread', 'randwrite', 'read', 'write', 'randrw'
@@ -229,7 +240,14 @@
self.fio_version = self._shell_output
# all outputs timeline
self.timeline = {}
-
+ # setup target file
+ if not os.path.exists(self.mount_point):
+ logger.warning(
+ "WARNING: '{}' not exists, using tmp folder".format(
+ self.mount_point
+ )
+ )
+ self.mount_point = "/tmp"
self._fio_options_common["filename"] = os.path.join(
self.mount_point,
self.filename
@@ -484,7 +502,11 @@
return False
else:
# set time and get rid of it from options
- self.fio.scheduled_datetime = options.pop("scheduled_to")
+ _time = options.pop("scheduled_to")
+ self.fio.scheduled_datetime = datetime.strptime(
+ _time,
+ _datetime_fmt
+ )
# Fill options
self.fio.update_options(options)
# Start it
@@ -594,7 +616,9 @@
_opts["readwrite"] = "read"
_opts["ramp_time"] = "1s"
_opts["runtime"] = "10s"
- _opts["scheduled_to"] = datetime.now() + timedelta(seconds=12)
+ _opts["scheduled_to"] = (datetime.now() + timedelta(seconds=12)).strftime(
+ _datetime_fmt
+ )
_shell.do_scheduledrun(_opts)
_shell()
_times = _shell.get_resultlist()
diff --git a/cfg_checker/agent/webserver.py b/cfg_checker/agent/webserver.py
index 8b8466c..67a4567 100644
--- a/cfg_checker/agent/webserver.py
+++ b/cfg_checker/agent/webserver.py
@@ -165,7 +165,7 @@
)
return
else:
- _resp(falcon.HTTP_400, "Empty request body")
+ _resp(resp, falcon.HTTP_400, "Empty request body")
class Index:
diff --git a/cfg_checker/common/kube_utils.py b/cfg_checker/common/kube_utils.py
index 95bb19c..0ce573a 100644
--- a/cfg_checker/common/kube_utils.py
+++ b/cfg_checker/common/kube_utils.py
@@ -6,13 +6,15 @@
import urllib3
import yaml
-from kubernetes import client as kclient, config as kconfig
+from kubernetes import client as kclient, config as kconfig, watch
from kubernetes.stream import stream
from kubernetes.client.rest import ApiException
+from time import time, sleep
from cfg_checker.common import logger, logger_cli
from cfg_checker.common.decorators import retry
-from cfg_checker.common.exception import InvalidReturnException, KubeException
+from cfg_checker.common.exception import CheckerException, \
+ InvalidReturnException, KubeException
from cfg_checker.common.file_utils import create_temp_file_with_content
from cfg_checker.common.other import utils, shell
from cfg_checker.common.ssh_utils import ssh_shell_p
@@ -262,6 +264,27 @@
return None
+    def wait_for(self, _func, phase, *args, **kwargs):
+        """Block until a watched object reaches 'phase' or gets deleted.
+
+        No timeout is set here. To bound the wait, pass the one supported
+        by the watched list call in kwargs (e.g. 'timeout_seconds').
+
+        :param _func: kube API list function to watch
+        :param phase: value of 'status.phase' to wait for
+        :param args: positional arguments for the watched function
+        :param kwargs: keyword arguments for the watched function
+        :return: None, also when the event stream ends on its own
+        """
+        w = watch.Watch()
+        start_time = time()
+        try:
+            for event in w.stream(_func, *args, **kwargs):
+                if event["object"].status.phase == phase:
+                    logger_cli.debug(
+                        "... became '{}' in {:0.2f} sec".format(
+                            phase,
+                            time() - start_time
+                        )
+                    )
+                    return
+                # event type is one of: ADDED, MODIFIED, DELETED
+                if event["type"] == "DELETED":
+                    # Object was deleted while we were waiting for it
+                    logger_cli.debug("... deleted before started")
+                    return
+        finally:
+            # release the watch on every exit path, errors included
+            w.stop()
+
def get_node_info(self, http=False):
# Query API for the nodes and do some presorting
_nodes = {}
@@ -644,3 +667,250 @@
version=version,
plural=plural
)
+
+    def init_pvc_resource(
+        self,
+        name,
+        storage_class,
+        size,
+        ns="qa-space",
+        mode="ReadWriteOnce"
+    ):
+        """Build a V1PersistentVolumeClaim object.
+
+        The object is only constructed, nothing is sent to the cluster.
+
+        :param name: claim name, also set as the value of the 'name' label
+        :param storage_class: storage class name for the claim
+        :param size: requested storage size
+        :param ns: namespace put into the claim metadata
+        :param mode: the single access mode for the claim
+        :return: kclient.V1PersistentVolumeClaim
+        """
+        _meta = kclient.V1ObjectMeta(
+            name=name,
+            namespace=ns,
+            labels={"name": name}
+        )
+        _requirements = kclient.V1ResourceRequirements(
+            requests={'storage': size}
+        )
+        _spec = kclient.V1PersistentVolumeClaimSpec(
+            storage_class_name=storage_class,
+            access_modes=[mode],
+            resources=_requirements
+        )
+        return kclient.V1PersistentVolumeClaim(
+            api_version='v1',
+            kind='PersistentVolumeClaim',
+            metadata=_meta,
+            spec=_spec
+        )
+
+    def init_pv_resource(
+        self,
+        name,
+        storage_class,
+        size,
+        path,
+        ns="qa-space",
+        mode="ReadWriteOnce"
+    ):
+        """Build a hostPath-backed V1PersistentVolume object.
+
+        The object is only constructed, nothing is sent to the cluster.
+
+        :param name: volume name, also set as the value of the 'name' label
+        :param storage_class: storage class name for the volume
+        :param size: volume capacity
+        :param path: host path backing the volume
+        :param ns: namespace put into the volume metadata
+        :param mode: the single access mode for the volume
+        :return: kclient.V1PersistentVolume
+        """
+        _meta = kclient.V1ObjectMeta(
+            name=name,
+            namespace=ns,
+            labels={"name": name}
+        )
+        _source = kclient.V1HostPathVolumeSource(path=path)
+        _spec = kclient.V1PersistentVolumeSpec(
+            storage_class_name=storage_class,
+            access_modes=[mode],
+            capacity={'storage': size},
+            host_path=_source
+        )
+        return kclient.V1PersistentVolume(
+            api_version='v1',
+            kind='PersistentVolume',
+            metadata=_meta,
+            spec=_spec
+        )
+
+    def init_service(
+        self,
+        name,
+        port,
+        clusterip=None,
+        ns="qa-space"
+    ):
+        """Build a V1Service object for a benchmark agent.
+
+        The service selects objects labeled 'name=<name>' and maps the TCP
+        port to the same target port. Nothing is sent to the cluster.
+
+        :param name: service name, label value and selector value
+        :param port: port number used as both port and target port
+        :param clusterip: accepted, but not put into the spec for now
+        :param ns: namespace put into the service metadata
+        :return: kclient.V1Service
+        """
+        return kclient.V1Service(
+            api_version="v1",
+            kind="Service",
+            metadata=kclient.V1ObjectMeta(
+                name=name,
+                namespace=ns,
+                labels={"name": name}
+            ),
+            spec=kclient.V1ServiceSpec(
+                selector={"name": name},
+                ports=[
+                    kclient.V1ServicePort(
+                        port=port,
+                        protocol="TCP",
+                        target_port=port
+                    )
+                ]
+            )
+        )
+
+ def prepare_pv(self, pv_object):
+ """Create or replace the given PV and wait until it is usable.
+
+ :param pv_object: PV object to submit; its metadata.name is used as
+ the resource name and as the value for the 'name' label selector
+ :return: first listed PV item in phase 'Available' or 'Bound'
+ :raises CheckerException: if no such item shows up before the timeout
+ """
+ def _list_pv():
+ # list only volumes labeled with this PV's name
+ return self.CoreV1.list_persistent_volume(
+ label_selector='name={}'.format(pv_object.metadata.name)
+ )
+ # presumably None when no listed item has this name - confirm in helper
+ _existing = self.safe_get_item_by_name(
+ _list_pv(),
+ pv_object.metadata.name
+ )
+ # an already present PV is replaced, otherwise a new one is created
+ if _existing is not None:
+ self.CoreV1.replace_persistent_volume(
+ pv_object.metadata.name,
+ pv_object
+ )
+ else:
+ self.CoreV1.create_persistent_volume(pv_object)
+
+ # poll about once a second, roughly 60 sec in total
+ _timeout = 60
+ while _timeout > 0:
+ _t = _list_pv()
+ for item in _t.items:
+ if item.status.phase in ["Available", "Bound"]:
+ return item
+ sleep(1)
+ _timeout -= 1
+ raise CheckerException(
+ "Timed out creating PV '{}'".format(
+ pv_object.metadata.name
+ )
+ )
+
+ def prepare_pvc(self, pvc_object):
+ """Make sure the given PVC exists and wait until it is usable.
+
+ :param pvc_object: PVC object; its metadata gives the namespace, the
+ name and the value for the 'name' label selector
+ :return: first listed PVC item in phase 'Available' or 'Bound'
+ :raises CheckerException: if a PVC found on the cloud requests a
+ different size, or if no usable item shows up before the timeout
+ """
+ def _list_pvc():
+ # list only claims labeled with this PVC's name in its namespace
+ return self.CoreV1.list_namespaced_persistent_volume_claim(
+ pvc_object.metadata.namespace,
+ label_selector='name={}'.format(pvc_object.metadata.name)
+ )
+ # presumably None when no listed item has this name - confirm in helper
+ _existing = self.safe_get_item_by_name(
+ _list_pvc(),
+ pvc_object.metadata.name
+ )
+ if _existing is not None:
+ # found claim: report it and compare requested storage sizes
+ _size_r = pvc_object.spec.resources.requests["storage"]
+ _size_e = _existing.spec.resources.requests["storage"]
+ logger_cli.warn(
+ "WARNING: Found PVC '{}/{}' with {}. Requested: {}'".format(
+ pvc_object.metadata.namespace,
+ pvc_object.metadata.name,
+ _size_e,
+ _size_r
+ )
+ )
+ # sizes are compared as given, no unit conversion is done
+ if _size_r != _size_e:
+ raise CheckerException(
+ "ERROR: PVC exists on the cloud with different size "
+ "than needed. Please cleanup!"
+ )
+ else:
+ logger_cli.debug(
+ "... creating pvc '{}'".format(pvc_object.metadata.name)
+ )
+ self.CoreV1.create_namespaced_persistent_volume_claim(
+ pvc_object.metadata.namespace,
+ pvc_object
+ )
+
+ # poll in 5 sec steps, roughly 60 sec in total
+ _timeout = 60
+ while _timeout > 0:
+ _t = _list_pvc()
+ for item in _t.items:
+ if item.status.phase in ["Available", "Bound"]:
+ return item
+ else:
+ logger_cli.debug(
+ "... pvcstatus: '{}', {} sec left".format(
+ item.status.phase,
+ _timeout
+ )
+ )
+ sleep(5)
+ _timeout -= 5
+ raise CheckerException(
+ "Timed out creating PVC '{}'".format(
+ pvc_object.metadata.name
+ )
+ )
+
+ def prepare_pod_from_yaml(self, pod_yaml):
+ """Reuse or create a pod from a loaded manifest and wait for it.
+
+ :param pod_yaml: pod manifest as a dict; 'metadata.namespace' and
+ 'metadata.name' are read from it
+ :return: the found pod (returned as is, phase is not checked) or
+ the created pod once it is listed in phase 'Running'
+ :raises CheckerException: if the pod is not 'Running' before timeout
+ """
+ def _list_pod():
+ # list only pods labeled with the manifest's name in its namespace
+ return self.CoreV1.list_namespaced_pod(
+ pod_yaml['metadata']['namespace'],
+ label_selector='name={}'.format(pod_yaml['metadata']['name'])
+ )
+ # presumably None when no listed item has this name - confirm in helper
+ _existing = self.safe_get_item_by_name(
+ _list_pod(),
+ pod_yaml['metadata']['name']
+ )
+ if _existing is not None:
+ logger_cli.warn(
+ "WARNING: Found pod '{}/{}'. Reusing.".format(
+ pod_yaml['metadata']['namespace'],
+ pod_yaml['metadata']['name']
+ )
+ )
+ return _existing
+ else:
+ self.CoreV1.create_namespaced_pod(
+ pod_yaml['metadata']['namespace'],
+ pod_yaml
+ )
+ # poll in 2 sec steps, roughly 120 sec in total
+ _timeout = 120
+ logger_cli.debug("... waiting '{}'s for pod to start".format(_timeout))
+ while _timeout > 0:
+ _t = _list_pod()
+ for item in _t.items:
+ logger_cli.debug("... pod is '{}'".format(item.status.phase))
+ if item.status.phase in ["Running"]:
+ return item
+ sleep(2)
+ _timeout -= 2
+ raise CheckerException(
+ "Timed out creating pod '{}'".format(
+ pod_yaml['metadata']['name']
+ )
+ )
+
+    def expose_pod_port(self, pod_object, port, ns="qa-space"):
+        """Expose a TCP port of the pod through a Service named after it.
+
+        A Service with the pod's name found in the pod's namespace is
+        reused as is; its port is not verified.
+
+        :param pod_object: pod to expose; metadata gives name and namespace
+        :param port: port number to expose
+        :param ns: kept for compatibility and not used, the Service always
+            goes to the pod's own namespace
+        :return: the existing or the newly created Service object
+        """
+        _ns = pod_object.metadata.namespace
+        _name = pod_object.metadata.name
+        _found = self.CoreV1.list_namespaced_service(
+            _ns,
+            label_selector='name={}'.format(_name)
+        )
+        _existing = self.safe_get_item_by_name(_found, _name)
+        if _existing is not None:
+            # TODO: Check port number?
+            logger_cli.warn(
+                "WARNING: Pod already exposed '{}/{}:{}'. Reusing.".format(
+                    _ns,
+                    _name,
+                    port
+                )
+            )
+            return _existing
+
+        logger_cli.debug(
+            "... creating service for pod {}/{}: {}:{}".format(
+                _ns,
+                _name,
+                pod_object.status.pod_ip,
+                port
+            )
+        )
+        # Build the Service for the pod's namespace. With the default one
+        # the object and the request namespaces differ for any pod outside
+        # of 'qa-space' and the create call gets rejected.
+        _svc = self.init_service(_name, port, ns=_ns)
+        return self.CoreV1.create_namespaced_service(_ns, _svc)
diff --git a/cfg_checker/helpers/args_utils.py b/cfg_checker/helpers/args_utils.py
index 68f22d5..84bd6f1 100644
--- a/cfg_checker/helpers/args_utils.py
+++ b/cfg_checker/helpers/args_utils.py
@@ -26,11 +26,13 @@
return _skip, _skip_file
-def get_arg(args, str_arg):
+def get_arg(args, str_arg, nofail=False):
_attr = getattr(args, str_arg)
if _attr:
return _attr
else:
+ if nofail:
+ return None
_c = args.command if hasattr(args, 'command') else ''
_t = args.type if hasattr(args, 'type') else ''
raise ConfigException(
diff --git a/cfg_checker/modules/ceph/__init__.py b/cfg_checker/modules/ceph/__init__.py
index 495e9b1..9d0f923 100644
--- a/cfg_checker/modules/ceph/__init__.py
+++ b/cfg_checker/modules/ceph/__init__.py
@@ -1,3 +1,4 @@
+from cfg_checker.agent.fio_runner import get_fio_options
from cfg_checker.common import logger_cli
from cfg_checker.common.settings import ENV_TYPE_KUBE
from cfg_checker.helpers import args_utils
@@ -71,6 +72,26 @@
metavar='ceph_tasks_filename',
help="List file with data for Ceph bench testrun"
)
+    # number of agent pods the benchmark creates; read in do_bench
+    ceph_bench_parser.add_argument(
+        '--agents',
+        type=int, metavar='agent_count', default=5,
+        help="Number of benchmark agents to create. Default: 5"
+    )
+ ceph_bench_parser.add_argument(
+ '--html',
+ metavar='ceph_html_filename',
+ help="HTML filename to save report"
+ )
+ ceph_bench_parser.add_argument(
+ '--storage-class',
+ metavar='storage_class',
+ help="Storage class to be used in benchmark"
+ )
+ ceph_bench_parser.add_argument(
+ '--task-file',
+ metavar='task-file',
+ help="Task file for benchmark"
+ )
return _parser
@@ -123,21 +144,45 @@
def do_bench(args, config):
# Ceph Benchmark using multiple pods
- # Prepare the tasks and do synced testrun
- # TODO: html option to create a fancy report
+ # Prepare the tasks and do synced testrun or a single one
+ logger_cli.info("# Initializing benchmark run")
args_utils.check_supported_env(ENV_TYPE_KUBE, args, config)
+ _filename = args_utils.get_arg(args, 'html')
+ # agents count option
+ _agents = args_utils.get_arg(args, "agents")
+ logger_cli.info("-> using {} agents".format(_agents))
+ config.bench_agent_count = _agents
+ # storage class
+ _storage_class = args_utils.get_arg(args, "storage_class")
+ logger_cli.info("-> using storage class of '{}'".format(_storage_class))
+ config.bench_storage_class = _storage_class
+ # Task files or options
+ _task_file = args_utils.get_arg(args, "task_file", nofail=True)
+ if not _task_file:
+ logger_cli.info("-> running single run")
+ config.bench_mode = "single"
+ else:
+ logger_cli.info("-> running with tasks from '{}'".format(_task_file))
+ config.bench_task_file = _task_file
+ config.bench_mode = "tasks"
+ _opts = get_fio_options()
+ logger_cli.debug("... default/selected options for fio:")
+ for _k in _opts.keys():
+ # TODO: Update options for single run
+ logger_cli.debug(" {} = {}".format(_k, _opts[_k]))
+ # handle option unavailability
ceph_bench = bench.KubeCephBench(config)
- logger_cli.error("ERROR: To be implemented...")
-
# Load tasks
# Do the testrun
- ceph_bench.prepare_pods()
- ceph_bench.run_benchmark()
+ ceph_bench.prepare_agents(_opts)
+ if not ceph_bench.run_benchmark(_opts):
+ return
+ ceph_bench.cleanup()
# Create report
- ceph_bench.create_report()
+ ceph_bench.create_report(_filename)
return
diff --git a/cfg_checker/modules/ceph/bench.py b/cfg_checker/modules/ceph/bench.py
index 28c7929..190076a 100644
--- a/cfg_checker/modules/ceph/bench.py
+++ b/cfg_checker/modules/ceph/bench.py
@@ -1,48 +1,228 @@
+import csv
+import os
+import json
+
+from datetime import datetime, timedelta
+from time import sleep
+
from cfg_checker.common import logger_cli
# from cfg_checker.common.exception import InvalidReturnException
# from cfg_checker.common.exception import ConfigException
# from cfg_checker.common.exception import KubeException
from cfg_checker.nodes import KubeNodes
+from cfg_checker.agent.fio_runner import _get_seconds, _datetime_fmt
class CephBench(object):
- def __info__(
- self,
- config
- ):
+ _agent_template = "cfgagent-template.yaml"
+
+ def __init__(self, config):
self.env_config = config
return
- def prepare_pods(self):
-
- return
-
- def run_benchmark(self):
-
- return
-
- # Create report
- def create_report(self):
-
- return
-
class SaltCephBench(CephBench):
def __init__(
self,
config
):
- logger_cli.error("ERROR: Not impelented for Sale environment!")
+ logger_cli.error("ERROR: Not impelented for Salt environment!")
# self.master = SaltNodes(config)
- super(SaltCephBench, self).__init__(
- config
- )
+ super(SaltCephBench, self).__init__(config)
return
class KubeCephBench(CephBench):
def __init__(self, config):
+ """Read benchmark settings from config and prepare empty agent lists.
+
+ Expects 'bench_agent_count', 'bench_storage_class' and 'bench_mode'
+ on config, plus 'bench_task_file' when the mode is 'tasks'.
+ """
+ self.agent_count = config.bench_agent_count
self.master = KubeNodes(config)
super(KubeCephBench, self).__init__(config)
+ self.storage_class = config.bench_storage_class
+ # the three lists below are filled in by prepare_agents()
+ self.agent_pods = []
+ self.services = []
+ self.api_urls = []
+ self.mode = config.bench_mode
+ # 'self.tasks' exists only in 'tasks' mode, it is set by load_tasks()
+ if config.bench_mode == "tasks":
+ self.load_tasks(config.bench_task_file)
+
+    def load_tasks(self, taskfile):
+        """Load benchmark tasks from a CSV file into 'self.tasks'.
+
+        Columns are positional: readwrite, rwmixread, bs, iodepth, size.
+        Values are kept as the strings read from the file; extra columns
+        are ignored and blank lines are skipped.
+
+        :param taskfile: path to the comma-separated file with tasks
+        :raises IndexError: if a non-blank row has less than five columns
+        """
+        _columns = ("readwrite", "rwmixread", "bs", "iodepth", "size")
+        logger_cli.info("-> Loading taskfile '{}'".format(taskfile))
+        self.tasks = []
+        # newline='' leaves the newline handling to the csv module
+        with open(taskfile, newline='') as f:
+            for row in csv.reader(f, delimiter=','):
+                if not row:
+                    # csv.reader gives an empty list for a blank line
+                    continue
+                self.tasks.append(
+                    {_key: row[_idx] for _idx, _key in enumerate(_columns)}
+                )
+
+ def prepare_agents(self, options):
+ """Create the agent pods with volumes and expose each via a Service.
+
+ Fills 'self.agent_pods', 'self.services' and 'self.api_urls'.
+
+ :param options: fio options dict; the directory part of 'filename'
+ is used as the volume path and 'size' as the volume size
+ """
+ logger_cli.info("# Preparing {} agents".format(self.agent_count))
+ for idx in range(self.agent_count):
+ # create pvc/pv and pod
+ logger_cli.info("-> creating agent '{:02}'".format(idx))
+ # 'i' is appended to the fio size for kube, e.g. '10G' -> '10Gi'
+ _agent, _pv, _pvc = self.master.prepare_benchmark_agent(
+ idx,
+ os.path.split(options["filename"])[0],
+ self.storage_class,
+ options['size'] + 'i',
+ self._agent_template
+ )
+ # save it to list
+ self.agent_pods.append(_agent)
+ # expose it
+ _svc = self.master.expose_benchmark_agent(_agent)
+ # Save service
+ self.services.append(_svc)
+
+ # Build urls for agents
+ # 8765 is the same port KubeNodes.expose_benchmark_agent exposes
+ for svc in self.services:
+ self.api_urls.append(
+ "http://{}:{}/api/".format(
+ svc.spec.cluster_ip,
+ 8765
+ )
+ )
+ logger_cli.info("-> Done creating agents")
+ return
+
+ def _poke_agent(self, url, body, action="GET"):
+ """Call an agent API with curl run in a pod and decode the JSON reply.
+
+ :param url: agent API URL to request
+ :param body: request payload; a truthy one is sent with
+ '--data-binary @/tmp/data', the file being prepared through
+ 'prepare_json_in_pod'
+ :param action: HTTP method given to 'curl -X'
+ :return: decoded reply, or None if the output could not be decoded
+ (the error is logged); callers must handle None
+ """
+ _datafile = "/tmp/data"
+ _data = [
+ "--data-binary",
+ "@" + _datafile,
+ ]
+ _cmd = [
+ "curl",
+ "-s",
+ "-H",
+ "'Content-Type: application/json'",
+ "-X",
+ action,
+ url
+ ]
+ if body:
+ _cmd += _data
+ _ret = self.master.prepare_json_in_pod(
+ self.agent_pods[0].metadata.name,
+ self.master._namespace,
+ body,
+ _datafile
+ )
+ # NOTE(review): curl is always executed from the first agent pod,
+ # whichever agent the url points to
+ _ret = self.master.exec_cmd_on_target_pod(
+ self.agent_pods[0].metadata.name,
+ self.master._namespace,
+ " ".join(_cmd)
+ )
+ # presumably TypeError covers a non-string result from exec - confirm
+ try:
+ return json.loads(_ret)
+ except TypeError as e:
+ logger_cli.error(
+ "ERROR: Status not decoded: {}\n{}".format(e, _ret)
+ )
+ except json.decoder.JSONDecodeError as e:
+ logger_cli.error(
+ "ERROR: Status not decoded: {}\n{}".format(e, _ret)
+ )
+
+ return None
+
+    def run_benchmark(self, options):
+        """Check that all agents are usable and run the benchmark on them.
+
+        In 'single' mode the same fio task is sent to every agent, then
+        agent statuses are polled and logged once a second for
+        runtime + ramp time + 5 seconds. The 'tasks' mode is not
+        implemented yet and does nothing.
+
+        :param options: fio options dict; 'scheduled_to' gets added to it,
+            'runtime' and 'ramp_time' are read from it
+        :return: False if there are no agents or any of them is
+            unavailable, not idle or not ready; True otherwise
+        """
+        def get_status():
+            # one entry per agent, None for an agent with undecoded reply
+            return [self._poke_agent(_u + "fio", {}) for _u in self.api_urls]
+
+        logger_cli.info("# Starting '{}' benchmark".format(self.mode))
+        logger_cli.info("# Checking agents")
+        # make sure agents idle
+        _tt = []
+        _rr = []
+        for _s in get_status():
+            if _s is None:
+                logger_cli.error("ERROR: Agent status not available")
+                return False
+            _h = _s["healthcheck"]["hostname"]
+            _t = _s['status']
+            _r = _s["healthcheck"]["ready"]
+            if _t != "idle":
+                logger_cli.error("Agent status invalid {}:{}".format(_h, _t))
+            _tt.append(_t == "idle")
+            if not _r:
+                logger_cli.error("Agent is not ready {}".format(_h))
+            _rr.append(bool(_r))
+        # Every agent must be idle and ready, a single good one is not
+        # enough for a synced run. No agents at all is a failure too.
+        if not _tt or not all(_tt) or not all(_rr):
+            return False
+
+        # Make sure that Ceph is at low load
+        # TODO: Ceph status check
+
+        # Do benchmark according to mode
+        if self.mode == "tasks":
+            # TODO: Implement 'tasks' mode
+            # take next task
+            # update options
+            # init time to schedule
+            # send next task to agent
+            # wait for agents to finish
+            pass
+        elif self.mode == "single":
+            logger_cli.info("# Running benchmark")
+            # init time to schedule
+            _time = datetime.now() + timedelta(seconds=10)
+            _str_time = _time.strftime(_datetime_fmt)
+            options["scheduled_to"] = _str_time
+            logger_cli.info(
+                "-> next benchmark scheduled to '{}'".format(_str_time)
+            )
+            # send single to agent
+            _task = {
+                "module": "fio",
+                "action": "do_singlerun",
+                "options": options
+            }
+            for _u in self.api_urls:
+                logger_cli.info("-> sending task to '{}'".format(_u))
+                _ret = self._poke_agent(_u, _task, action="POST")
+                if _ret is None:
+                    # reply was not valid JSON, there is nothing to inspect
+                    logger_cli.error(
+                        "ERROR: No valid response from agent '{}'".format(_u)
+                    )
+                elif 'error' in _ret:
+                    logger_cli.error(
+                        "ERROR: Agent returned: '{}'".format(_ret['error'])
+                    )
+
+            _runtime = _get_seconds(options["runtime"])
+            _ramptime = _get_seconds(options["ramp_time"])
+            _timeout = _runtime + _ramptime + 5
+            while _timeout > 0:
+                _str = ""
+                for _s in get_status():
+                    if _s is None:
+                        # keep polling, one lost status is not fatal here
+                        _str += "n/a; "
+                        continue
+                    _str += "{}: {} ({}); ".format(
+                        _s["healthcheck"]["hostname"],
+                        _s["status"],
+                        _s["progress"]
+                    )
+                logger_cli.debug("... {}".format(_str))
+                sleep(1)
+                _timeout -= 1
+
+        return True
+
+    def cleanup(self):
+        """Clean up after the benchmark. Placeholder, does nothing yet."""
+        return None
+
+    # Create report
+    def create_report(self, filename=None):
+        """Create the benchmark report. Placeholder, does nothing yet.
+
+        :param filename: report file name as passed by 'do_bench';
+            accepted so that call works, not used until implemented
+        :return: None
+        """
+        return
diff --git a/cfg_checker/nodes.py b/cfg_checker/nodes.py
index 1dcec2a..9570978 100644
--- a/cfg_checker/nodes.py
+++ b/cfg_checker/nodes.py
@@ -1267,3 +1267,44 @@
data[target_key] = None
else:
data[target_key] = _result[node]
+
+ def prepare_benchmark_agent(self, index, path, sc, size, template):
+ """Prepare PV, PVC and the pod for a single benchmark agent.
+
+ :param index: agent number, used to build 'cfgagent-NN' style names
+ :param path: mount path in the pod, also the host path of the PV
+ :param sc: storage class name for the PV and the PVC
+ :param size: size for the PV and the PVC
+ :param template: pod template file name in the 'templates' folder
+ :return: tuple of (pod, pv, pvc) as returned by the kube helpers
+ """
+ # Load pod template
+ _yaml_file = os.path.join(pkg_dir, 'templates', template)
+ logger_cli.debug("... loading template '{}'".format(_yaml_file))
+ _pod = {}
+ with open(_yaml_file) as podFile:
+ _pod = yaml.load(podFile, Loader=yaml.SafeLoader)
+
+ # set namings
+ _n = "cfgagent-{:02}".format(index)
+ _pvc_n = "cfgagent-pvc-{:02}".format(index)
+ _pv_n = "cfgagent-pv-{:02}".format(index)
+
+ _pod["metadata"]["name"] = _n
+ _pod["metadata"]["labels"]["name"] = _n
+ # replace volumeMounts
+ # entries are matched by 'placeholder' being a part of their name
+ for _c in _pod["spec"]["containers"]:
+ for _mnt in _c["volumeMounts"]:
+ if "placeholder" in _mnt["name"]:
+ _mnt["name"] = _pv_n
+ _mnt["mountPath"] = path
+ # replace claim
+ for _v in _pod["spec"]["volumes"]:
+ if "placeholder" in _v["name"]:
+ _v["name"] = _pv_n
+ _v["persistentVolumeClaim"]["claimName"] = _pvc_n
+
+ # init volume resources
+ # NOTE(review): the PVC gets init_pvc_resource's default namespace,
+ # the pod keeps the one from the template - confirm they match
+ _pv_object = self.kube.init_pv_resource(_pv_n, sc, size, path)
+ _pv = self.kube.prepare_pv(_pv_object)
+ _pvc_object = self.kube.init_pvc_resource(_pvc_n, sc, size)
+ _pvc = self.kube.prepare_pvc(_pvc_object)
+
+ # start pod
+ _pod = self.kube.prepare_pod_from_yaml(_pod)
+
+ return _pod, _pv, _pvc
+
+    def expose_benchmark_agent(self, agent):
+        """Expose port 8765 of the agent pod, return the kube helper result."""
+        _api_port = 8765
+        return self.kube.expose_pod_port(agent, _api_port)