blob: 2cdf0e845cf2cadbc2f95f1b2da26561a8c905a3 [file] [log] [blame]
koder aka kdanilov962ee5f2016-12-19 02:40:08 +02001import time
koder aka kdanilov23e6bdf2016-12-24 02:18:54 +02002import json
koder aka kdanilove21d7472015-02-14 19:02:04 -08003import logging
koder aka kdanilov39e449e2016-12-17 15:15:26 +02004from concurrent.futures import Future
5from typing import List, Dict, Tuple, Optional, Union, cast
koder aka kdanilov88407ff2015-05-26 15:35:57 +03006
koder aka kdanilov39e449e2016-12-17 15:15:26 +02007from . import utils, ssh_utils, hw_info
8from .config import ConfigBlock
koder aka kdanilov73084622016-11-16 21:51:08 +02009from .node import setup_rpc, connect
koder aka kdanilov7f59d562016-12-26 01:34:23 +020010from .node_interfaces import NodeInfo, IRPCNode
koder aka kdanilov39e449e2016-12-17 15:15:26 +020011from .stage import Stage, StepOrder
koder aka kdanilov7f59d562016-12-26 01:34:23 +020012from .sensors import collect_sensors_data
koder aka kdanilov3b4da8b2016-10-17 00:17:53 +030013from .suits.io.fio import IOPerfTest
koder aka kdanilov39e449e2016-12-17 15:15:26 +020014from .suits.itest import TestInputConfig
15from .suits.mysql import MysqlTest
koder aka kdanilov3b4da8b2016-10-17 00:17:53 +030016from .suits.omgbench import OmgTest
koder aka kdanilov39e449e2016-12-17 15:15:26 +020017from .suits.postgres import PgBenchTest
18from .test_run_class import TestRun
koder aka kdanilov7f59d562016-12-26 01:34:23 +020019from .utils import StopTestError
koder aka kdanilovbc2c8982015-06-13 02:50:43 +030020
21
# Maps the test-suite name used in the config file ('tests' section)
# to the class that implements it (see RunTestsStage.run).
TOOL_TYPE_MAPPER = {
    "io": IOPerfTest,
    "pgbench": PgBenchTest,
    "mysql": MysqlTest,
    "omg": OmgTest,
}


# Shared module logger; the whole wally package logs under the "wally" name.
logger = logging.getLogger("wally")
koder aka kdanilovcee43342015-04-14 22:52:53 +030031
koder aka kdanilov7acd6bd2015-02-12 14:28:30 -080032
class ConnectStage(Stage):
    """Establish ssh + RPC connections to every discovered node.

    Nodes which fail to connect are dropped (with a warning) unless they
    carry the 'testnode' role, in which case the whole run is aborted.
    """

    priority = StepOrder.CONNECT

    def run(self, ctx: TestRun) -> None:
        with ctx.get_pool() as pool:
            logger.info("Connecting to %s nodes", len(ctx.nodes_info))

            def try_connect(info: NodeInfo) -> Tuple[bool, Union[IRPCNode, NodeInfo]]:
                """Open ssh and start RPC on one node; (False, info) on any failure."""
                try:
                    ssh_conn = connect(info, conn_timeout=ctx.config.connect_timeout)
                    rpc_node = setup_rpc(ssh_conn,
                                         ctx.rpc_code,
                                         ctx.default_rpc_plugins,
                                         log_level=ctx.config.rpc_log_level)
                    return True, rpc_node
                except Exception as exc:
                    logger.exception("During connect to %s: %s", info, exc)
                    return False, info

            failed_testnodes = []  # type: List[NodeInfo]
            failed_nodes = []  # type: List[NodeInfo]
            ctx.nodes = []

            for ok, res in pool.map(try_connect, ctx.nodes_info.values()):
                if ok:
                    ctx.nodes.append(cast(IRPCNode, res))
                    continue
                # connection failed - classify: test nodes are mandatory,
                # everything else is best-effort
                info = cast(NodeInfo, res)
                if 'testnode' in info.roles:
                    failed_testnodes.append(info)
                else:
                    failed_nodes.append(info)

            if failed_nodes:
                msg = "Node(s) {} would be excluded - can't connect"
                logger.warning(msg.format(", ".join(map(str, failed_nodes))))

            if failed_testnodes:
                msg = "Can't start RPC on testnode(s) " + ",".join(map(str, failed_testnodes))
                logger.error(msg)
                raise utils.StopTestError(msg)

            if not failed_nodes:
                logger.info("All nodes connected successfully")

    def cleanup(self, ctx: TestRun) -> None:
        # Optionally pull RPC server logs back into local storage, then
        # shut down the RPC servers and close all connections.
        if ctx.config.get("download_rpc_logs", False):
            for node in ctx.nodes:
                if node.rpc_log_file is None:
                    continue
                nid = node.info.node_id()
                path = "rpc_logs/" + nid
                node.conn.server.flush_logs()
                log = node.get_file_content(node.rpc_log_file)
                # append when a previous stage/run already stored a log chunk
                if path in ctx.storage:
                    ctx.storage.append_raw(log, path)
                else:
                    ctx.storage.put_raw(log, path)
                logger.debug("RPC log from node {} stored into storage::{}".format(nid, path))

        with ctx.get_pool() as pool:
            list(pool.map(lambda node: node.disconnect(stop=True), ctx.nodes))
koder aka kdanilovcee43342015-04-14 22:52:53 +030096
koder aka kdanilov0fdaaee2015-06-30 11:10:48 +030097
class CollectInfoStage(Stage):
    """Collect hardware and software info from every connected node.

    Results are cached in storage under hw_info/<node_id> and
    sw_info/<node_id>; nodes already present there are skipped.
    """

    priority = StepOrder.START_SENSORS - 2
    config_block = 'collect_info'

    def run(self, ctx: TestRun) -> None:
        if not ctx.config.collect_info:
            return

        with ctx.get_pool() as pool:
            # hw and sw collection were duplicated verbatim; run the same
            # gather/store cycle twice with different collectors instead
            self._collect(ctx, pool, "hw_info/{}", hw_info.get_hw_info, "hardware")
            self._collect(ctx, pool, "sw_info/{}", hw_info.get_sw_info, "software")

    @staticmethod
    def _collect(ctx: TestRun, pool, path_templ: str, collector, kind: str) -> None:
        """Run *collector* on each node in parallel and store results.

        path_templ - storage path template, formatted with the node id
        kind - human-readable info kind ("hardware"/"software") for log messages

        Raises StopTestError if any collector fails.
        """
        futures = {}  # type: Dict[Tuple[str, str], Future]

        # can't make next RPC request until finish with previous, so only
        # one future per node is in flight
        for node in ctx.nodes:
            nid = node.info.node_id()
            path = path_templ.format(nid)
            if path not in ctx.storage:
                futures[(path, nid)] = pool.submit(collector, node)

        for (path, nid), future in futures.items():
            try:
                ctx.storage.put(future.result(), path)
            except Exception:
                logger.exception("During collecting %s info from %s", kind, nid)
                raise utils.StopTestError()
koder aka kdanilov39e449e2016-12-17 15:15:26 +0200138
139
class ExplicitNodesStage(Stage):
    """Add nodes listed explicitly in the 'nodes' config section.

    Each entry maps an ssh url to a comma-separated role list.
    Skipped entirely when a node set was restored from a previous run.
    """

    priority = StepOrder.DISCOVER
    config_block = 'nodes'

    def run(self, ctx: TestRun) -> None:
        if 'all_nodes' in ctx.storage:
            # fixed typos in log message ("explicid", "all ready")
            logger.info("Skip explicit nodes filling, as all_nodes already in storage")
            return

        for url, roles in ctx.config.get('nodes', {}).raw().items():
            ctx.merge_node(ssh_utils.parse_ssh_uri(url), set(roles.split(",")))
            logger.debug("Add node %s with roles %s", url, roles)
koder aka kdanilov39e449e2016-12-17 15:15:26 +0200154
155
class SaveNodesStage(Stage):
    """Persist the discovered node list into storage under 'all_nodes'."""

    # runs at CONNECT priority so the list is saved once discovery is complete
    priority = StepOrder.CONNECT

    def run(self, ctx: TestRun) -> None:
        ctx.storage.put_list(ctx.nodes_info.values(), 'all_nodes')
koder aka kdanilov962ee5f2016-12-19 02:40:08 +0200163
164
class SleepStage(Stage):
    """Sleep for config.sleep seconds in place of running tests."""
    # NOTE: previous docstring ("Save nodes list to file") was a copy-paste
    # from SaveNodesStage and did not describe this stage.

    priority = StepOrder.TEST
    config_block = 'sleep'

    def run(self, ctx: TestRun) -> None:
        logger.debug("Will sleep for %r seconds", ctx.config.sleep)
        time.sleep(ctx.config.sleep)
koder aka kdanilov39e449e2016-12-17 15:15:26 +0200174
175
class PrepareNodes(Stage):
    """Apply ceph 'noscrub'/'nodeepscrub' flags before tests, revert after.

    Which flags to set is taken from the space-separated 'ceph_settings'
    config option.  Flags already present on the cluster are left alone
    and not reverted in cleanup.
    """

    priority = StepOrder.START_SENSORS - 1

    def __init__(self) -> None:
        Stage.__init__(self)
        # remember what we changed, so cleanup only reverts our own edits
        self.nodeepscrub_updated = False
        self.noscrub_updated = False

    def run(self, ctx: TestRun) -> None:
        ceph_sett = ctx.config.get('ceph_settings', "").split()
        if not ceph_sett:
            return

        for node in ctx.nodes:
            if "ceph-mon" in node.info.roles or "ceph-osd" in node.info.roles:
                state = json.loads(node.run("ceph health --format json"))["summary"]["summary"]

                if 'noscrub' in ceph_sett:
                    if 'noscrub' in state:
                        logger.debug("noscrub already set on cluster")
                    else:
                        logger.info("Applying noscrub settings to ceph cluster")
                        node.run("ceph osd set noscrub")
                        self.noscrub_updated = True

                if 'nodeepscrub' in ceph_sett:
                    if 'nodeepscrub' in state:
                        logger.debug("nodeepscrub already set on cluster")
                    else:
                        logger.info("Applying nodeepscrub settings to ceph cluster")
                        # BUG FIX: previously ran "ceph osd set noscrub" here,
                        # so nodeepscrub was never actually set
                        node.run("ceph osd set nodeepscrub")
                        self.nodeepscrub_updated = True

                # flags are cluster-wide, one mon/osd node is enough
                break

    def cleanup(self, ctx: TestRun) -> None:
        if self.nodeepscrub_updated or self.noscrub_updated:
            for node in ctx.nodes:
                if "ceph-mon" in node.info.roles or "ceph-osd" in node.info.roles:
                    if self.noscrub_updated:
                        logger.info("Reverting noscrub setting for ceph cluster")
                        node.run("ceph osd unset noscrub")
                        self.noscrub_updated = False

                    if self.nodeepscrub_updated:
                        logger.info("Reverting nodeepscrub setting for ceph cluster")
                        node.run("ceph osd unset nodeepscrub")
                        self.nodeepscrub_updated = False
220
221
class RunTestsStage(Stage):
    """Run every test suite from the 'tests' config section on test nodes."""

    priority = StepOrder.TEST
    config_block = 'tests'

    def run(self, ctx: TestRun) -> None:
        for test_group in ctx.config.get('tests', []):
            if ctx.config.no_tests:
                continue

            test_nodes = [node for node in ctx.nodes if 'testnode' in node.info.roles]

            if not test_nodes:
                logger.error("No test nodes found")
                raise StopTestError()

            for name, params in test_group.items():
                vm_count = params.get('node_limit', None)  # type: Optional[int]

                # select test nodes, honoring the optional node_limit cap
                if vm_count is None:
                    curr_test_nodes = test_nodes
                else:
                    curr_test_nodes = test_nodes[:vm_count]

                if not curr_test_nodes:
                    logger.error("No nodes found for test, skipping it.")
                    continue

                test_cls = TOOL_TYPE_MAPPER[name]
                remote_dir = ctx.config.default_test_local_folder.format(name=name, uuid=ctx.config.run_uuid)
                test_cfg = TestInputConfig(test_cls.__name__,
                                           params=params,
                                           run_uuid=ctx.config.run_uuid,
                                           # BUG FIX: was test_nodes, which silently
                                           # ignored the node_limit selection above
                                           nodes=curr_test_nodes,
                                           storage=ctx.storage,
                                           remote_dir=remote_dir)

                test_cls(test_cfg,
                         on_idle=lambda: collect_sensors_data(ctx, False)).run()

    @classmethod
    def validate_config(cls, cfg: ConfigBlock) -> None:
        pass
koder aka kdanilov7f59d562016-12-26 01:34:23 +0200264
265
class LoadStoredNodesStage(Stage):
    """Restore the node list saved by a previous run under 'all_nodes'."""

    priority = StepOrder.DISCOVER

    def run(self, ctx: TestRun) -> None:
        if 'all_nodes' not in ctx.storage:
            return

        # discovery stages must not have populated nodes_info before us
        if ctx.nodes_info:
            logger.error("Internal error: Some nodes already stored in " +
                         "nodes_info before LoadStoredNodesStage stage")
            raise StopTestError()

        stored = ctx.storage.load_list(NodeInfo, "all_nodes")
        ctx.nodes_info = {node.node_id(): node for node in stored}
        logger.info("%s nodes loaded from database", len(ctx.nodes_info))