blob: dd96a696b4f3907eb298887c42e6fdfc0c17c1be [file] [log] [blame]
Alex5cace3b2021-11-10 16:40:37 -06001import csv
2import os
3import json
4
5from datetime import datetime, timedelta
6from time import sleep
7
Alexdcb792f2021-10-04 14:24:21 -05008from cfg_checker.common import logger_cli
9# from cfg_checker.common.exception import InvalidReturnException
10# from cfg_checker.common.exception import ConfigException
11# from cfg_checker.common.exception import KubeException
12
13from cfg_checker.nodes import KubeNodes
Alex5cace3b2021-11-10 16:40:37 -060014from cfg_checker.agent.fio_runner import _get_seconds, _datetime_fmt
Alexdcb792f2021-10-04 14:24:21 -050015
16
class CephBench(object):
    """Common base for Ceph benchmark runners.

    Holds the environment configuration shared by all backends.
    """
    # Kubernetes YAML template used to spawn benchmark agent pods
    _agent_template = "cfgagent-template.yaml"

    def __init__(self, config):
        # keep the full environment config for subclasses to consume
        self.env_config = config
Alexdcb792f2021-10-04 14:24:21 -050024
class SaltCephBench(CephBench):
    """Placeholder benchmark runner for Salt environments (not supported)."""

    def __init__(
        self,
        config
    ):
        # Salt support is not implemented; report it loudly but still run
        # the base-class init so the object is constructed consistently.
        # (fixed typo: "impelented" -> "implemented")
        logger_cli.error("ERROR: Not implemented for Salt environment!")

        # self.master = SaltNodes(config)
        super(SaltCephBench, self).__init__(config)
        return
35
36
class KubeCephBench(CephBench):
    """Ceph benchmark orchestrator for Kubernetes environments.

    Creates fio agent pods (with their PV/PVC and service), drives them
    through their HTTP API using curl executed inside a pod, waits for the
    run to finish, and cleans up every created resource afterwards.
    """

    def __init__(self, config):
        # number of benchmark agent pods to create
        self.agent_count = config.bench_agent_count
        self.master = KubeNodes(config)
        super(KubeCephBench, self).__init__(config)
        self.storage_class = config.bench_storage_class
        self.agent_pods = []
        self.services = []
        self.api_urls = []
        # "tasks" runs a CSV-driven sequence of fio jobs; "single" runs one
        self.mode = config.bench_mode
        if config.bench_mode == "tasks":
            self.load_tasks(config.bench_task_file)

        # list of [type, namespace, name] for resources created by this run;
        # reversed at cleanup time so dependents go first
        self.cleanup_list = []

    def load_tasks(self, taskfile):
        """Load benchmark task definitions from a CSV file into self.tasks.

        Each row is expected as: readwrite, rwmixread, bs, iodepth, size.
        """
        logger_cli.info("-> Loading taskfile '{}'".format(taskfile))
        self.tasks = []
        with open(taskfile) as f:
            _reader = csv.reader(f, delimiter=',')
            for row in _reader:
                self.tasks.append({
                    "readwrite": row[0],
                    "rwmixread": row[1],
                    "bs": row[2],
                    "iodepth": row[3],
                    "size": row[4]
                })

    def add_for_deletion(self, obj, typ):
        """Register a created k8s resource for deletion during cleanup()."""
        _d = [typ, obj.metadata.namespace, obj.metadata.name]
        self.cleanup_list.append(_d)
        return

    def prepare_agents(self, options):
        """Create agent pods with PV/PVC, expose them and build API URLs.

        'options' must provide 'filename' (test file path inside the agent,
        whose directory becomes the mount point) and 'size' (volume size;
        an 'i' suffix is appended to make it a binary unit, e.g. Gi).
        """
        logger_cli.info("# Preparing {} agents".format(self.agent_count))
        for idx in range(self.agent_count):
            # create pvc/pv and pod
            logger_cli.info("-> creating agent '{:02}'".format(idx))
            _agent, _pv, _pvc = self.master.prepare_benchmark_agent(
                idx,
                os.path.split(options["filename"])[0],
                self.storage_class,
                options['size'] + 'i',
                self._agent_template
            )
            # save it to lists
            self.agent_pods.append(_agent)
            self.add_for_deletion(_pv, "pv")
            self.add_for_deletion(_pvc, "pvc")
            self.add_for_deletion(_agent, "pod")

            # expose it
            _svc = self.master.expose_benchmark_agent(_agent)
            self.add_for_deletion(_svc, "svc")
            # Save service
            self.services.append(_svc)

        # Build urls for agents
        for svc in self.services:
            self.api_urls.append(
                "http://{}:{}/api/".format(
                    svc.spec.cluster_ip,
                    8765
                )
            )
        logger_cli.info("-> Done creating agents")
        return

    def _poke_agent(self, url, body, action="GET"):
        """Call an agent's HTTP API by running curl inside the first pod.

        If 'body' is non-empty it is uploaded into the pod as a JSON file
        and sent with --data-binary. Returns the decoded JSON response, or
        None when the reply cannot be parsed.
        """
        _datafile = "/tmp/data"
        _data = [
            "--data-binary",
            "@" + _datafile,
        ]
        _cmd = [
            "curl",
            "-s",
            "-H",
            "'Content-Type: application/json'",
            "-X",
            action,
            url
        ]
        if body:
            _cmd += _data
            # stage the JSON payload inside the pod so curl can read it
            _ret = self.master.prepare_json_in_pod(
                self.agent_pods[0].metadata.name,
                self.master._namespace,
                body,
                _datafile
            )
        _ret = self.master.exec_cmd_on_target_pod(
            self.agent_pods[0].metadata.name,
            self.master._namespace,
            " ".join(_cmd)
        )
        try:
            return json.loads(_ret)
        except (TypeError, json.decoder.JSONDecodeError) as e:
            # merged the two previously-duplicated except blocks; both
            # logged the same message
            logger_cli.error(
                "ERROR: Status not decoded: {}\n{}".format(e, _ret)
            )

        return None

    def run_benchmark(self, options):
        """Run the benchmark in the configured mode ('single' or 'tasks').

        Returns True on (scheduled) completion, False when agents are
        unavailable or not in a runnable state.
        """
        def get_status():
            # one status dict (or None on decode failure) per agent
            return [self._poke_agent(_u + "fio", {}) for _u in self.api_urls]
        logger_cli.info("# Starting '{}' benchmark".format(self.mode))
        logger_cli.info("# Checking agents")
        # make sure agents idle
        _tt = []
        _rr = []
        for _s in get_status():
            if _s is None:
                logger_cli.error("ERROR: Agent status not available")
                return False
            _h = _s["healthcheck"]["hostname"]
            _t = _s['status']
            _r = _s["healthcheck"]["ready"]
            if _t != "idle":
                logger_cli.error("Agent status invalid {}:{}".format(_h, _t))
                _tt += [False]
            else:
                _tt += [True]
            if not _r:
                logger_cli.error("Agent is not ready {}".format(_h))
                _rr += [False]
            else:
                _rr += [True]
        # NOTE(review): 'any' lets the run proceed when at least one agent
        # is idle/ready; confirm whether 'all' was intended here
        if not any(_tt) or not any(_rr):
            return False

        # Make sure that Ceph is at low load
        # TODO: Ceph status check

        # Do benchmark according to mode
        if self.mode == "tasks":
            # TODO: Implement 'tasks' mode
            # take next task

            # update options

            # init time to schedule

            # send next task to agent
            pass
            # wait for agents to finish
        elif self.mode == "single":
            logger_cli.info("# Running benchmark")
            # init time to schedule: give agents 10s of lead time
            _time = datetime.now() + timedelta(seconds=10)
            _str_time = _time.strftime(_datetime_fmt)
            options["scheduled_to"] = _str_time
            logger_cli.info(
                "-> next benchmark scheduled to '{}'".format(_str_time)
            )
            # send single to agent
            _task = {
                "module": "fio",
                "action": "do_singlerun",
                "options": options
            }
            for _u in self.api_urls:
                logger_cli.info("-> sending task to '{}'".format(_u))
                _ret = self._poke_agent(_u, _task, action="POST")
                # fix: _poke_agent returns None on decode failure; the old
                # code crashed with TypeError on "'error' in None"
                if _ret is not None and 'error' in _ret:
                    logger_cli.error(
                        "ERROR: Agent returned: '{}'".format(_ret['error'])
                    )

            # expected duration = runtime + ramp + small grace period
            _runtime = _get_seconds(options["runtime"])
            _ramptime = _get_seconds(options["ramp_time"])
            _timeout = _runtime + _ramptime + 5
            _end = datetime.now() + timedelta(seconds=_timeout)
            while True:
                # Print status
                _sts = get_status()
                _str = ""
                for _s in _sts:
                    # fix: an agent mid-run may yield no parsable status;
                    # don't crash the whole wait loop on it
                    if _s is None:
                        _str += "unavailable; "
                        continue
                    _str += "{}: {} ({}); ".format(
                        _s["healthcheck"]["hostname"],
                        _s["status"],
                        _s["progress"]
                    )
                # recalc how much is left
                diff = (_end - datetime.now()).total_seconds()
                logger_cli.debug("... [{}s]: {}".format(diff, _str))
                # In case end_datetime was in past to begin with
                if diff < 0:
                    break
                # halve the remaining wait each pass for finer status output
                logger_cli.info("-> Sleeping for {}s".format(diff/2))
                sleep(diff/2)
                if diff <= 0.1:
                    break

        logger_cli.info("-> Done")
        return True

    def cleanup(self):
        """Delete every registered resource in reverse creation order."""
        self.cleanup_list.reverse()

        for _res in self.cleanup_list:
            self.master.cleanup_resource_by_name(_res[0], _res[2], ns=_res[1])
        logger_cli.info("# Done cleaning up")
        return

    # Create report
    def create_report(self, filename):
        # TODO: implement benchmark report generation
        return