blob: 8e8a4e955c533f4104926fd60c7d7e43cacdd4e7 [file] [log] [blame]
koder aka kdanilov962ee5f2016-12-19 02:40:08 +02001import time
koder aka kdanilov23e6bdf2016-12-24 02:18:54 +02002import json
koder aka kdanilove21d7472015-02-14 19:02:04 -08003import logging
koder aka kdanilov39e449e2016-12-17 15:15:26 +02004from concurrent.futures import Future
5from typing import List, Dict, Tuple, Optional, Union, cast
koder aka kdanilov88407ff2015-05-26 15:35:57 +03006
koder aka kdanilov39e449e2016-12-17 15:15:26 +02007from . import utils, ssh_utils, hw_info
8from .config import ConfigBlock
koder aka kdanilov73084622016-11-16 21:51:08 +02009from .node import setup_rpc, connect
koder aka kdanilov7f59d562016-12-26 01:34:23 +020010from .node_interfaces import NodeInfo, IRPCNode
koder aka kdanilov39e449e2016-12-17 15:15:26 +020011from .stage import Stage, StepOrder
koder aka kdanilov7f59d562016-12-26 01:34:23 +020012from .sensors import collect_sensors_data
koder aka kdanilov3b4da8b2016-10-17 00:17:53 +030013from .suits.io.fio import IOPerfTest
koder aka kdanilov39e449e2016-12-17 15:15:26 +020014from .suits.itest import TestInputConfig
15from .suits.mysql import MysqlTest
koder aka kdanilov3b4da8b2016-10-17 00:17:53 +030016from .suits.omgbench import OmgTest
koder aka kdanilov39e449e2016-12-17 15:15:26 +020017from .suits.postgres import PgBenchTest
18from .test_run_class import TestRun
koder aka kdanilov7f59d562016-12-26 01:34:23 +020019from .statistic import calc_stat_props
20from .utils import StopTestError
koder aka kdanilovbc2c8982015-06-13 02:50:43 +030021
22
# Test suite name (the key used for a test in the config 'tests' section)
# -> class that implements that suite. RunTestsStage looks suites up here.
TOOL_TYPE_MAPPER = {
    "io": IOPerfTest,
    "pgbench": PgBenchTest,
    "mysql": MysqlTest,
    "omg": OmgTest,
}


# Module-wide logger shared by all stages in this file.
logger = logging.getLogger("wally")
koder aka kdanilovcee43342015-04-14 22:52:53 +030032
koder aka kdanilov7acd6bd2015-02-12 14:28:30 -080033
class ConnectStage(Stage):
    """Open an RPC connection to every node listed in ``ctx.nodes_info``."""

    priority = StepOrder.CONNECT

    def run(self, ctx: TestRun) -> None:
        """Connect to all known nodes in parallel and fill ``ctx.nodes``.

        Nodes which can't be reached are only reported, unless they carry the
        'testnode' role - in that case StopTestError is raised.
        """
        with ctx.get_pool() as pool:
            logger.info("Connecting to %s nodes", len(ctx.nodes_info))

            def try_connect(info: NodeInfo) -> Tuple[bool, Union[IRPCNode, NodeInfo]]:
                # Never raises: the outcome is reported as (success flag, payload),
                # where payload is the RPC node on success or the NodeInfo on failure.
                try:
                    rpc_node = setup_rpc(connect(info, conn_timeout=ctx.config.connect_timeout),
                                         ctx.rpc_code,
                                         ctx.default_rpc_plugins,
                                         log_level=ctx.config.rpc_log_level)
                except Exception as exc:
                    logger.exception("During connect to %s: %s", info, exc)
                    return False, info
                return True, rpc_node

            unreachable_testnodes = []  # type: List[NodeInfo]
            unreachable_others = []  # type: List[NodeInfo]
            ctx.nodes = []

            for connected, payload in pool.map(try_connect, ctx.nodes_info.values()):
                if connected:
                    ctx.nodes.append(cast(IRPCNode, payload))
                    continue

                info = cast(NodeInfo, payload)
                if 'testnode' in info.roles:
                    unreachable_testnodes.append(info)
                else:
                    unreachable_others.append(info)

            if unreachable_others:
                names = ", ".join(str(info) for info in unreachable_others)
                logger.warning("Node(s) {} would be excluded - can't connect".format(names))

            if unreachable_testnodes:
                # Tests can't run without their test nodes, so this is fatal.
                msg = "Can't start RPC on testnode(s) " + ",".join(str(info) for info in unreachable_testnodes)
                logger.error(msg)
                raise utils.StopTestError(msg)

            if not unreachable_others:
                logger.info("All nodes connected successfully")

    def cleanup(self, ctx: TestRun) -> None:
        """Optionally store the nodes' RPC logs, then disconnect from all nodes."""
        if ctx.config.get("download_rpc_logs", False):
            for node in ctx.nodes:
                if node.rpc_log_file is None:
                    continue

                nid = node.info.node_id()
                path = "rpc_logs/" + nid
                node.conn.server.flush_logs()
                log = node.get_file_content(node.rpc_log_file)
                # Append to whatever was stored for this node before.
                previous = ctx.storage.get_raw(path) if path in ctx.storage else b""
                ctx.storage.put_raw(previous + log, path)
                logger.debug("RPC log from node {} stored into storage::{}".format(nid, path))

        with ctx.get_pool() as pool:
            # Consume the iterator so every disconnect has finished (and any
            # error from it is raised) before the pool is left.
            for _ in pool.map(lambda node: node.disconnect(stop=True), ctx.nodes):
                pass
koder aka kdanilovcee43342015-04-14 22:52:53 +030098
koder aka kdanilov0fdaaee2015-06-30 11:10:48 +030099
class CollectInfoStage(Stage):
    """Collect hardware and software info from every connected node."""

    priority = StepOrder.START_SENSORS - 2
    config_block = 'collect_info'

    def run(self, ctx: TestRun) -> None:
        """Store hw_info/<node_id> and sw_info/<node_id> for nodes lacking them.

        Does nothing when ``ctx.config.collect_info`` is false.
        Raises StopTestError if collecting or storing info for any node fails.
        """
        if not ctx.config.collect_info:
            return

        with ctx.get_pool() as pool:
            # The two passes run one after the other: per the original note, the
            # next RPC request to a node can't be made until the previous is done.
            self._collect(ctx, pool, "hw_info", hw_info.get_hw_info,
                          "During collecting hardware info from %s")
            self._collect(ctx, pool, "sw_info", hw_info.get_sw_info,
                          "During collecting software info from %s")

    @staticmethod
    def _collect(ctx: TestRun, pool, path_prefix: str, getter, error_msg: str) -> None:
        """Run ``getter(node)`` in the pool for each node without stored data and save the results.

        Results are stored under "<path_prefix>/<node_id>"; ``error_msg`` is the
        log format string (taking the node id) used when a node fails.
        """
        futures = {}  # type: Dict[Tuple[str, str], Future]

        for node in ctx.nodes:
            nid = node.info.node_id()
            path = "{}/{}".format(path_prefix, nid)
            # Skip nodes whose info is already in the storage.
            if path not in ctx.storage:
                futures[(path, nid)] = pool.submit(getter, node)

        for (path, nid), future in futures.items():
            try:
                ctx.storage.put(future.result(), path)
            except Exception as exc:
                logger.exception(error_msg, nid)
                raise utils.StopTestError() from exc
koder aka kdanilov39e449e2016-12-17 15:15:26 +0200140
141
class ExplicitNodesStage(Stage):
    """Add the nodes listed explicitly in the config 'nodes' section."""

    priority = StepOrder.DISCOVER
    config_block = 'nodes'

    def run(self, ctx: TestRun) -> None:
        """Merge every "ssh url -> comma-separated roles" config entry into ctx.

        Skipped entirely when 'all_nodes' is already present in the storage.
        """
        if 'all_nodes' in ctx.storage:
            logger.info("Skip explicit nodes filling, as all_nodes already in storage")
            return

        nodes_cfg = ctx.config.get('nodes', {})
        # When the section is missing the plain-dict default may come back as is,
        # and a dict has no .raw(); only unwrap objects which provide it.
        explicit_nodes = nodes_cfg.raw() if hasattr(nodes_cfg, 'raw') else nodes_cfg

        for url, roles in explicit_nodes.items():
            # Tolerate "role1, role2" and trailing commas: a role with stray
            # whitespace would never match checks like `'testnode' in roles`.
            role_set = {role.strip() for role in roles.split(",") if role.strip()}
            ctx.merge_node(ssh_utils.parse_ssh_uri(url), role_set)
            logger.debug("Add node %s with roles %s", url, roles)
koder aka kdanilov39e449e2016-12-17 15:15:26 +0200156
157
class SaveNodesStage(Stage):
    """Store info about all known nodes into the storage under 'all_nodes'."""

    priority = StepOrder.CONNECT

    def run(self, ctx: TestRun) -> None:
        """Write the values of ``ctx.nodes_info`` to the storage as a list."""
        known_nodes = ctx.nodes_info.values()
        ctx.storage.put_list(known_nodes, 'all_nodes')
koder aka kdanilov962ee5f2016-12-19 02:40:08 +0200165
166
class SleepStage(Stage):
    """Pause the run for the number of seconds given by the config 'sleep' value."""

    priority = StepOrder.TEST
    config_block = 'sleep'

    def run(self, ctx: TestRun) -> None:
        """Block the calling thread for ``ctx.config.sleep`` seconds."""
        logger.debug("Will sleep for %r seconds", ctx.config.sleep)
        time.sleep(ctx.config.sleep)
koder aka kdanilov39e449e2016-12-17 15:15:26 +0200176
177
class PrepareNodes(Stage):
    """Set the requested ceph scrub flags before tests and revert them afterwards.

    The config value 'ceph_settings' is a whitespace-separated list; the
    entries 'noscrub' and 'nodeepscrub' are recognised here.
    """

    priority = StepOrder.START_SENSORS - 1

    def __init__(self) -> None:
        Stage.__init__(self)
        # True only for flags set by this stage itself, so that cleanup()
        # reverts exactly what run() changed and nothing that was set before.
        self.nodeepscrub_updated = False
        self.noscrub_updated = False

    @staticmethod
    def _is_ceph_node(node) -> bool:
        """Tell whether the node has the 'ceph-mon' or the 'ceph-osd' role."""
        roles = node.info.roles
        return "ceph-mon" in roles or "ceph-osd" in roles

    def run(self, ctx: TestRun) -> None:
        """Apply the requested flags via the first ceph node found in ctx.nodes."""
        ceph_sett = ctx.config.get('ceph_settings', "").split()
        if not ceph_sett:
            return

        for node in ctx.nodes:
            if not self._is_ceph_node(node):
                continue

            state = json.loads(node.run("ceph health --format json"))["summary"]["summary"]

            if 'noscrub' in ceph_sett:
                if 'noscrub' in state:
                    logger.debug("noscrub already set on cluster")
                else:
                    logger.info("Applying noscrub settings to ceph cluster")
                    node.run("ceph osd set noscrub")
                    self.noscrub_updated = True

            if 'nodeepscrub' in ceph_sett:
                if 'nodeepscrub' in state:
                    logger.debug("nodeepscrub already set on cluster")
                else:
                    logger.info("Applying nodeepscrub settings to ceph cluster")
                    # Must set the very flag that cleanup() unsets.
                    node.run("ceph osd set nodeepscrub")
                    self.nodeepscrub_updated = True

            # The commands are issued once: only the first ceph node is used.
            break

    def cleanup(self, ctx: TestRun) -> None:
        """Unset the flags which run() has set, using the first ceph node found."""
        if not (self.nodeepscrub_updated or self.noscrub_updated):
            return

        for node in ctx.nodes:
            if not self._is_ceph_node(node):
                continue

            if self.noscrub_updated:
                logger.info("Reverting noscrub setting for ceph cluster")
                node.run("ceph osd unset noscrub")
                self.noscrub_updated = False

            if self.nodeepscrub_updated:
                logger.info("Reverting nodeepscrub setting for ceph cluster")
                node.run("ceph osd unset nodeepscrub")
                self.nodeepscrub_updated = False

            # Both markers are cleared now, nothing is left for other nodes.
            break
222
223
class RunTestsStage(Stage):
    """Run the test suites listed in the config 'tests' section."""

    priority = StepOrder.TEST
    config_block = 'tests'

    def run(self, ctx: TestRun) -> None:
        """Run every configured suite on the nodes having the 'testnode' role.

        Each item of 'tests' maps a suite name (a TOOL_TYPE_MAPPER key) to its
        parameters; the optional 'node_limit' parameter caps the number of
        test nodes given to the suite.

        Raises StopTestError when no test nodes are available.
        """
        for test_group in ctx.config.get('tests', []):
            if ctx.config.no_tests:
                continue

            test_nodes = [node for node in ctx.nodes if 'testnode' in node.info.roles]

            if not test_nodes:
                logger.error("No test nodes found")
                raise StopTestError()

            for name, params in test_group.items():
                vm_count = params.get('node_limit', None)  # type: Optional[int]

                # select test nodes
                if vm_count is None:
                    curr_test_nodes = test_nodes
                else:
                    curr_test_nodes = test_nodes[:vm_count]

                if not curr_test_nodes:
                    logger.error("No nodes found for test, skipping it.")
                    continue

                # A KeyError here means the config names an unknown suite.
                test_cls = TOOL_TYPE_MAPPER[name]
                remote_dir = ctx.config.default_test_local_folder.format(name=name, uuid=ctx.config.run_uuid)
                test_cfg = TestInputConfig(test_cls.__name__,
                                           params=params,
                                           run_uuid=ctx.config.run_uuid,
                                           # pass the limited selection, otherwise
                                           # 'node_limit' has no effect at all
                                           nodes=curr_test_nodes,
                                           storage=ctx.storage,
                                           remote_dir=remote_dir)

                test_cls(test_cfg,
                         on_idle=lambda: collect_sensors_data(ctx, False)).run()

    @classmethod
    def validate_config(cls, cfg: ConfigBlock) -> None:
        """Accept any config block: no validation is performed."""
        pass
koder aka kdanilov7f59d562016-12-26 01:34:23 +0200266
267
class CalcStatisticStage(Stage):
    """Print IOPS and bandwidth statistics for the stored 'fio' results."""

    priority = StepOrder.TEST + 1

    def run(self, ctx: TestRun) -> None:
        """Load the arrays of every 'fio' entry of "all_results" and print their stats.

        Data is keyed by the result summary, so when several nodes share a
        summary only the arrays of the last node are kept.
        """
        per_summary = {}
        for test_name, summary, stor_path in ctx.storage.get("all_results"):
            if test_name != 'fio':
                continue

            info = ctx.storage.get(stor_path, "info")
            for node in info['nodes']:
                # Loaded in the order iops, bw, lat.
                per_summary[summary] = tuple(
                    ctx.storage.get_array(stor_path, node, array_name)
                    for array_name in ('iops_data', 'bw_data', 'lat_data'))

        # Latency is loaded as well, but its report is currently disabled.
        for iops, bw, _lat in per_summary.values():
            print(" ------------------- IOPS -------------------")
            print(calc_stat_props(iops))  # type: ignore
            print(" ------------------- BW -------------------")
            print(calc_stat_props(bw))  # type: ignore
            # print(" ------------------- LAT -------------------")
            # print(calc_stat_props(lat))
289
290
class LoadStoredNodesStage(Stage):
    """Restore ``ctx.nodes_info`` from the 'all_nodes' storage entry, if there is one."""

    priority = StepOrder.DISCOVER

    def run(self, ctx: TestRun) -> None:
        """Load the stored node list, keyed by node id.

        Raises StopTestError if ``ctx.nodes_info`` is already filled while
        'all_nodes' is present in the storage.
        """
        if 'all_nodes' not in ctx.storage:
            return

        if ctx.nodes_info:
            logger.error("Internal error: Some nodes already stored in " +
                         "nodes_info before LoadStoredNodesStage stage")
            raise StopTestError()

        stored_nodes = ctx.storage.load_list(NodeInfo, "all_nodes")
        ctx.nodes_info = {info.node_id(): info for info in stored_nodes}
        logger.info("%s nodes loaded from database", len(ctx.nodes_info))