import time
import json
import copy
import logging
from concurrent.futures import Future
from typing import List, Dict, Tuple, Optional, Union, cast

from cephlib.wally_storage import WallyDB
from cephlib.node import NodeInfo, IRPCNode, get_hw_info, get_sw_info
from cephlib.ssh import parse_ssh_uri
from cephlib.node_impl import setup_rpc, connect

from . import utils
from .config import ConfigBlock
from .stage import Stage, StepOrder
from .sensors import collect_sensors_data
from .suits.all_suits import all_suits
from .test_run_class import TestRun
from .result_classes import SuiteConfig


logger = logging.getLogger("wally")


class ConnectStage(Stage):
    """Connect to nodes stage"""

    priority = StepOrder.CONNECT

    def run(self, ctx: TestRun) -> None:
        with ctx.get_pool() as pool:
            logger.info("Connecting to %s nodes", len(ctx.nodes_info))

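            # connect over SSH and start the RPC agent; on failure return the
            # original NodeInfo with ok=False, so the caller can sort failures
            # by node role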
            def connect_ext(node_info: NodeInfo) -> Tuple[bool, Union[IRPCNode, NodeInfo]]:
                try:
                    ssh_node = connect(node_info, conn_timeout=ctx.config.connect_timeout)

                    return True, setup_rpc(ssh_node,
                                           ctx.rpc_code,
                                           ctx.default_rpc_plugins,
                                           log_level=ctx.config.rpc_log_level)
                except Exception as exc:
                    logger.exception("During connect to %s: %s", node_info, exc)
                    return False, node_info

            failed_testnodes = []  # type: List[NodeInfo]
            failed_nodes = []  # type: List[NodeInfo]
            ctx.nodes = []

            for ok, node in pool.map(connect_ext, ctx.nodes_info.values()):
                if not ok:
                    node = cast(NodeInfo, node)
                    if 'testnode' in node.roles:
                        failed_testnodes.append(node)
                    else:
                        failed_nodes.append(node)
                else:
                    ctx.nodes.append(cast(IRPCNode, node))

            if failed_nodes:
                msg = "Node(s) {} will be excluded - can't connect"
                logger.warning(msg.format(", ".join(map(str, failed_nodes))))

            if failed_testnodes:
                msg = "Can't start RPC on testnode(s) " + ", ".join(map(str, failed_testnodes))
                logger.error(msg)
                raise utils.StopTestError(msg)

            if not failed_nodes:
                logger.info("All nodes connected successfully")

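            # clock sanity check: every node's clock is sampled between t_start
            # and t_end, so a reading outside that window means the node's time
            # is shifted by at least the overshoot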
            def get_time(node):
                return node.conn.sys.time()

            t_start = time.time()
            tms = pool.map(get_time, ctx.nodes)
            t_end = time.time()

            for node, val in zip(ctx.nodes, tms):
                delta = 0
                if val > t_end:
                    delta = val - t_end
                elif t_start > val:
                    delta = t_start - val

                if delta > ctx.config.max_time_diff_ms:
                    msg = ("Too large time shift {}ms on node {}. Stopping test." +
                           " Fix time on cluster nodes and restart test, or change " +
                           "max_time_diff_ms(={}ms) setting in config").format(delta,
                                                                               str(node),
                                                                               ctx.config.max_time_diff_ms)
                    logger.error(msg)
                    raise utils.StopTestError(msg)
                if delta > 0:
                    logger.warning("Node %s has time shift at least %s ms", node, delta)


    def cleanup(self, ctx: TestRun) -> None:
        if ctx.config.get("download_rpc_logs", False):
            logger.info("Killing all outstanding processes")
            for node in ctx.nodes:
                node.conn.cli.killall()

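            # flush each RPC server's log to disk, pull it over RPC and store
            # it; append if a log for this node is already in storage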
            logger.info("Downloading RPC servers logs")
            for node in ctx.nodes:
                if node.rpc_log_file is not None:
                    nid = node.node_id
                    path = WallyDB.rpc_logs.format(node_id=nid)
                    node.conn.server.flush_logs()
                    log = node.get_file_content(node.rpc_log_file)
                    if path in ctx.storage:
                        ctx.storage.append_raw(log, path)
                    else:
                        ctx.storage.put_raw(log, path)
                    logger.debug("RPC log from node {} stored into storage::{}".format(nid, path))

        logger.info("Disconnecting")
        with ctx.get_pool() as pool:
            list(pool.map(lambda node: node.disconnect(stop=True), ctx.nodes))


class CollectInfoStage(Stage):
    """Collect node info"""

    priority = StepOrder.START_SENSORS - 2
    config_block = 'collect_info'

    def run(self, ctx: TestRun) -> None:
        if not ctx.config.collect_info:
            return

        futures = {}  # type: Dict[Tuple[str, str], Future]

        with ctx.get_pool() as pool:
            # a node can't serve the next RPC request until the previous one
            # finishes, so requests are parallelized across nodes via the pool
            for node in ctx.nodes:
                nid = node.node_id
                hw_info_path = WallyDB.hw_info.format(node_id=nid)
                if hw_info_path not in ctx.storage:
                    futures[(hw_info_path, nid)] = pool.submit(get_hw_info, node)

            for (path, nid), future in futures.items():
                try:
                    ctx.storage.put(future.result(), path)
                except Exception:
                    logger.exception("During collecting hardware info from %s", nid)
                    raise utils.StopTestError()

            futures.clear()
            for node in ctx.nodes:
                nid = node.node_id
                sw_info_path = WallyDB.sw_info.format(node_id=nid)
                if sw_info_path not in ctx.storage:
                    futures[(sw_info_path, nid)] = pool.submit(get_sw_info, node)

            for (path, nid), future in futures.items():
                try:
                    ctx.storage.put(future.result(), path)
                except Exception:
                    logger.exception("During collecting software info from %s", nid)
                    raise utils.StopTestError()


class ExplicitNodesStage(Stage):
    """Add explicitly configured nodes"""

    priority = StepOrder.DISCOVER
    config_block = 'nodes'

    def run(self, ctx: TestRun) -> None:
        if WallyDB.all_nodes in ctx.storage:
            logger.info("Skipping explicit nodes filling, as all_nodes is already in storage")
            return

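        # each 'nodes' config entry maps an SSH URI to a comma-separated role list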
        for url, roles in ctx.config.get('nodes', {}).raw().items():
            ctx.merge_node(parse_ssh_uri(url), set(role.strip() for role in roles.split(",")))
            logger.debug("Added node %s with roles %s", url, roles)


class SleepStage(Stage):
    """Sleep for the configured number of seconds"""

    priority = StepOrder.TEST
    config_block = 'sleep'

    def run(self, ctx: TestRun) -> None:
        logger.debug("Will sleep for %r seconds", ctx.config.sleep)
        stime = time.time()
        time.sleep(ctx.config.sleep)
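        # record the idle window bounds (e.g. usable as a no-load baseline for
        # sensor data collected during it)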
        ctx.storage.put([int(stime), int(time.time())], 'idle')


class PrepareNodes(Stage):
    priority = StepOrder.START_SENSORS - 1

    def __init__(self):
        Stage.__init__(self)
        self.nodeepscrub_updated = False
        self.noscrub_updated = False

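    # disable ceph scrubbing for the duration of the run (so background scrub
    # I/O does not skew the measurements) and re-enable it in cleanup()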
    def run(self, ctx: TestRun) -> None:
        ceph_sett = ctx.config.get('ceph_settings', "").split()
        if ceph_sett:
            for node in ctx.nodes:
                if "ceph-mon" in node.info.roles or "ceph-osd" in node.info.roles:
                    state = json.loads(node.run("ceph health --format json"))["summary"]["summary"]
                    if 'noscrub' in ceph_sett:
                        if 'noscrub' in state:
                            logger.debug("noscrub already set on cluster")
                        else:
                            logger.info("Applying noscrub setting to ceph cluster")
                            node.run("ceph osd set noscrub")
                            self.noscrub_updated = True

                    if 'nodeepscrub' in ceph_sett:
                        if 'nodeepscrub' in state:
                            logger.debug("nodeepscrub already set on cluster")
                        else:
                            logger.info("Applying nodeepscrub setting to ceph cluster")
                            node.run("ceph osd set nodeepscrub")
                            self.nodeepscrub_updated = True
                    break  # scrub flags are cluster-wide, one mon/osd node is enough

    def cleanup(self, ctx: TestRun) -> None:
        if self.nodeepscrub_updated or self.noscrub_updated:
            for node in ctx.nodes:
                if "ceph-mon" in node.info.roles or "ceph-osd" in node.info.roles:
                    if self.noscrub_updated:
                        logger.info("Reverting noscrub setting for ceph cluster")
                        node.run("ceph osd unset noscrub")
                        self.noscrub_updated = False

                    if self.nodeepscrub_updated:
                        logger.info("Reverting nodeepscrub setting for ceph cluster")
                        node.run("ceph osd unset nodeepscrub")
                        self.nodeepscrub_updated = False


class RunTestsStage(Stage):

    priority = StepOrder.TEST
    config_block = 'tests'

    def run(self, ctx: TestRun) -> None:
        if ctx.config.no_tests:
            logger.info("Skipping tests, as 'no_tests' config setting is True")
            return

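        # each entry in the 'tests' config list must be a one-element mapping
        # {suite_name: params}; this is enforced below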
        for suite_idx, test_suite in enumerate(ctx.config.get('tests', [])):
            test_nodes = [node for node in ctx.nodes if 'testnode' in node.info.roles]

            if not test_nodes:
                logger.error("No test nodes found")
                raise utils.StopTestError()

            if len(test_suite) != 1:
                logger.error("Test suite %s contains more than one test. Put each test in a separate group", suite_idx)
                raise utils.StopTestError()

            name, params = list(test_suite.items())[0]
            vm_count = params.get('node_limit', None)  # type: Optional[int]

            # select test nodes
            if vm_count is None:
                curr_test_nodes = test_nodes
            else:
                curr_test_nodes = test_nodes[:vm_count]

            if not curr_test_nodes:
                logger.error("No nodes found for test, skipping it.")
                continue

            if name not in all_suits:
                logger.error("Test suite %r not found. Only suites [%s] are available", name, ", ".join(all_suits))
                raise utils.StopTestError()

            test_cls = all_suits[name]
            remote_dir = ctx.config.default_test_local_folder.format(name=name, uuid=ctx.config.run_uuid)
            suite = SuiteConfig(test_cls.name,
                                params=params,
                                run_uuid=ctx.config.run_uuid,
                                nodes=curr_test_nodes,
                                remote_dir=remote_dir,
                                idx=suite_idx,
                                keep_raw_files=ctx.config.keep_raw_files)

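            # the suite calls on_idle when it has no active load, which
            # collects the sensor data accumulated so far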
            test_cls(storage=ctx.rstorage,
                     suite=suite,
                     on_idle=lambda: collect_sensors_data(ctx, False)).run()

    @classmethod
    def validate_config(cls, cfg: ConfigBlock) -> None:
        pass


class SaveNodesStage(Stage):
    """Save nodes list to storage"""
    priority = StepOrder.UPDATE_NODES_INFO + 1

    def run(self, ctx: TestRun) -> None:
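        # node params are stored as a separate JSON document; each stored copy
        # keeps only a reference to it (presumably to keep the node list compact)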
        infos = list(ctx.nodes_info.values())
        params = {node.node_id: node.params for node in infos}
        ninfos = [copy.copy(node) for node in infos]
        for node in ninfos:
            node.params = {"in file": WallyDB.nodes_params}
        ctx.storage.put_list(ninfos, WallyDB.all_nodes)
        ctx.storage.put_raw(json.dumps(params).encode('utf8'), WallyDB.nodes_params)


class LoadStoredNodesStage(Stage):
    priority = StepOrder.DISCOVER

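    # takes effect only when a node list is already in storage, e.g. when wally
    # resumes or reprocesses the results of a previous run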
    def run(self, ctx: TestRun) -> None:
        if WallyDB.all_nodes in ctx.storage:
            if ctx.nodes_info:
                logger.error("Internal error: Some nodes already stored in " +
                             "nodes_info before LoadStoredNodesStage stage")
                raise utils.StopTestError()

            ctx.nodes_info = {node.node_id: node for node in ctx.rstorage.load_nodes()}
            logger.info("%s nodes loaded from database", len(ctx.nodes_info))