import time
import json
import logging
from concurrent.futures import Future
from typing import List, Dict, Tuple, Optional, Union, cast

from . import utils, ssh_utils, hw_info
from .config import ConfigBlock
from .node import setup_rpc, connect
from .node_interfaces import NodeInfo, IRPCNode
from .stage import Stage, StepOrder
from .sensors import collect_sensors_data
from .suits.all_suits import all_suits
from .test_run_class import TestRun
from .utils import StopTestError
from .result_classes import SuiteConfig
from .hlstorage import ResultStorage


logger = logging.getLogger("wally")


class ConnectStage(Stage):
    """Connect to nodes stage"""

    priority = StepOrder.CONNECT

    def run(self, ctx: TestRun) -> None:
        with ctx.get_pool() as pool:
            logger.info("Connecting to %s nodes", len(ctx.nodes_info))

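            # connect_ext returns an (ok, result) pair instead of raising, so
            # a single failed connection does not abort the whole pool.map below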
            def connect_ext(node_info: NodeInfo) -> Tuple[bool, Union[IRPCNode, NodeInfo]]:
                try:
                    ssh_node = connect(node_info, conn_timeout=ctx.config.connect_timeout)

                    return True, setup_rpc(ssh_node,
                                           ctx.rpc_code,
                                           ctx.default_rpc_plugins,
                                           log_level=ctx.config.rpc_log_level)
                except Exception as exc:
                    logger.exception("Error during connect to %s: %s", node_info, exc)
                    return False, node_info

            failed_testnodes = []  # type: List[NodeInfo]
            failed_nodes = []  # type: List[NodeInfo]
            ctx.nodes = []

            for ok, node in pool.map(connect_ext, ctx.nodes_info.values()):
                if not ok:
                    node = cast(NodeInfo, node)
                    if 'testnode' in node.roles:
                        failed_testnodes.append(node)
                    else:
                        failed_nodes.append(node)
                else:
                    ctx.nodes.append(cast(IRPCNode, node))

            if failed_nodes:
                msg = "Node(s) {} will be excluded - can't connect"
                logger.warning(msg.format(", ".join(map(str, failed_nodes))))

            if failed_testnodes:
                msg = "Can't start RPC on testnode(s) " + ", ".join(map(str, failed_testnodes))
                logger.error(msg)
                raise utils.StopTestError(msg)

            if not failed_nodes:
                logger.info("All nodes connected successfully")

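    # RPC logs are collected only when the 'download_rpc_logs' config option
    # is set; each node's remote log buffer is flushed before the download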
    def cleanup(self, ctx: TestRun) -> None:
        if ctx.config.get("download_rpc_logs", False):
            for node in ctx.nodes:
                if node.rpc_log_file is not None:
                    nid = node.node_id
                    path = "rpc_logs/{}.txt".format(nid)
                    node.conn.server.flush_logs()
                    log = node.get_file_content(node.rpc_log_file)
                    if path in ctx.storage:
                        ctx.storage.append_raw(log, path)
                    else:
                        ctx.storage.put_raw(log, path)
                    logger.debug("RPC log from node {} stored into storage::{}".format(nid, path))

        with ctx.get_pool() as pool:
            list(pool.map(lambda node: node.disconnect(stop=True), ctx.nodes))


class CollectInfoStage(Stage):
    """Collect node info"""

    priority = StepOrder.START_SENSORS - 2
    config_block = 'collect_info'

    def run(self, ctx: TestRun) -> None:
        if not ctx.config.collect_info:
            return

        futures = {}  # type: Dict[Tuple[str, str], Future]

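        # collection is idempotent: results already present in the storage
        # (e.g. from an interrupted run) are not gathered again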
        with ctx.get_pool() as pool:
            # can't issue the next RPC request to a node until the previous
            # one finishes, so hw and sw info are gathered in two separate passes
            for node in ctx.nodes:
                nid = node.node_id
                hw_info_path = "hw_info/{}".format(nid)
                if hw_info_path not in ctx.storage:
                    futures[(hw_info_path, nid)] = pool.submit(hw_info.get_hw_info, node)

            for (path, nid), future in futures.items():
                try:
                    ctx.storage.put(future.result(), path)
                except Exception:
                    logger.exception("Failed to collect hardware info from %s", nid)
                    raise utils.StopTestError()

            futures.clear()
            for node in ctx.nodes:
                nid = node.node_id
                sw_info_path = "sw_info/{}".format(nid)
                if sw_info_path not in ctx.storage:
                    futures[(sw_info_path, nid)] = pool.submit(hw_info.get_sw_info, node)

            for (path, nid), future in futures.items():
                try:
                    ctx.storage.put(future.result(), path)
                except Exception:
                    logger.exception("Failed to collect software info from %s", nid)
                    raise utils.StopTestError()


class ExplicitNodesStage(Stage):
    """Add explicitly defined nodes"""

    priority = StepOrder.DISCOVER
    config_block = 'nodes'

    def run(self, ctx: TestRun) -> None:
        if 'all_nodes' in ctx.storage:
            logger.info("Skipping explicit nodes filling, as all_nodes is already in storage")
            return

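        # the 'nodes' config block maps ssh uri -> comma-separated roles,
        # e.g. (an assumed illustration of the format parsed below):
        #
        #   nodes:
        #       root@192.168.0.1: testnode
        #       root@192.168.0.2: ceph-osd,ceph-mon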
        for url, roles in ctx.config.get('nodes', {}).raw().items():
            ctx.merge_node(ssh_utils.parse_ssh_uri(url), set(role.strip() for role in roles.split(",")))
            logger.debug("Add node %s with roles %s", url, roles)


class SaveNodesStage(Stage):
    """Save nodes list to storage"""

    priority = StepOrder.CONNECT

    def run(self, ctx: TestRun) -> None:
        ctx.storage.put_list(ctx.nodes_info.values(), 'all_nodes')


class SleepStage(Stage):
    """Sleep for the configured number of seconds"""

    priority = StepOrder.TEST
    config_block = 'sleep'

    def run(self, ctx: TestRun) -> None:
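        # the 'sleep' config value is interpreted as a number of seconds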
        logger.debug("Will sleep for %r seconds", ctx.config.sleep)
        time.sleep(ctx.config.sleep)


class PrepareNodes(Stage):
    priority = StepOrder.START_SENSORS - 1

    def __init__(self):
        Stage.__init__(self)
        self.nodeepscrub_updated = False
        self.noscrub_updated = False

    def run(self, ctx: TestRun) -> None:
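        # 'ceph_settings' is a space-separated list of ceph flags to set for
        # the duration of the run, e.g. "noscrub nodeepscrub" (an assumed
        # example based on the parsing below)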
        ceph_sett = ctx.config.get('ceph_settings', "").split()
        if ceph_sett:
            for node in ctx.nodes:
                if "ceph-mon" in node.info.roles or "ceph-osd" in node.info.roles:
                    state = json.loads(node.run("ceph health --format json"))["summary"]["summary"]
                    if 'noscrub' in ceph_sett:
                        if 'noscrub' in state:
                            logger.debug("noscrub already set on cluster")
                        else:
                            logger.info("Applying noscrub setting to ceph cluster")
                            node.run("ceph osd set noscrub")
                            self.noscrub_updated = True

                    if 'nodeepscrub' in ceph_sett:
                        if 'nodeepscrub' in state:
                            logger.debug("nodeepscrub already set on cluster")
                        else:
                            logger.info("Applying nodeepscrub setting to ceph cluster")
                            node.run("ceph osd set nodeepscrub")
                            self.nodeepscrub_updated = True

                    # the flags are cluster-wide, so one mon/osd node is enough
                    break

    def cleanup(self, ctx: TestRun) -> None:
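        # the *_updated flags are reset after the first mon/osd node - the
        # flags are cluster-wide, so the remaining nodes are skipped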
        if self.nodeepscrub_updated or self.noscrub_updated:
            for node in ctx.nodes:
                if "ceph-mon" in node.info.roles or "ceph-osd" in node.info.roles:
                    if self.noscrub_updated:
                        logger.info("Reverting noscrub setting for ceph cluster")
                        node.run("ceph osd unset noscrub")
                        self.noscrub_updated = False

                    if self.nodeepscrub_updated:
                        logger.info("Reverting nodeepscrub setting for ceph cluster")
                        node.run("ceph osd unset nodeepscrub")
                        self.nodeepscrub_updated = False


class RunTestsStage(Stage):

    priority = StepOrder.TEST
    config_block = 'tests'

    def run(self, ctx: TestRun) -> None:
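        # the 'tests' config section is a list of one-element mappings, e.g.
        # (an assumed illustration matching the checks below):
        #
        #   tests:
        #       - fio:
        #           node_limit: 2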
        if ctx.config.no_tests:
            logger.info("Skipping tests, as 'no_tests' config setting is True")
            return

        for suite_idx, test_suite in enumerate(ctx.config.get('tests', [])):
            test_nodes = [node for node in ctx.nodes if 'testnode' in node.info.roles]

            if not test_nodes:
                logger.error("No test nodes found")
                raise StopTestError()

            if len(test_suite) != 1:
                logger.error("Test suite %s contains more than one test. Put each test in a separate group",
                             suite_idx)
                raise StopTestError()

            name, params = list(test_suite.items())[0]
            vm_count = params.get('node_limit', None)  # type: Optional[int]

            # select test nodes, honoring the optional 'node_limit' parameter
            if vm_count is None:
                curr_test_nodes = test_nodes
            else:
                curr_test_nodes = test_nodes[:vm_count]

            if not curr_test_nodes:
                logger.error("No nodes found for test, skipping it.")
                continue

            if name not in all_suits:
                logger.error("Test suite %r not found. Only suites [%s] are available", name, ", ".join(all_suits))
                raise StopTestError()

            test_cls = all_suits[name]
            remote_dir = ctx.config.default_test_local_folder.format(name=name, uuid=ctx.config.run_uuid)
            suite = SuiteConfig(test_cls.name,
                                params=params,
                                run_uuid=ctx.config.run_uuid,
                                nodes=curr_test_nodes,
                                remote_dir=remote_dir,
                                idx=suite_idx,
                                keep_raw_files=ctx.config.keep_raw_files)

            test_cls(storage=ResultStorage(ctx.storage),
                     suite=suite,
                     on_idle=lambda: collect_sensors_data(ctx, False)).run()

    @classmethod
    def validate_config(cls, cfg: ConfigBlock) -> None:
        pass


class LoadStoredNodesStage(Stage):
    priority = StepOrder.DISCOVER

    def run(self, ctx: TestRun) -> None:
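        # when resuming a run, restore the node list saved by SaveNodesStage
        # instead of re-discovering the nodes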
        if 'all_nodes' in ctx.storage:
            if ctx.nodes_info:
                logger.error("Internal error: Some nodes already stored in "
                             "nodes_info before LoadStoredNodesStage stage")
                raise StopTestError()
            ctx.nodes_info = {node.node_id: node
                              for node in ctx.storage.load_list(NodeInfo, "all_nodes")}
            logger.info("%s nodes loaded from storage", len(ctx.nodes_info))