blob: 891df5a042682ca67e6cfc3e98093d39d2330c00 [file] [log] [blame]
koder aka kdanilov962ee5f2016-12-19 02:40:08 +02001import time
koder aka kdanilove21d7472015-02-14 19:02:04 -08002import logging
koder aka kdanilov39e449e2016-12-17 15:15:26 +02003from concurrent.futures import Future
4from typing import List, Dict, Tuple, Optional, Union, cast
koder aka kdanilov88407ff2015-05-26 15:35:57 +03005
koder aka kdanilov39e449e2016-12-17 15:15:26 +02006from . import utils, ssh_utils, hw_info
7from .config import ConfigBlock
koder aka kdanilov73084622016-11-16 21:51:08 +02008from .node import setup_rpc, connect
koder aka kdanilov39e449e2016-12-17 15:15:26 +02009from .node_interfaces import NodeInfo, IRPCNode
10from .stage import Stage, StepOrder
koder aka kdanilov3b4da8b2016-10-17 00:17:53 +030011from .suits.io.fio import IOPerfTest
koder aka kdanilov39e449e2016-12-17 15:15:26 +020012from .suits.itest import TestInputConfig
13from .suits.mysql import MysqlTest
koder aka kdanilov3b4da8b2016-10-17 00:17:53 +030014from .suits.omgbench import OmgTest
koder aka kdanilov39e449e2016-12-17 15:15:26 +020015from .suits.postgres import PgBenchTest
16from .test_run_class import TestRun
koder aka kdanilovbc2c8982015-06-13 02:50:43 +030017
18
# Test-type key (as used in the 'tests' config block) -> class that runs that suite.
TOOL_TYPE_MAPPER = dict(
    io=IOPerfTest,
    pgbench=PgBenchTest,
    mysql=MysqlTest,
    omg=OmgTest,
)


# Module logger, registered under the "wally" name.
logger = logging.getLogger("wally")
koder aka kdanilovcee43342015-04-14 22:52:53 +030028
koder aka kdanilov7acd6bd2015-02-12 14:28:30 -080029
class ConnectStage(Stage):
    """Connect to nodes stage.

    ``run`` connects (SSH + RPC agent setup) to every node in ``ctx.nodes_info``
    in parallel and stores the successfully connected nodes in ``ctx.nodes``.
    Unreachable nodes are logged and excluded; if any unreachable node has the
    'testnode' role, ``utils.StopTestError`` is raised.

    ``cleanup`` optionally downloads each node's RPC log into storage and then
    disconnects every node in ``ctx.nodes``.
    """

    priority = StepOrder.CONNECT

    def run(self, ctx: TestRun) -> None:
        with ctx.get_pool() as pool:
            logger.info("Connecting to %s nodes", len(ctx.nodes_info))

            def connect_ext(node_info: NodeInfo) -> Tuple[bool, Union[IRPCNode, NodeInfo]]:
                # Never raises: failures are reported as (False, node_info) so that a
                # single unreachable node does not abort the pool.map below.
                try:
                    ssh_node = connect(node_info, conn_timeout=ctx.config.connect_timeout)
                    return True, setup_rpc(ssh_node,
                                           ctx.rpc_code,
                                           ctx.default_rpc_plugins,
                                           log_level=ctx.config.rpc_log_level)
                except Exception as exc:
                    logger.exception("During connect to %s: %s", node_info, exc)
                    return False, node_info

            failed_testnodes = []  # type: List[NodeInfo]
            failed_nodes = []  # type: List[NodeInfo]
            ctx.nodes = []

            for ok, node in pool.map(connect_ext, ctx.nodes_info.values()):
                if ok:
                    ctx.nodes.append(cast(IRPCNode, node))
                    continue

                node = cast(NodeInfo, node)
                if 'testnode' in node.roles:
                    failed_testnodes.append(node)
                else:
                    failed_nodes.append(node)

            if failed_nodes:
                logger.warning("Node(s) %s would be excluded - can't connect",
                               ", ".join(map(str, failed_nodes)))

            # Losing a test node makes the run meaningless - abort.
            if failed_testnodes:
                msg = "Can't connect to testnode(s) " + ",".join(map(str, failed_testnodes))
                logger.error(msg)
                raise utils.StopTestError(msg)

            if not failed_nodes:
                logger.info("All nodes connected successfully")

    def cleanup(self, ctx: TestRun) -> None:
        # TODO(koder): what next line was for?
        # ssh_utils.close_all_sessions()

        if ctx.config.get("download_rpc_logs", False):
            for node in ctx.nodes:
                if node.rpc_log_file is None:
                    continue

                nid = node.info.node_id()
                path = "rpc_logs/" + nid
                try:
                    node.conn.server.flush_logs()
                    log = node.get_file_content(node.rpc_log_file)
                    ctx.storage.store_raw(log, path)
                except Exception:
                    # Best-effort: a failure on one node must not prevent the other
                    # logs from being fetched, nor skip the disconnect step below.
                    logger.exception("Failed to download RPC log from node %s", nid)
                    continue
                logger.debug("RPC log from node %s stored into storage::%s", nid, path)

        with ctx.get_pool() as pool:
            list(pool.map(lambda node: node.disconnect(stop=True), ctx.nodes))
koder aka kdanilovcee43342015-04-14 22:52:53 +030092
koder aka kdanilov0fdaaee2015-06-30 11:10:48 +030093
class CollectInfoStage(Stage):
    """Collect hardware and software info from every connected node.

    Results are stored in ``ctx.storage`` under ``hw_info/<node_id>`` and
    ``sw_info/<node_id>``. Nodes whose entry is already present in storage
    are skipped. Any collection failure aborts with ``utils.StopTestError``.
    """

    priority = StepOrder.START_SENSORS - 1
    config_block = 'collect_info'

    def run(self, ctx: TestRun) -> None:
        if not ctx.config.collect_info:
            return

        with ctx.get_pool() as pool:

            def collect(prefix: str, func, what: str) -> None:
                # Submit func(node) for every node lacking "<prefix>/<node_id>" in storage,
                # then store each result; the first failure is logged and re-raised as
                # StopTestError, chained to the original exception.
                futures = {}  # type: Dict[Tuple[str, str], Future]
                for node in ctx.nodes:
                    nid = node.info.node_id()
                    path = "{}/{}".format(prefix, nid)
                    if path not in ctx.storage:
                        futures[(path, nid)] = pool.submit(func, node)

                for (path, nid), future in futures.items():
                    try:
                        ctx.storage[path] = future.result()
                    except Exception as exc:
                        logger.exception("During collecting %s info from %s", what, nid)
                        raise utils.StopTestError() from exc

            # can't make next RPC request until finish with previous - so the hardware
            # phase is fully drained before the software phase is submitted.
            collect("hw_info", hw_info.get_hw_info, "hardware")
            collect("sw_info", hw_info.get_sw_info, "software")
koder aka kdanilov39e449e2016-12-17 15:15:26 +0200134
135
class ExplicitNodesStage(Stage):
    """Add nodes listed explicitly in the 'nodes' config block.

    Each entry maps an SSH URI to a comma-separated string of roles; every
    entry is merged into the run via ``ctx.merge_node``. Skipped entirely when
    'all_nodes' is already present in storage.
    """

    priority = StepOrder.DISCOVER
    config_block = 'nodes'

    def run(self, ctx: TestRun) -> None:
        if 'all_nodes' in ctx.storage:
            logger.info("Skip explicid nodes filling, as all_nodes all ready in storage")
            return

        # Use None instead of a {} default: if config.get() returns the default
        # unchanged, a plain dict has no .raw() and a missing block would crash.
        nodes_cfg = ctx.config.get('nodes', None)
        if nodes_cfg is None:
            return

        for url, roles in nodes_cfg.raw().items():
            ctx.merge_node(ssh_utils.parse_ssh_uri(url), set(roles.split(",")))
            logger.debug("Add node %s with roles %s", url, roles)
koder aka kdanilov39e449e2016-12-17 15:15:26 +0200150
151
class SaveNodesStage(Stage):
    """Persist every known node description into storage under 'all_nodes'."""

    priority = StepOrder.CONNECT

    def run(self, ctx: TestRun) -> None:
        # Materialize the dict view so storage receives a plain list.
        all_nodes = [*ctx.nodes_info.values()]
        ctx.storage['all_nodes'] = all_nodes  # type: ignore
159
160
class SleepStage(Stage):
    """Pause the run for the number of seconds given by the 'sleep' config value."""

    priority = StepOrder.TEST
    config_block = 'sleep'

    def run(self, ctx: TestRun) -> None:
        logger.debug("Will sleep for %r seconds", ctx.config.sleep)
        time.sleep(ctx.config.sleep)
koder aka kdanilov39e449e2016-12-17 15:15:26 +0200170
171
class RunTestsStage(Stage):
    """Run every test listed in the 'tests' config block on the test nodes.

    The 'tests' block is a sequence of groups; each group maps a test type name
    (a key of ``TOOL_TYPE_MAPPER``) to that test's parameters. An optional
    ``node_limit`` parameter restricts a test to the first ``node_limit`` nodes
    having the 'testnode' role. Nothing runs when ``config.no_tests`` is set.
    """

    priority = StepOrder.TEST
    config_block = 'tests'

    def run(self, ctx: TestRun) -> None:
        for test_group in ctx.config.get('tests', []):
            if ctx.config.no_tests:
                continue

            test_nodes = [node for node in ctx.nodes if 'testnode' in node.info.roles]
            if not test_nodes:
                logger.error("No test nodes found")
                return

            for name, params in test_group.items():
                vm_count = params.get('node_limit', None)  # type: Optional[int]

                # select test nodes
                if vm_count is None:
                    curr_test_nodes = test_nodes
                else:
                    curr_test_nodes = test_nodes[:vm_count]

                if not curr_test_nodes:
                    logger.error("No nodes found for test, skipping it.")
                    continue

                test_cls = TOOL_TYPE_MAPPER[name]
                remote_dir = ctx.config.default_test_local_folder.format(name=name, uuid=ctx.config.run_uuid)
                # Hand the test only the node_limit-restricted subset; passing the full
                # test_nodes list here silently ignored node_limit.
                test_cfg = TestInputConfig(test_cls.__name__,
                                           params=params,
                                           run_uuid=ctx.config.run_uuid,
                                           nodes=curr_test_nodes,
                                           storage=ctx.storage,
                                           remote_dir=remote_dir)

                test_cls(test_cfg).run()

    @classmethod
    def validate_config(cls, cfg: ConfigBlock) -> None:
        # No validation is performed for the 'tests' block at present.
        pass