blob: 578a65b75b496b0f33044a81a6bd771acbd3dc4a [file] [log] [blame]
koder aka kdanilov962ee5f2016-12-19 02:40:08 +02001import time
koder aka kdanilov23e6bdf2016-12-24 02:18:54 +02002import json
kdanylov aka koder736e5c12017-05-07 17:27:14 +03003import copy
koder aka kdanilove21d7472015-02-14 19:02:04 -08004import logging
kdanylov aka koder84de1e42017-05-22 14:00:07 +03005from typing import List, Tuple, Optional, Union, cast
koder aka kdanilov88407ff2015-05-26 15:35:57 +03006
kdanylov aka koder026e5f22017-05-15 01:04:39 +03007from cephlib.wally_storage import WallyDB
kdanylov aka koder84de1e42017-05-22 14:00:07 +03008from cephlib.node import NodeInfo, IRPCNode, get_hw_info, get_sw_info, get_hostname
kdanylov aka koder026e5f22017-05-15 01:04:39 +03009from cephlib.ssh import parse_ssh_uri
10from cephlib.node_impl import setup_rpc, connect
11
12from . import utils
koder aka kdanilov39e449e2016-12-17 15:15:26 +020013from .config import ConfigBlock
koder aka kdanilov39e449e2016-12-17 15:15:26 +020014from .stage import Stage, StepOrder
koder aka kdanilov7f59d562016-12-26 01:34:23 +020015from .sensors import collect_sensors_data
koder aka kdanilov108ac362017-01-19 20:17:16 +020016from .suits.all_suits import all_suits
koder aka kdanilov39e449e2016-12-17 15:15:26 +020017from .test_run_class import TestRun
koder aka kdanilova732a602017-02-01 20:29:56 +020018from .result_classes import SuiteConfig
koder aka kdanilov63ad2062015-04-27 13:11:40 +030019
koder aka kdanilov57ce4db2015-04-25 21:25:51 +030020
koder aka kdanilovcff7b2e2015-04-18 20:48:15 +030021logger = logging.getLogger("wally")
koder aka kdanilovcee43342015-04-14 22:52:53 +030022
koder aka kdanilov7acd6bd2015-02-12 14:28:30 -080023
koder aka kdanilov39e449e2016-12-17 15:15:26 +020024class ConnectStage(Stage):
25 """Connect to nodes stage"""
koder aka kdanilove21d7472015-02-14 19:02:04 -080026
koder aka kdanilov39e449e2016-12-17 15:15:26 +020027 priority = StepOrder.CONNECT
koder aka kdanilov0fdaaee2015-06-30 11:10:48 +030028
koder aka kdanilov39e449e2016-12-17 15:15:26 +020029 def run(self, ctx: TestRun) -> None:
koder aka kdanilov73084622016-11-16 21:51:08 +020030 with ctx.get_pool() as pool:
koder aka kdanilov39e449e2016-12-17 15:15:26 +020031 logger.info("Connecting to %s nodes", len(ctx.nodes_info))
koder aka kdanilov4af1c1d2015-05-18 15:48:58 +030032
koder aka kdanilov39e449e2016-12-17 15:15:26 +020033 def connect_ext(node_info: NodeInfo) -> Tuple[bool, Union[IRPCNode, NodeInfo]]:
34 try:
35 ssh_node = connect(node_info, conn_timeout=ctx.config.connect_timeout)
koder aka kdanilov23e6bdf2016-12-24 02:18:54 +020036
koder aka kdanilov962ee5f2016-12-19 02:40:08 +020037 return True, setup_rpc(ssh_node,
38 ctx.rpc_code,
39 ctx.default_rpc_plugins,
40 log_level=ctx.config.rpc_log_level)
koder aka kdanilov39e449e2016-12-17 15:15:26 +020041 except Exception as exc:
koder aka kdanilov962ee5f2016-12-19 02:40:08 +020042 logger.exception("During connect to %s: %s", node_info, exc)
koder aka kdanilov39e449e2016-12-17 15:15:26 +020043 return False, node_info
koder aka kdanilov0fdaaee2015-06-30 11:10:48 +030044
koder aka kdanilov39e449e2016-12-17 15:15:26 +020045 failed_testnodes = [] # type: List[NodeInfo]
46 failed_nodes = [] # type: List[NodeInfo]
47 ctx.nodes = []
koder aka kdanilov0fdaaee2015-06-30 11:10:48 +030048
koder aka kdanilov962ee5f2016-12-19 02:40:08 +020049 for ok, node in pool.map(connect_ext, ctx.nodes_info.values()):
koder aka kdanilov39e449e2016-12-17 15:15:26 +020050 if not ok:
51 node = cast(NodeInfo, node)
52 if 'testnode' in node.roles:
53 failed_testnodes.append(node)
54 else:
55 failed_nodes.append(node)
56 else:
57 ctx.nodes.append(cast(IRPCNode, node))
koder aka kdanilov22d134e2016-11-08 11:33:19 +020058
koder aka kdanilov39e449e2016-12-17 15:15:26 +020059 if failed_nodes:
60 msg = "Node(s) {} would be excluded - can't connect"
koder aka kdanilov962ee5f2016-12-19 02:40:08 +020061 logger.warning(msg.format(", ".join(map(str, failed_nodes))))
koder aka kdanilov4af1c1d2015-05-18 15:48:58 +030062
koder aka kdanilov39e449e2016-12-17 15:15:26 +020063 if failed_testnodes:
koder aka kdanilov23e6bdf2016-12-24 02:18:54 +020064 msg = "Can't start RPC on testnode(s) " + ",".join(map(str, failed_testnodes))
koder aka kdanilovc368eb62015-04-28 18:22:01 +030065 logger.error(msg)
66 raise utils.StopTestError(msg)
67
koder aka kdanilov39e449e2016-12-17 15:15:26 +020068 if not failed_nodes:
69 logger.info("All nodes connected successfully")
koder aka kdanilovcee43342015-04-14 22:52:53 +030070
kdanylov aka koder3a9e5db2017-05-09 20:00:44 +030071 def get_time(node):
72 return node.conn.sys.time()
73
74 t_start = time.time()
75 tms = pool.map(get_time, ctx.nodes)
76 t_end = time.time()
77
78 for node, val in zip(ctx.nodes, tms):
kdanylov aka koderb0833332017-05-13 20:39:17 +030079 delta = 0
80 if val > t_end:
81 delta = val - t_end
kdanylov aka koder84de1e42017-05-22 14:00:07 +030082 elif val < t_start:
kdanylov aka koderb0833332017-05-13 20:39:17 +030083 delta = t_start - val
84
85 if delta > ctx.config.max_time_diff_ms:
kdanylov aka koder3a9e5db2017-05-09 20:00:44 +030086 msg = ("Too large time shift {}ms on node {}. Stopping test." +
87 " Fix time on cluster nodes and restart test, or change " +
kdanylov aka koderb0833332017-05-13 20:39:17 +030088 "max_time_diff_ms(={}ms) setting in config").format(delta,
kdanylov aka koder3a9e5db2017-05-09 20:00:44 +030089 str(node),
90 ctx.config.max_time_diff_ms)
91 logger.error(msg)
kdanylov aka koderb0833332017-05-13 20:39:17 +030092 raise utils.StopTestError(msg)
kdanylov aka koder3a9e5db2017-05-09 20:00:44 +030093
kdanylov aka koder84de1e42017-05-22 14:00:07 +030094 if delta > 1:
95 logger.warning("Node %s has time shift at least %s ms", node, int(delta))
kdanylov aka koder3a9e5db2017-05-09 20:00:44 +030096
koder aka kdanilov39e449e2016-12-17 15:15:26 +020097 def cleanup(self, ctx: TestRun) -> None:
koder aka kdanilov962ee5f2016-12-19 02:40:08 +020098 if ctx.config.get("download_rpc_logs", False):
kdanylov aka koder3a9e5db2017-05-09 20:00:44 +030099 logger.info("Killing all outstanding processes")
koder aka kdanilov962ee5f2016-12-19 02:40:08 +0200100 for node in ctx.nodes:
kdanylov aka koder3a9e5db2017-05-09 20:00:44 +0300101 node.conn.cli.killall()
102
103 logger.info("Downloading RPC servers logs")
104 for node in ctx.nodes:
105 node.conn.cli.killall()
koder aka kdanilov962ee5f2016-12-19 02:40:08 +0200106 if node.rpc_log_file is not None:
koder aka kdanilov108ac362017-01-19 20:17:16 +0200107 nid = node.node_id
kdanylov aka koder026e5f22017-05-15 01:04:39 +0300108 path = WallyDB.rpc_logs.format(node_id=nid)
koder aka kdanilov962ee5f2016-12-19 02:40:08 +0200109 node.conn.server.flush_logs()
110 log = node.get_file_content(node.rpc_log_file)
koder aka kdanilov7f59d562016-12-26 01:34:23 +0200111 if path in ctx.storage:
koder aka kdanilovffaf48d2016-12-27 02:25:29 +0200112 ctx.storage.append_raw(log, path)
koder aka kdanilov7f59d562016-12-26 01:34:23 +0200113 else:
koder aka kdanilovffaf48d2016-12-27 02:25:29 +0200114 ctx.storage.put_raw(log, path)
koder aka kdanilov962ee5f2016-12-19 02:40:08 +0200115 logger.debug("RPC log from node {} stored into storage::{}".format(nid, path))
116
kdanylov aka koder3a9e5db2017-05-09 20:00:44 +0300117 logger.info("Disconnecting")
koder aka kdanilov962ee5f2016-12-19 02:40:08 +0200118 with ctx.get_pool() as pool:
119 list(pool.map(lambda node: node.disconnect(stop=True), ctx.nodes))
koder aka kdanilovcee43342015-04-14 22:52:53 +0300120
koder aka kdanilov0fdaaee2015-06-30 11:10:48 +0300121
koder aka kdanilov39e449e2016-12-17 15:15:26 +0200122class CollectInfoStage(Stage):
123 """Collect node info"""
koder aka kdanilov3d2bc4f2016-11-12 18:31:18 +0200124
kdanylov aka koder84de1e42017-05-22 14:00:07 +0300125 priority = StepOrder.UPDATE_NODES_INFO
koder aka kdanilov39e449e2016-12-17 15:15:26 +0200126 config_block = 'collect_info'
127
128 def run(self, ctx: TestRun) -> None:
koder aka kdanilov39e449e2016-12-17 15:15:26 +0200129 with ctx.get_pool() as pool:
kdanylov aka koder84de1e42017-05-22 14:00:07 +0300130 try:
131 # can't make next RPC request until finish with previous for same node
132 for node, hw_info in zip(ctx.nodes, pool.map(get_hw_info, ctx.nodes)):
133 node.info.hw_info = hw_info
134 for node, sw_info in zip(ctx.nodes, pool.map(get_sw_info, ctx.nodes)):
135 node.info.sw_info = sw_info
136 except Exception as exc:
137 logger.exception("During collecting cluster info")
138 raise utils.StopTestError() from exc
koder aka kdanilov39e449e2016-12-17 15:15:26 +0200139
kdanylov aka koder84de1e42017-05-22 14:00:07 +0300140 logger.debug("Collecting hostnames")
141 hostnames = pool.map(get_hostname, ctx.nodes)
142 for node, hostname in zip(ctx.nodes, hostnames):
143 node.info.hostname = hostname
koder aka kdanilov39e449e2016-12-17 15:15:26 +0200144
145
class ExplicitNodesStage(Stage):
    """Add nodes listed explicitly in the 'nodes' config section."""

    priority = StepOrder.DISCOVER
    config_block = 'nodes'

    def run(self, ctx: TestRun) -> None:
        """Merge each configured ssh url with its roles into ctx.nodes_info.

        Skipped entirely when a node list was already persisted to storage
        (e.g. when resuming a previous run).
        """
        if WallyDB.all_nodes in ctx.storage:
            # Fixed typos in the original message ("explicid", "all ready").
            logger.info("Skip explicit nodes filling, as all_nodes already in storage")
            return

        # Config maps ssh url -> comma-separated role list.
        for url, roles in ctx.config.get('nodes', {}).raw().items():
            ctx.merge_node(parse_ssh_uri(url), set(role.strip() for role in roles.split(",")))
            logger.debug("Add node %s with roles %s", url, roles)
koder aka kdanilov39e449e2016-12-17 15:15:26 +0200160
161
koder aka kdanilov962ee5f2016-12-19 02:40:08 +0200162class SleepStage(Stage):
163 """Save nodes list to file"""
164
165 priority = StepOrder.TEST
166 config_block = 'sleep'
167
168 def run(self, ctx: TestRun) -> None:
169 logger.debug("Will sleep for %r seconds", ctx.config.sleep)
kdanylov aka koder3a9e5db2017-05-09 20:00:44 +0300170 stime = time.time()
koder aka kdanilov962ee5f2016-12-19 02:40:08 +0200171 time.sleep(ctx.config.sleep)
kdanylov aka koder3a9e5db2017-05-09 20:00:44 +0300172 ctx.storage.put([int(stime), int(time.time())], 'idle')
koder aka kdanilov39e449e2016-12-17 15:15:26 +0200173
174
koder aka kdanilov23e6bdf2016-12-24 02:18:54 +0200175class PrepareNodes(Stage):
176 priority = StepOrder.START_SENSORS - 1
177
178 def __init__(self):
179 Stage.__init__(self)
180 self.nodeepscrub_updated = False
181 self.noscrub_updated = False
182
183 def run(self, ctx: TestRun) -> None:
184 ceph_sett = ctx.config.get('ceph_settings', "").split()
185 if ceph_sett:
186 for node in ctx.nodes:
187 if "ceph-mon" in node.info.roles or "ceph-osd" in node.info.roles:
188 state = json.loads(node.run("ceph health --format json"))["summary"]["summary"]
189 if 'noscrub' in ceph_sett:
190 if 'noscrub' in state:
191 logger.debug("noscrub already set on cluster")
192 else:
193 logger.info("Applying noscrub settings to ceph cluster")
194 node.run("ceph osd set noscrub")
195 self.noscrub_updated = True
196
197 if 'nodeepscrub' in ceph_sett:
198 if 'nodeepscrub' in state:
199 logger.debug("noscrub already set on cluster")
200 else:
201 logger.info("Applying noscrub settings to ceph cluster")
202 node.run("ceph osd set noscrub")
203 self.nodeepscrub_updated = True
204 break
205
206 def cleanup(self, ctx: TestRun) -> None:
207 if self.nodeepscrub_updated or self.noscrub_updated:
208 for node in ctx.nodes:
209 if "ceph-mon" in node.info.roles or "ceph-osd" in node.info.roles :
210 if self.noscrub_updated:
211 logger.info("Reverting noscrub setting for ceph cluster")
212 node.run("ceph osd unset noscrub")
213 self.noscrub_updated = False
214
215 if self.nodeepscrub_updated:
216 logger.info("Reverting noscrub setting for ceph cluster")
217 node.run("ceph osd unset nodeepscrub")
218 self.nodeepscrub_updated = False
219
220
koder aka kdanilov39e449e2016-12-17 15:15:26 +0200221class RunTestsStage(Stage):
222
223 priority = StepOrder.TEST
224 config_block = 'tests'
225
226 def run(self, ctx: TestRun) -> None:
koder aka kdanilovf2865172016-12-30 03:35:11 +0200227 if ctx.config.no_tests:
228 logger.info("Skiping tests, as 'no_tests' config settings is True")
229 return
koder aka kdanilov3d2bc4f2016-11-12 18:31:18 +0200230
koder aka kdanilovf2865172016-12-30 03:35:11 +0200231 for suite_idx, test_suite in enumerate(ctx.config.get('tests', [])):
232 test_nodes = [node for node in ctx.nodes if 'testnode' in node.info.roles]
koder aka kdanilovda45e882015-04-06 02:24:42 +0300233
koder aka kdanilovf2865172016-12-30 03:35:11 +0200234 if not test_nodes:
235 logger.error("No test nodes found")
kdanylov aka koderb0833332017-05-13 20:39:17 +0300236 raise utils.StopTestError()
gstepanov023c1e42015-04-08 15:50:19 +0300237
koder aka kdanilovf2865172016-12-30 03:35:11 +0200238 if len(test_suite) != 1:
239 logger.error("Test suite %s contain more than one test. Put each test in separated group", suite_idx)
kdanylov aka koderb0833332017-05-13 20:39:17 +0300240 raise utils.StopTestError()
koder aka kdanilov70227062016-11-26 23:23:21 +0200241
koder aka kdanilovf2865172016-12-30 03:35:11 +0200242 name, params = list(test_suite.items())[0]
243 vm_count = params.get('node_limit', None) # type: Optional[int]
koder aka kdanilov70227062016-11-26 23:23:21 +0200244
koder aka kdanilovf2865172016-12-30 03:35:11 +0200245 # select test nodes
246 if vm_count is None:
247 curr_test_nodes = test_nodes
248 else:
249 curr_test_nodes = test_nodes[:vm_count]
koder aka kdanilov70227062016-11-26 23:23:21 +0200250
koder aka kdanilovf2865172016-12-30 03:35:11 +0200251 if not curr_test_nodes:
252 logger.error("No nodes found for test, skipping it.")
253 continue
254
kdanylov aka koder150b2192017-04-01 16:53:01 +0300255 if name not in all_suits:
256 logger.error("Test suite %r not found. Only suits [%s] available", name, ", ".join(all_suits))
kdanylov aka koderb0833332017-05-13 20:39:17 +0300257 raise utils.StopTestError()
kdanylov aka koder150b2192017-04-01 16:53:01 +0300258
koder aka kdanilov108ac362017-01-19 20:17:16 +0200259 test_cls = all_suits[name]
koder aka kdanilovf2865172016-12-30 03:35:11 +0200260 remote_dir = ctx.config.default_test_local_folder.format(name=name, uuid=ctx.config.run_uuid)
koder aka kdanilova732a602017-02-01 20:29:56 +0200261 suite = SuiteConfig(test_cls.name,
262 params=params,
263 run_uuid=ctx.config.run_uuid,
264 nodes=test_nodes,
265 remote_dir=remote_dir,
266 idx=suite_idx,
267 keep_raw_files=ctx.config.keep_raw_files)
koder aka kdanilovf2865172016-12-30 03:35:11 +0200268
kdanylov aka koderb0833332017-05-13 20:39:17 +0300269 test_cls(storage=ctx.rstorage,
koder aka kdanilov108ac362017-01-19 20:17:16 +0200270 suite=suite,
koder aka kdanilovf2865172016-12-30 03:35:11 +0200271 on_idle=lambda: collect_sensors_data(ctx, False)).run()
gstepanov023c1e42015-04-08 15:50:19 +0300272
koder aka kdanilov39e449e2016-12-17 15:15:26 +0200273 @classmethod
274 def validate_config(cls, cfg: ConfigBlock) -> None:
275 pass
koder aka kdanilov7f59d562016-12-26 01:34:23 +0200276
277
kdanylov aka koder736e5c12017-05-07 17:27:14 +0300278class SaveNodesStage(Stage):
279 """Save nodes list to file"""
kdanylov aka koder736e5c12017-05-07 17:27:14 +0300280 priority = StepOrder.UPDATE_NODES_INFO + 1
281
282 def run(self, ctx: TestRun) -> None:
283 infos = list(ctx.nodes_info.values())
284 params = {node.node_id: node.params for node in infos}
285 ninfos = [copy.copy(node) for node in infos]
286 for node in ninfos:
kdanylov aka koder026e5f22017-05-15 01:04:39 +0300287 node.params = {"in file": WallyDB.nodes_params}
288 ctx.storage.put_list(ninfos, WallyDB.all_nodes)
289 ctx.storage.put_raw(json.dumps(params).encode('utf8'), WallyDB.nodes_params)
kdanylov aka koder736e5c12017-05-07 17:27:14 +0300290
291
koder aka kdanilov7f59d562016-12-26 01:34:23 +0200292class LoadStoredNodesStage(Stage):
293 priority = StepOrder.DISCOVER
294
295 def run(self, ctx: TestRun) -> None:
kdanylov aka koder026e5f22017-05-15 01:04:39 +0300296 if WallyDB.all_nodes in ctx.storage:
koder aka kdanilov7f59d562016-12-26 01:34:23 +0200297 if ctx.nodes_info:
298 logger.error("Internal error: Some nodes already stored in " +
299 "nodes_info before LoadStoredNodesStage stage")
kdanylov aka koderb0833332017-05-13 20:39:17 +0300300 raise utils.StopTestError()
kdanylov aka koder736e5c12017-05-07 17:27:14 +0300301
kdanylov aka koder026e5f22017-05-15 01:04:39 +0300302 ctx.nodes_info = {node.node_id: node for node in ctx.rstorage.load_nodes()}
koder aka kdanilov7f59d562016-12-26 01:34:23 +0200303 logger.info("%s nodes loaded from database", len(ctx.nodes_info))