blob: 5fa4cfd6e72d4fc2d90f0bc73427a393e1268e29 [file] [log] [blame]
Alex5cace3b2021-11-10 16:40:37 -06001import csv
2import os
3import json
4
5from datetime import datetime, timedelta
6from time import sleep
7
Alexdcb792f2021-10-04 14:24:21 -05008from cfg_checker.common import logger_cli
Alexbfa947c2021-11-11 18:14:28 -06009from cfg_checker.helpers.console_utils import Progress
Alexdcb792f2021-10-04 14:24:21 -050010# from cfg_checker.common.exception import InvalidReturnException
11# from cfg_checker.common.exception import ConfigException
12# from cfg_checker.common.exception import KubeException
13
14from cfg_checker.nodes import KubeNodes
Alex5cace3b2021-11-10 16:40:37 -060015from cfg_checker.agent.fio_runner import _get_seconds, _datetime_fmt
Alexdcb792f2021-10-04 14:24:21 -050016
17
class CephBench(object):
    """Common base for the Ceph benchmark drivers.

    Holds the environment configuration shared by the environment-specific
    subclasses.
    """

    # File name of the agent template handed over when agents are created
    _agent_template = "cfgagent-template.yaml"

    def __init__(self, config):
        """Store the environment configuration.

        :param config: configuration object, kept as ``self.env_config``
        """
        self.env_config = config
24
Alexdcb792f2021-10-04 14:24:21 -050025
class SaltCephBench(CephBench):
    """Placeholder benchmark driver for Salt environments.

    Benchmarking is not available for Salt; creating an instance only logs
    an error and stores the configuration through the base class.
    """

    def __init__(
        self,
        config
    ):
        """Log that Salt is unsupported and keep the configuration.

        :param config: configuration object, passed on to CephBench
        """
        logger_cli.error("ERROR: Not implemented for Salt environment!")

        # self.master = SaltNodes(config)
        super(SaltCephBench, self).__init__(config)
36
37
class KubeCephBench(CephBench):
    """Run a Ceph benchmark through agent pods created via KubeNodes.

    Reads ``bench_agent_count``, ``bench_storage_class``, ``bench_mode`` and,
    in "tasks" mode, ``bench_task_file`` from the supplied config object.
    Every cluster resource created by prepare_agents() is recorded in
    ``cleanup_list`` so that cleanup() can remove it afterwards.
    """

    def __init__(self, config):
        """Set up the Kubernetes access object and empty bookkeeping lists.

        :param config: configuration object providing the ``bench_*`` values
        """
        self.agent_count = config.bench_agent_count
        self.master = KubeNodes(config)
        super(KubeCephBench, self).__init__(config)
        self.storage_class = config.bench_storage_class
        self.agent_pods = []
        self.services = []
        self.api_urls = []
        # [type, namespace, name] entries, filled by add_for_deletion()
        self.cleanup_list = []
        # Always defined, even when no task file gets loaded
        self.tasks = []
        self.mode = config.bench_mode
        if config.bench_mode == "tasks":
            self.load_tasks(config.bench_task_file)

    def load_tasks(self, taskfile):
        """Load benchmark tasks from a comma separated file into self.tasks.

        Each non-blank row must provide at least five columns, in this order:
        readwrite, rwmixread, bs, iodepth, size. Values are kept as strings.

        :param taskfile: path to the CSV task file
        :raises IndexError: when a non-blank row has fewer than five columns
        """
        logger_cli.info("-> Loading taskfile '{}'".format(taskfile))
        self.tasks = []
        # newline="" is what the csv module expects from a file object
        with open(taskfile, newline="") as f:
            for row in csv.reader(f, delimiter=','):
                # csv.reader yields an empty list for a blank line
                if not row:
                    continue
                self.tasks.append({
                    "readwrite": row[0],
                    "rwmixread": row[1],
                    "bs": row[2],
                    "iodepth": row[3],
                    "size": row[4]
                })

    def add_for_deletion(self, obj, typ):
        """Remember a created resource so cleanup() can remove it.

        :param obj: object exposing ``metadata.namespace`` and
            ``metadata.name``
        :param typ: resource type label, e.g. "pv", "pvc", "pod" or "svc"
        """
        self.cleanup_list.append(
            [typ, obj.metadata.namespace, obj.metadata.name]
        )

    def prepare_agents(self, options):
        """Create and expose ``self.agent_count`` benchmark agents.

        For each agent the pod, pv, pvc and service are registered for
        cleanup, and the service's API URL is added to ``self.api_urls``.

        :param options: dict; the "filename" and "size" keys are read here
        """
        logger_cli.info("# Preparing {} agents".format(self.agent_count))
        for idx in range(self.agent_count):
            # create pvc/pv and pod
            logger_cli.info("-> creating agent '{:02}'".format(idx))
            _agent, _pv, _pvc = self.master.prepare_benchmark_agent(
                idx,
                # directory part of the benchmark file path
                os.path.split(options["filename"])[0],
                self.storage_class,
                # presumably turns e.g. "1G" into the "1Gi" form used for
                # storage requests -- TODO confirm
                options['size'] + 'i',
                self._agent_template
            )
            # save it to lists
            self.agent_pods.append(_agent)
            self.add_for_deletion(_pv, "pv")
            self.add_for_deletion(_pvc, "pvc")
            self.add_for_deletion(_agent, "pod")

            # expose it
            _svc = self.master.expose_benchmark_agent(_agent)
            self.add_for_deletion(_svc, "svc")
            # Save service
            self.services.append(_svc)
            # One URL per newly created service, so that calling this method
            # again does not add URLs of earlier services a second time
            self.api_urls.append(
                "http://{}:{}/api/".format(
                    _svc.spec.cluster_ip,
                    8765
                )
            )
        logger_cli.info("-> Done creating agents")

    def _poke_agent(self, url, body, action="GET"):
        """Send a request to an agent URL using curl inside the first pod.

        The command is always executed in ``self.agent_pods[0]``.

        :param url: agent API URL to call
        :param body: request payload; when it is falsy (e.g. ``{}``) no data
            is sent
        :param action: HTTP method passed to ``curl -X``
        :return: decoded JSON response, or None if the output of the command
            could not be decoded
        """
        _datafile = "/tmp/data"
        _cmd = [
            "curl",
            "-s",
            "-H",
            # quoted, as the command is joined into a single string below
            "'Content-Type: application/json'",
            "-X",
            action,
            url
        ]
        if body:
            _cmd += ["-d", "@" + _datafile]
            # presumably stores 'body' as JSON in _datafile inside the pod,
            # where curl picks it up via '@' -- TODO confirm
            self.master.prepare_json_in_pod(
                self.agent_pods[0].metadata.name,
                self.master._namespace,
                body,
                _datafile
            )
        _ret = self.master.exec_cmd_on_target_pod(
            self.agent_pods[0].metadata.name,
            self.master._namespace,
            " ".join(_cmd)
        )
        try:
            return json.loads(_ret)
        except (TypeError, json.decoder.JSONDecodeError) as e:
            # TypeError covers output that is not str/bytes (e.g. None)
            logger_cli.error(
                "ERROR: Status not decoded: {}\n{}".format(e, _ret)
            )

        return None

    def run_benchmark(self, options):
        """Check the agents and run the benchmark for the configured mode.

        In "single" mode the run is scheduled 10 seconds ahead, the task is
        posted to every agent and agent status is logged until the wait
        budget is used up. "tasks" mode is not implemented yet.

        :param options: dict of benchmark options; "runtime" and "ramp_time"
            are read and "scheduled_to" is written in "single" mode
        :return: False when there are no agents, or when any agent status is
            unavailable, not "idle"/"finished", or not ready; True otherwise
        """
        def get_status():
            # one decoded status (or None on failure) per agent URL
            return [self._poke_agent(_u + "fio", {}) for _u in self.api_urls]

        logger_cli.info("# Starting '{}' benchmark".format(self.mode))
        logger_cli.info("# Checking agents")
        _statuses = get_status()
        if not _statuses:
            # all() is True for an empty list, so this case is explicit
            logger_cli.error("ERROR: No agents available")
            return False

        # make sure agents idle
        _idle = []
        _ready = []
        for _s in _statuses:
            if _s is None:
                logger_cli.error("ERROR: Agent status not available")
                return False
            _h = _s["healthcheck"]["hostname"]
            _t = _s['status']
            _r = _s["healthcheck"]["ready"]
            if _t not in ["idle", "finished"]:
                logger_cli.error("Agent status invalid {}:{}".format(_h, _t))
                _idle.append(False)
            else:
                _idle.append(True)
            if not _r:
                logger_cli.error("Agent is not ready {}".format(_h))
                _ready.append(False)
            else:
                _ready.append(True)
        # The task is sent to every agent, so every one of them has to be
        # idle and ready, not just some
        if not all(_idle) or not all(_ready):
            return False

        # Make sure that Ceph is at low load
        # TODO: Ceph status check

        # Do benchmark according to mode
        if self.mode == "tasks":
            # TODO: Implement 'tasks' mode
            # take next task

            # update options

            # init time to schedule

            # send next task to agent
            pass
            # wait for agents to finish
        elif self.mode == "single":
            logger_cli.info("# Running benchmark")
            # init time to schedule
            _time = datetime.now() + timedelta(seconds=10)
            _str_time = _time.strftime(_datetime_fmt)
            options["scheduled_to"] = _str_time
            logger_cli.info(
                "-> next benchmark scheduled to '{}'".format(_str_time)
            )
            # send single to agent
            _task = {
                "module": "fio",
                "action": "do_singlerun",
                "options": options
            }
            for _u in self.api_urls:
                logger_cli.info("-> sending task to '{}'".format(_u))
                _ret = self._poke_agent(_u, _task, action="POST")
                if _ret is None:
                    # _poke_agent returns None when the reply is not JSON
                    logger_cli.error(
                        "ERROR: No valid response from '{}'".format(_u)
                    )
                elif 'error' in _ret:
                    logger_cli.error(
                        "ERROR: Agent returned: '{}'".format(_ret['error'])
                    )

            _runtime = _get_seconds(options["runtime"])
            _ramptime = _get_seconds(options["ramp_time"])
            # NOTE(review): the run is scheduled 10s ahead while only 5s are
            # added here; the wait may end before the agents do -- confirm
            _timeout = _runtime + _ramptime + 5
            _end = datetime.now() + timedelta(seconds=_timeout)
            while True:
                # Print status
                _parts = []
                for _u, _s in zip(self.api_urls, get_status()):
                    if _s is None:
                        _parts.append(
                            "{}: status not available; ".format(_u)
                        )
                        continue
                    _parts.append(
                        "{}: {} ({}); ".format(
                            _s["healthcheck"]["hostname"],
                            _s["status"],
                            _s["progress"]
                        )
                    )
                _str = "".join(_parts)
                # recalc how much is left
                diff = (_end - datetime.now()).total_seconds()
                logger_cli.debug("... [{:.2f}s]: {}".format(diff, _str))
                # In case end_datetime was in past to begin with
                if diff < 0:
                    break
                # Halve the remaining time on every pass
                logger_cli.info("-> Sleeping for {:.2f}s".format(diff/2))
                sleep(diff/2)
                if diff <= 0.1:
                    break

        logger_cli.info("-> Done")
        return True

    def cleanup(self):
        """Delete all registered resources and wait until they are gone.

        Resources are removed in reverse order of registration. Afterwards
        their phase is polled about once per second, for at most 120 polls,
        until no resource reports a phase anymore.
        """
        # Reversed copy: the stored list keeps its order, so a repeated call
        # deletes in the same order again
        _resources = list(reversed(self.cleanup_list))

        for _res in _resources:
            self.master.cleanup_resource_by_name(_res[0], _res[2], ns=_res[1])

        # Wait for resource to be cleaned
        _timeout = 120
        _left = _timeout
        _total = len(_resources)
        logger_cli.info("-> Wait until {} resources cleaned".format(_total))
        _p = Progress(_total)
        _g = self.master.get_resource_phase_by_name
        while True:
            # A resource counts as still present while a truthy phase is
            # returned for it
            _l = [_g(r[0], r[2], ns=r[1]) for r in _resources]
            _l = [item for item in _l if item]
            if not _l:
                _p.end()
                logger_cli.info("# Done cleaning up")
                break
            _p.write_progress(_total - len(_l))
            if _left <= 0:
                _p.end()
                logger_cli.info(
                    "# Timed out waiting after {}s.".format(_timeout)
                )
                break
            _left -= 1
            # One poll per second keeps the budget in line with the
            # reported timeout and avoids hammering the API
            sleep(1)

    # Create report
    def create_report(self, filename):
        """Create a benchmark report (not implemented yet).

        :param filename: currently unused
        """
        # TODO: Implement report creation
        return