blob: 190076a064aba1468f6878019c7105d72530875b [file] [log] [blame]
Alex5cace3b2021-11-10 16:40:37 -06001import csv
2import os
3import json
4
5from datetime import datetime, timedelta
6from time import sleep
7
Alexdcb792f2021-10-04 14:24:21 -05008from cfg_checker.common import logger_cli
9# from cfg_checker.common.exception import InvalidReturnException
10# from cfg_checker.common.exception import ConfigException
11# from cfg_checker.common.exception import KubeException
12
13from cfg_checker.nodes import KubeNodes
Alex5cace3b2021-11-10 16:40:37 -060014from cfg_checker.agent.fio_runner import _get_seconds, _datetime_fmt
Alexdcb792f2021-10-04 14:24:21 -050015
16
class CephBench(object):
    """Base class for Ceph benchmark runners.

    Keeps the environment configuration and the name of the YAML
    template used to spawn benchmark agent pods.
    """
    # YAML template used to create benchmark agent pods
    _agent_template = "cfgagent-template.yaml"

    def __init__(self, config):
        """Remember the environment configuration for subclasses."""
        self.env_config = config
23
Alexdcb792f2021-10-04 14:24:21 -050024
class SaltCephBench(CephBench):
    """Placeholder: Ceph benchmarking is not implemented for Salt.

    Instantiating this class only logs an error; no benchmark
    functionality is available for Salt-managed environments.
    """
    def __init__(
        self,
        config
    ):
        # Fix: corrected typo in user-facing message ("impelented")
        logger_cli.error("ERROR: Not implemented for Salt environment!")

        # self.master = SaltNodes(config)
        super(SaltCephBench, self).__init__(config)
35
36
class KubeCephBench(CephBench):
    """Ceph benchmark runner for Kubernetes environments.

    Spawns a set of fio agent pods (each backed by a PV/PVC and exposed
    via a cluster-IP service) and drives them over their HTTP API,
    either with one option set ('single' mode) or a CSV task list
    ('tasks' mode, not fully implemented yet).
    """
    def __init__(self, config):
        """Init kube interface; load the task file in 'tasks' mode.

        :param config: environment config; the bench_agent_count,
                       bench_storage_class, bench_mode and
                       bench_task_file attributes are consumed here
        """
        self.agent_count = config.bench_agent_count
        self.master = KubeNodes(config)
        super(KubeCephBench, self).__init__(config)
        self.storage_class = config.bench_storage_class
        self.agent_pods = []
        self.services = []
        # agent API endpoints; filled by prepare_agents()
        self.api_urls = []
        self.mode = config.bench_mode
        if config.bench_mode == "tasks":
            self.load_tasks(config.bench_task_file)

    def load_tasks(self, taskfile):
        """Load benchmark task list from a comma-separated CSV file.

        Each row supplies fio option values in fixed column order:
        readwrite, rwmixread, bs, iodepth, size.
        Parsed rows are stored as dicts in self.tasks.
        """
        logger_cli.info("-> Loading taskfile '{}'".format(taskfile))
        self.tasks = []
        with open(taskfile) as f:
            _reader = csv.reader(f, delimiter=',')
            # each CSV row maps positionally onto fio options
            for row in _reader:
                self.tasks.append({
                    "readwrite": row[0],
                    "rwmixread": row[1],
                    "bs": row[2],
                    "iodepth": row[3],
                    "size": row[4]
                })

    def prepare_agents(self, options):
        """Create and expose benchmark agent pods; build their API urls.

        :param options: dict with at least 'filename' (its directory is
                        used as the mount path) and 'size' (volume size;
                        'i' is appended to form a binary k8s quantity,
                        e.g. "10G" -> "10Gi")
        """
        logger_cli.info("# Preparing {} agents".format(self.agent_count))
        for idx in range(self.agent_count):
            # create pvc/pv and pod
            logger_cli.info("-> creating agent '{:02}'".format(idx))
            _agent, _pv, _pvc = self.master.prepare_benchmark_agent(
                idx,
                os.path.split(options["filename"])[0],
                self.storage_class,
                options['size'] + 'i',
                self._agent_template
            )
            # save it to list
            self.agent_pods.append(_agent)
            # expose it
            _svc = self.master.expose_benchmark_agent(_agent)
            # Save service
            self.services.append(_svc)

        # Build urls for agents; agent REST API listens on port 8765
        for svc in self.services:
            self.api_urls.append(
                "http://{}:{}/api/".format(
                    svc.spec.cluster_ip,
                    8765
                )
            )
        logger_cli.info("-> Done creating agents")
        return

    def _poke_agent(self, url, body, action="GET"):
        """Call an agent's REST API via curl executed inside a pod.

        The JSON body (if any) is written to a temp file in the first
        agent pod and passed to curl with --data-binary.

        :return: decoded JSON reply, or None when the reply could not
                 be decoded
        """
        _datafile = "/tmp/data"
        _data = [
            "--data-binary",
            "@" + _datafile,
        ]
        _cmd = [
            "curl",
            "-s",
            "-H",
            "'Content-Type: application/json'",
            "-X",
            action,
            url
        ]
        if body:
            _cmd += _data
            # stage the JSON payload inside the pod for curl to read
            # (fix: return value was unused, so do not bind it)
            self.master.prepare_json_in_pod(
                self.agent_pods[0].metadata.name,
                self.master._namespace,
                body,
                _datafile
            )
        _ret = self.master.exec_cmd_on_target_pod(
            self.agent_pods[0].metadata.name,
            self.master._namespace,
            " ".join(_cmd)
        )
        try:
            return json.loads(_ret)
        except TypeError as e:
            logger_cli.error(
                "ERROR: Status not decoded: {}\n{}".format(e, _ret)
            )
        except json.decoder.JSONDecodeError as e:
            logger_cli.error(
                "ERROR: Status not decoded: {}\n{}".format(e, _ret)
            )

        return None

    def run_benchmark(self, options):
        """Check agents, schedule and run the benchmark, poll progress.

        :param options: fio options dict; 'runtime' and 'ramp_time'
                        (fio time strings) bound the polling timeout
        :return: False when any agent is unavailable, not idle or not
                 ready; True once polling completes
        """
        def get_status():
            # one status dict per agent; None where the poke failed
            return [self._poke_agent(_u + "fio", {}) for _u in self.api_urls]
        logger_cli.info("# Starting '{}' benchmark".format(self.mode))
        logger_cli.info("# Checking agents")
        # make sure agents idle
        _tt = []
        _rr = []
        for _s in get_status():
            if _s is None:
                logger_cli.error("ERROR: Agent status not available")
                return False
            _h = _s["healthcheck"]["hostname"]
            _t = _s['status']
            _r = _s["healthcheck"]["ready"]
            if _t != "idle":
                logger_cli.error("Agent status invalid {}:{}".format(_h, _t))
                _tt += [False]
            else:
                _tt += [True]
            if not _r:
                logger_cli.error("Agent is not ready {}".format(_h))
                _rr += [False]
            else:
                _rr += [True]
        # Fix: require ALL agents to be idle and ready; the original
        # 'any' check proceeded while some agents were broken. Also
        # bail out when there are no agents at all.
        if not (_tt and all(_tt) and all(_rr)):
            return False

        # Make sure that Ceph is at low load
        # TODO: Ceph status check

        # Do benchmark according to mode
        if self.mode == "tasks":
            # TODO: Implement 'tasks' mode
            # take next task

            # update options

            # init time to schedule

            # send next task to agent
            pass
        # wait for agents to finish
        elif self.mode == "single":
            logger_cli.info("# Running benchmark")
            # init time to schedule: give agents 10s of slack to sync
            _time = datetime.now() + timedelta(seconds=10)
            _str_time = _time.strftime(_datetime_fmt)
            options["scheduled_to"] = _str_time
            logger_cli.info(
                "-> next benchmark scheduled to '{}'".format(_str_time)
            )
            # send single to agent
            _task = {
                "module": "fio",
                "action": "do_singlerun",
                "options": options
            }
            for _u in self.api_urls:
                logger_cli.info("-> sending task to '{}'".format(_u))
                _ret = self._poke_agent(_u, _task, action="POST")
                # Fix: _poke_agent returns None on decode failure;
                # the original 'error' in _ret raised TypeError then
                if _ret is None:
                    logger_cli.error("ERROR: Agent did not return status")
                elif 'error' in _ret:
                    logger_cli.error(
                        "ERROR: Agent returned: '{}'".format(_ret['error'])
                    )

            # poll once per second until runtime + ramp time elapses
            _runtime = _get_seconds(options["runtime"])
            _ramptime = _get_seconds(options["ramp_time"])
            _timeout = _runtime + _ramptime + 5
            while _timeout > 0:
                _sts = get_status()
                _str = ""
                for _s in _sts:
                    # Fix: skip agents whose status failed to decode
                    # instead of raising TypeError on None
                    if _s is None:
                        _str += "<no status>; "
                        continue
                    _str += "{}: {} ({}); ".format(
                        _s["healthcheck"]["hostname"],
                        _s["status"],
                        _s["progress"]
                    )
                logger_cli.debug("... {}".format(_str))
                sleep(1)
                _timeout -= 1

        return True

    def cleanup(self):
        # TODO: tear down agent pods, services and volumes
        return

    # Create report
    def create_report(self):
        # TODO: collect agent results and build the report
        return