blob: 52803d15bce5434e781b1d30e85aa72ced2435ba [file] [log] [blame]
koder aka kdanilov962ee5f2016-12-19 02:40:08 +02001import time
koder aka kdanilov23e6bdf2016-12-24 02:18:54 +02002import json
koder aka kdanilove21d7472015-02-14 19:02:04 -08003import logging
koder aka kdanilov39e449e2016-12-17 15:15:26 +02004from concurrent.futures import Future
5from typing import List, Dict, Tuple, Optional, Union, cast
koder aka kdanilov88407ff2015-05-26 15:35:57 +03006
koder aka kdanilov39e449e2016-12-17 15:15:26 +02007from . import utils, ssh_utils, hw_info
8from .config import ConfigBlock
koder aka kdanilov73084622016-11-16 21:51:08 +02009from .node import setup_rpc, connect
koder aka kdanilov23e6bdf2016-12-24 02:18:54 +020010from .node_interfaces import NodeInfo, IRPCNode, ISSHHost
koder aka kdanilov39e449e2016-12-17 15:15:26 +020011from .stage import Stage, StepOrder
koder aka kdanilov3b4da8b2016-10-17 00:17:53 +030012from .suits.io.fio import IOPerfTest
koder aka kdanilov39e449e2016-12-17 15:15:26 +020013from .suits.itest import TestInputConfig
14from .suits.mysql import MysqlTest
koder aka kdanilov3b4da8b2016-10-17 00:17:53 +030015from .suits.omgbench import OmgTest
koder aka kdanilov39e449e2016-12-17 15:15:26 +020016from .suits.postgres import PgBenchTest
17from .test_run_class import TestRun
koder aka kdanilovbc2c8982015-06-13 02:50:43 +030018
19
# Maps a test-suite name, as used in the config file 'tests' section,
# to the class implementing that suite.
TOOL_TYPE_MAPPER = {
    "io": IOPerfTest,
    "pgbench": PgBenchTest,
    "mysql": MysqlTest,
    "omg": OmgTest,
}


# Module logger - shared "wally" logger used by all stages in this file
logger = logging.getLogger("wally")
koder aka kdanilovcee43342015-04-14 22:52:53 +030029
koder aka kdanilov7acd6bd2015-02-12 14:28:30 -080030
class ConnectStage(Stage):
    """Connect to all discovered nodes over SSH and start RPC agents on them.

    Nodes that fail to connect are excluded from the run with a warning;
    a failure on any node with the 'testnode' role aborts the whole run.
    """

    priority = StepOrder.CONNECT

    def run(self, ctx: TestRun) -> None:
        with ctx.get_pool() as pool:
            logger.info("Connecting to %s nodes", len(ctx.nodes_info))

            def connect_ext(node_info: NodeInfo) -> Tuple[bool, Union[IRPCNode, NodeInfo]]:
                # Returns (True, rpc_node) on success, (False, original info) on failure,
                # so results can be partitioned after the parallel map below.
                try:
                    ssh_node = connect(node_info, conn_timeout=ctx.config.connect_timeout)

                    return True, setup_rpc(ssh_node,
                                           ctx.rpc_code,
                                           ctx.default_rpc_plugins,
                                           log_level=ctx.config.rpc_log_level)
                except Exception as exc:
                    logger.exception("During connect to %s: %s", node_info, exc)
                    return False, node_info

            failed_testnodes = []  # type: List[NodeInfo]
            failed_nodes = []  # type: List[NodeInfo]
            ctx.nodes = []

            # Connect to all nodes in parallel, then partition the results
            for ok, node in pool.map(connect_ext, ctx.nodes_info.values()):
                if not ok:
                    node = cast(NodeInfo, node)
                    if 'testnode' in node.roles:
                        failed_testnodes.append(node)
                    else:
                        failed_nodes.append(node)
                else:
                    ctx.nodes.append(cast(IRPCNode, node))

            if failed_nodes:
                # Non-test nodes are not mandatory - warn and continue without them
                msg = "Node(s) {} would be excluded - can't connect"
                logger.warning(msg.format(", ".join(map(str, failed_nodes))))

            if failed_testnodes:
                # Test nodes are required for the run - abort
                msg = "Can't start RPC on testnode(s) " + ",".join(map(str, failed_testnodes))
                logger.error(msg)
                raise utils.StopTestError(msg)

            if not failed_nodes:
                logger.info("All nodes connected successfully")

    def cleanup(self, ctx: TestRun) -> None:
        # Optionally pull per-node RPC agent logs into storage before disconnecting
        if ctx.config.get("download_rpc_logs", False):
            for node in ctx.nodes:
                if node.rpc_log_file is not None:
                    nid = node.info.node_id()
                    path = "rpc_logs/" + nid
                    # Flush the remote log buffer first so the fetched file is complete
                    node.conn.server.flush_logs()
                    log = node.get_file_content(node.rpc_log_file)
                    ctx.storage.store_raw(log, path)
                    logger.debug("RPC log from node {} stored into storage::{}".format(nid, path))

        # Stop RPC agents and close all connections in parallel
        with ctx.get_pool() as pool:
            list(pool.map(lambda node: node.disconnect(stop=True), ctx.nodes))
koder aka kdanilovcee43342015-04-14 22:52:53 +030091
koder aka kdanilov0fdaaee2015-06-30 11:10:48 +030092
class CollectInfoStage(Stage):
    """Collect hardware and software info from every connected node.

    Results are cached in storage under "hw_info/<node_id>" and
    "sw_info/<node_id>"; nodes whose data is already present are skipped.
    """

    priority = StepOrder.START_SENSORS - 2
    config_block = 'collect_info'

    def run(self, ctx: TestRun) -> None:
        if not ctx.config.collect_info:
            return

        with ctx.get_pool() as pool:
            # Can't make next RPC request to a node until the previous one
            # finishes, so hw and sw collection run as two separate passes.
            self._collect(ctx, pool, "hw_info", hw_info.get_hw_info, "hardware")
            self._collect(ctx, pool, "sw_info", hw_info.get_sw_info, "software")

    @staticmethod
    def _collect(ctx, pool, prefix, collector, kind):
        # type: (TestRun, object, str, object, str) -> None
        """Run *collector* in parallel on each node missing "<prefix>/<node_id>"
        in storage and persist the results.

        Raises StopTestError if collection fails on any node.
        """
        futures = {}  # type: Dict[Tuple[str, str], Future]

        for node in ctx.nodes:
            nid = node.info.node_id()
            path = "{}/{}".format(prefix, nid)
            if path not in ctx.storage:
                futures[(path, nid)] = pool.submit(collector, node)

        for (path, nid), future in futures.items():
            try:
                ctx.storage[path] = future.result()
            except Exception:
                logger.exception("During collecting %s info from %s", kind, nid)
                raise utils.StopTestError()
koder aka kdanilov39e449e2016-12-17 15:15:26 +0200133
134
135class ExplicitNodesStage(Stage):
136 """add explicit nodes"""
137
138 priority = StepOrder.DISCOVER
139 config_block = 'nodes'
140
141 def run(self, ctx: TestRun) -> None:
koder aka kdanilov962ee5f2016-12-19 02:40:08 +0200142 if 'all_nodes' in ctx.storage:
143 logger.info("Skip explicid nodes filling, as all_nodes all ready in storage")
144 return
koder aka kdanilov39e449e2016-12-17 15:15:26 +0200145
koder aka kdanilovbbbe1dc2016-12-20 01:19:56 +0200146 for url, roles in ctx.config.get('nodes', {}).raw().items():
koder aka kdanilov962ee5f2016-12-19 02:40:08 +0200147 ctx.merge_node(ssh_utils.parse_ssh_uri(url), set(roles.split(",")))
koder aka kdanilovbbbe1dc2016-12-20 01:19:56 +0200148 logger.debug("Add node %s with roles %s", url, roles)
koder aka kdanilov39e449e2016-12-17 15:15:26 +0200149
150
151class SaveNodesStage(Stage):
152 """Save nodes list to file"""
153
154 priority = StepOrder.CONNECT
155
156 def run(self, ctx: TestRun) -> None:
koder aka kdanilov962ee5f2016-12-19 02:40:08 +0200157 ctx.storage['all_nodes'] = list(ctx.nodes_info.values()) # type: ignore
158
159
160class SleepStage(Stage):
161 """Save nodes list to file"""
162
163 priority = StepOrder.TEST
164 config_block = 'sleep'
165
166 def run(self, ctx: TestRun) -> None:
167 logger.debug("Will sleep for %r seconds", ctx.config.sleep)
168 time.sleep(ctx.config.sleep)
koder aka kdanilov39e449e2016-12-17 15:15:26 +0200169
170
class PrepareNodes(Stage):
    """Apply optional ceph scrub flags (noscrub / nodeepscrub) before the test
    and revert them afterwards.

    Controlled by the space-separated 'ceph_settings' config option.
    """

    priority = StepOrder.START_SENSORS - 1

    def __init__(self):
        Stage.__init__(self)
        # Track which flags WE set, so cleanup() only reverts our own changes
        self.nodeepscrub_updated = False
        self.noscrub_updated = False

    def run(self, ctx: TestRun) -> None:
        ceph_sett = ctx.config.get('ceph_settings', "").split()
        if not ceph_sett:
            return

        for node in ctx.nodes:
            if "ceph-mon" in node.info.roles or "ceph-osd" in node.info.roles:
                # 'summary' lists currently raised cluster flags/warnings
                state = json.loads(node.run("ceph health --format json"))["summary"]["summary"]

                if 'noscrub' in ceph_sett:
                    if 'noscrub' in state:
                        logger.debug("noscrub already set on cluster")
                    else:
                        logger.info("Applying noscrub settings to ceph cluster")
                        node.run("ceph osd set noscrub")
                        self.noscrub_updated = True

                if 'nodeepscrub' in ceph_sett:
                    if 'nodeepscrub' in state:
                        logger.debug("nodeepscrub already set on cluster")
                    else:
                        logger.info("Applying nodeepscrub settings to ceph cluster")
                        # BUG FIX: original ran "ceph osd set noscrub" here,
                        # so nodeepscrub was never actually set
                        node.run("ceph osd set nodeepscrub")
                        self.nodeepscrub_updated = True

                # Flags are cluster-wide - one mon/osd node is enough
                break

    def cleanup(self, ctx: TestRun) -> None:
        if not (self.nodeepscrub_updated or self.noscrub_updated):
            return

        for node in ctx.nodes:
            if "ceph-mon" in node.info.roles or "ceph-osd" in node.info.roles:
                if self.noscrub_updated:
                    logger.info("Reverting noscrub setting for ceph cluster")
                    node.run("ceph osd unset noscrub")
                    self.noscrub_updated = False

                if self.nodeepscrub_updated:
                    logger.info("Reverting nodeepscrub setting for ceph cluster")
                    node.run("ceph osd unset nodeepscrub")
                    self.nodeepscrub_updated = False
215
216
class RunTestsStage(Stage):
    """Run every configured test suite from the 'tests' config section
    on the nodes carrying the 'testnode' role."""

    priority = StepOrder.TEST
    config_block = 'tests'

    def run(self, ctx: TestRun) -> None:
        for test_group in ctx.config.get('tests', []):
            if ctx.config.no_tests:
                continue

            test_nodes = [node for node in ctx.nodes if 'testnode' in node.info.roles]

            if not test_nodes:
                logger.error("No test nodes found")
                return

            for name, params in test_group.items():
                vm_count = params.get('node_limit', None)  # type: Optional[int]

                # Select test nodes, honoring the optional node_limit cap
                if vm_count is None:
                    curr_test_nodes = test_nodes
                else:
                    curr_test_nodes = test_nodes[:vm_count]

                if not curr_test_nodes:
                    logger.error("No nodes found for test, skipping it.")
                    continue

                test_cls = TOOL_TYPE_MAPPER[name]
                remote_dir = ctx.config.default_test_local_folder.format(name=name, uuid=ctx.config.run_uuid)
                test_cfg = TestInputConfig(test_cls.__name__,
                                           params=params,
                                           run_uuid=ctx.config.run_uuid,
                                           # BUG FIX: was nodes=test_nodes, which
                                           # silently ignored node_limit
                                           nodes=curr_test_nodes,
                                           storage=ctx.storage,
                                           remote_dir=remote_dir)

                test_cls(test_cfg).run()

    @classmethod
    def validate_config(cls, cfg: ConfigBlock) -> None:
        # No suite-level validation yet; individual suites validate their params
        pass