import time
import json
import copy
import logging
from concurrent.futures import Future
from typing import List, Dict, Tuple, Optional, Union, cast

from . import utils, ssh_utils, hw_info
from .config import ConfigBlock
from .node import setup_rpc, connect
from .node_interfaces import NodeInfo, IRPCNode
from .stage import Stage, StepOrder
from .sensors import collect_sensors_data
from .suits.all_suits import all_suits
from .test_run_class import TestRun
from .utils import StopTestError
from .result_classes import SuiteConfig
from .hlstorage import ResultStorage


logger = logging.getLogger("wally")


class ConnectStage(Stage):
    """Connect to nodes stage"""

    priority = StepOrder.CONNECT

    def run(self, ctx: TestRun) -> None:
        with ctx.get_pool() as pool:
            logger.info("Connecting to %s nodes", len(ctx.nodes_info))

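            # Try to open an SSH connection to a node and bootstrap the RPC agent
            # on it. Returns (True, rpc_node) on success and (False, node_info) on
            # failure, so failures can be collected instead of escaping pool.map.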
            def connect_ext(node_info: NodeInfo) -> Tuple[bool, Union[IRPCNode, NodeInfo]]:
                try:
                    ssh_node = connect(node_info, conn_timeout=ctx.config.connect_timeout)

                    return True, setup_rpc(ssh_node,
                                           ctx.rpc_code,
                                           ctx.default_rpc_plugins,
                                           log_level=ctx.config.rpc_log_level)
                except Exception as exc:
                    logger.exception("Failed to connect to %s: %s", node_info, exc)
                    return False, node_info

            failed_testnodes = []  # type: List[NodeInfo]
            failed_nodes = []  # type: List[NodeInfo]
            ctx.nodes = []

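            # split connection results: a failed test node aborts the run below,
            # while other unreachable nodes are only excluded from it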
            for ok, node in pool.map(connect_ext, ctx.nodes_info.values()):
                if not ok:
                    node = cast(NodeInfo, node)
                    if 'testnode' in node.roles:
                        failed_testnodes.append(node)
                    else:
                        failed_nodes.append(node)
                else:
                    ctx.nodes.append(cast(IRPCNode, node))

            if failed_nodes:
                msg = "Node(s) {} will be excluded - can't connect"
                logger.warning(msg.format(", ".join(map(str, failed_nodes))))

            if failed_testnodes:
                msg = "Can't start RPC on testnode(s) " + ",".join(map(str, failed_testnodes))
                logger.error(msg)
                raise utils.StopTestError(msg)

            if not failed_nodes:
                logger.info("All nodes connected successfully")

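            # Estimate each node's clock shift: sample the local time before and
            # after asking every node for its time over RPC. A reply from a node
            # with a synchronized clock must fall within [t_start, t_end], so any
            # excess is a lower bound on that node's real clock shift.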
            def get_time(node):
                return node.conn.sys.time()

            t_start = time.time()
            tms = pool.map(get_time, ctx.nodes)
            t_end = time.time()

            for node, val in zip(ctx.nodes, tms):
                max_delta = int(max(t_start - val, val - t_end) * 1000)
                if max_delta > ctx.config.max_time_diff_ms:
                    msg = ("Too large time shift {}ms on node {}. Stopping the test. " +
                           "Fix the time on the cluster nodes and restart the test, or increase " +
                           "the max_time_diff_ms(={}ms) setting in the config").format(max_delta,
                                                                                       str(node),
                                                                                       ctx.config.max_time_diff_ms)
                    logger.error(msg)
                    raise StopTestError(msg)
                if max_delta > 0:
                    logger.warning("Node %s has a time shift of at least %s ms", node, max_delta)

    def cleanup(self, ctx: TestRun) -> None:
        if ctx.config.get("download_rpc_logs", False):
            logger.info("Killing all outstanding processes")
            for node in ctx.nodes:
                node.conn.cli.killall()

            logger.info("Downloading RPC server logs")
            for node in ctx.nodes:
                if node.rpc_log_file is not None:
                    nid = node.node_id
                    path = "rpc_logs/{}.txt".format(nid)
                    node.conn.server.flush_logs()
                    log = node.get_file_content(node.rpc_log_file)
                    if path in ctx.storage:
                        ctx.storage.append_raw(log, path)
                    else:
                        ctx.storage.put_raw(log, path)
                    logger.debug("RPC log from node {} stored into storage::{}".format(nid, path))

        logger.info("Disconnecting")
        with ctx.get_pool() as pool:
            list(pool.map(lambda node: node.disconnect(stop=True), ctx.nodes))


class CollectInfoStage(Stage):
    """Collect node info"""

    priority = StepOrder.START_SENSORS - 2
    config_block = 'collect_info'

    def run(self, ctx: TestRun) -> None:
        if not ctx.config.collect_info:
            return

        futures = {}  # type: Dict[Tuple[str, str], Future]

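        # info already present in storage (e.g. from an interrupted run) is
        # not collected again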
        with ctx.get_pool() as pool:
            # a node's RPC channel can't issue a new request until the previous
            # one finishes, so hw and sw info are gathered in two sequential passes
            for node in ctx.nodes:
                nid = node.node_id
                hw_info_path = "hw_info/{}".format(nid)
                if hw_info_path not in ctx.storage:
                    futures[(hw_info_path, nid)] = pool.submit(hw_info.get_hw_info, node)

            for (path, nid), future in futures.items():
                try:
                    ctx.storage.put(future.result(), path)
                except Exception:
                    logger.exception("Failed to collect hardware info from %s", nid)
                    raise utils.StopTestError()

            futures.clear()
            for node in ctx.nodes:
                nid = node.node_id
                sw_info_path = "sw_info/{}".format(nid)
                if sw_info_path not in ctx.storage:
                    futures[(sw_info_path, nid)] = pool.submit(hw_info.get_sw_info, node)

            for (path, nid), future in futures.items():
                try:
                    ctx.storage.put(future.result(), path)
                except Exception:
                    logger.exception("Failed to collect software info from %s", nid)
                    raise utils.StopTestError()


class ExplicitNodesStage(Stage):
    """Add nodes listed explicitly in the config"""

    priority = StepOrder.DISCOVER
    config_block = 'nodes'

    def run(self, ctx: TestRun) -> None:
        if 'all_nodes' in ctx.storage:
            logger.info("Skip explicit nodes filling, as all_nodes is already in storage")
            return

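        # each 'nodes' config entry maps an SSH URI to a comma-separated role list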
        for url, roles in ctx.config.get('nodes', {}).raw().items():
            ctx.merge_node(ssh_utils.parse_ssh_uri(url), set(role.strip() for role in roles.split(",")))
            logger.debug("Add node %s with roles %s", url, roles)


class SleepStage(Stage):
    """Sleep for the configured number of seconds"""

    priority = StepOrder.TEST
    config_block = 'sleep'

    def run(self, ctx: TestRun) -> None:
        logger.debug("Will sleep for %r seconds", ctx.config.sleep)
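        # record the idle window boundaries so later stages can tell when
        # the cluster was deliberately left without load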
        stime = time.time()
        time.sleep(ctx.config.sleep)
        ctx.storage.put([int(stime), int(time.time())], 'idle')


class PrepareNodes(Stage):
    priority = StepOrder.START_SENSORS - 1

    def __init__(self):
        Stage.__init__(self)
        self.nodeepscrub_updated = False
        self.noscrub_updated = False

    def run(self, ctx: TestRun) -> None:
        ceph_sett = ctx.config.get('ceph_settings', "").split()
        if ceph_sett:
            for node in ctx.nodes:
                if "ceph-mon" in node.info.roles or "ceph-osd" in node.info.roles:
                    state = json.loads(node.run("ceph health --format json"))["summary"]["summary"]
                    if 'noscrub' in ceph_sett:
                        if 'noscrub' in state:
                            logger.debug("noscrub already set on cluster")
                        else:
                            logger.info("Applying noscrub setting to ceph cluster")
                            node.run("ceph osd set noscrub")
                            self.noscrub_updated = True

                    if 'nodeepscrub' in ceph_sett:
                        if 'nodeepscrub' in state:
                            logger.debug("nodeepscrub already set on cluster")
                        else:
                            logger.info("Applying nodeepscrub setting to ceph cluster")
                            node.run("ceph osd set nodeepscrub")
                            self.nodeepscrub_updated = True
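                    # scrub flags are cluster-wide, so setting them from the
                    # first mon/osd node found is enough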
                    break

    def cleanup(self, ctx: TestRun) -> None:
        if self.nodeepscrub_updated or self.noscrub_updated:
            for node in ctx.nodes:
                if "ceph-mon" in node.info.roles or "ceph-osd" in node.info.roles:
                    if self.noscrub_updated:
                        logger.info("Reverting noscrub setting for ceph cluster")
                        node.run("ceph osd unset noscrub")
                        self.noscrub_updated = False

                    if self.nodeepscrub_updated:
                        logger.info("Reverting nodeepscrub setting for ceph cluster")
                        node.run("ceph osd unset nodeepscrub")
                        self.nodeepscrub_updated = False


class RunTestsStage(Stage):

    priority = StepOrder.TEST
    config_block = 'tests'

    def run(self, ctx: TestRun) -> None:
        if ctx.config.no_tests:
            logger.info("Skipping tests, as the 'no_tests' config setting is True")
            return

        for suite_idx, test_suite in enumerate(ctx.config.get('tests', [])):
            test_nodes = [node for node in ctx.nodes if 'testnode' in node.info.roles]

            if not test_nodes:
                logger.error("No test nodes found")
                raise StopTestError()

            if len(test_suite) != 1:
                logger.error("Test suite %s contains more than one test. Put each test in a separate group", suite_idx)
                raise StopTestError()

            name, params = list(test_suite.items())[0]
            vm_count = params.get('node_limit', None)  # type: Optional[int]

            # select test nodes
            if vm_count is None:
                curr_test_nodes = test_nodes
            else:
                curr_test_nodes = test_nodes[:vm_count]

            if not curr_test_nodes:
                logger.error("No nodes found for test, skipping it.")
                continue

            if name not in all_suits:
                logger.error("Test suite %r not found. Only suites [%s] are available", name, ", ".join(all_suits))
                raise StopTestError()

            test_cls = all_suits[name]
            remote_dir = ctx.config.default_test_local_folder.format(name=name, uuid=ctx.config.run_uuid)
            suite = SuiteConfig(test_cls.name,
                                params=params,
                                run_uuid=ctx.config.run_uuid,
                                nodes=curr_test_nodes,
                                remote_dir=remote_dir,
                                idx=suite_idx,
                                keep_raw_files=ctx.config.keep_raw_files)

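            # collect_sensors_data is also passed as the on_idle callback, so
            # the suite can keep sensor data flowing while it is idle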
            test_cls(storage=ResultStorage(ctx.storage),
                     suite=suite,
                     on_idle=lambda: collect_sensors_data(ctx, False)).run()

    @classmethod
    def validate_config(cls, cfg: ConfigBlock) -> None:
        pass


class SaveNodesStage(Stage):
    """Save the node list into storage"""
    nodes_path = 'all_nodes'
    params_path = 'all_nodes_params.js'
    priority = StepOrder.UPDATE_NODES_INFO + 1

    def run(self, ctx: TestRun) -> None:
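        # node params go into a separate JSON file; the serialized node list
        # holds shallow copies with a placeholder instead of the real params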
        infos = list(ctx.nodes_info.values())
        params = {node.node_id: node.params for node in infos}
        ninfos = [copy.copy(node) for node in infos]
        for node in ninfos:
            node.params = "in {!r} file".format(self.params_path)
        ctx.storage.put_list(ninfos, self.nodes_path)
        ctx.storage.put_raw(json.dumps(params).encode('utf8'), self.params_path)


class LoadStoredNodesStage(Stage):
    priority = StepOrder.DISCOVER

    def run(self, ctx: TestRun) -> None:
        if SaveNodesStage.nodes_path in ctx.storage:
            if ctx.nodes_info:
                logger.error("Internal error: some nodes are already in " +
                             "nodes_info before LoadStoredNodesStage")
                raise StopTestError()

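            # restore the node list and re-attach the per-node params that
            # SaveNodesStage stored in its separate JSON file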
            nodes = {node.node_id: node for node in ctx.storage.load_list(NodeInfo, SaveNodesStage.nodes_path)}

            if SaveNodesStage.params_path in ctx.storage:
                params = json.loads(ctx.storage.get_raw(SaveNodesStage.params_path).decode('utf8'))
                for node_id, node in nodes.items():
                    node.params = params.get(node_id, {})

            ctx.nodes_info = nodes
            logger.info("%s nodes loaded from storage", len(ctx.nodes_info))