blob: 1a645b6977aaa15fce784ab53e1d583037f7f3f5 [file] [log] [blame]
koder aka kdanilove21d7472015-02-14 19:02:04 -08001import logging
koder aka kdanilov39e449e2016-12-17 15:15:26 +02002from concurrent.futures import Future
3from typing import List, Dict, Tuple, Optional, Union, cast
koder aka kdanilov88407ff2015-05-26 15:35:57 +03004
koder aka kdanilov39e449e2016-12-17 15:15:26 +02005from . import utils, ssh_utils, hw_info
6from .config import ConfigBlock
koder aka kdanilov73084622016-11-16 21:51:08 +02007from .node import setup_rpc, connect
koder aka kdanilov39e449e2016-12-17 15:15:26 +02008from .node_interfaces import NodeInfo, IRPCNode
9from .stage import Stage, StepOrder
koder aka kdanilov3b4da8b2016-10-17 00:17:53 +030010from .suits.io.fio import IOPerfTest
koder aka kdanilov39e449e2016-12-17 15:15:26 +020011from .suits.itest import TestInputConfig
12from .suits.mysql import MysqlTest
koder aka kdanilov3b4da8b2016-10-17 00:17:53 +030013from .suits.omgbench import OmgTest
koder aka kdanilov39e449e2016-12-17 15:15:26 +020014from .suits.postgres import PgBenchTest
15from .test_run_class import TestRun
koder aka kdanilovbc2c8982015-06-13 02:50:43 +030016
17
koder aka kdanilov4af1c1d2015-05-18 15:48:58 +030018TOOL_TYPE_MAPPER = {
19 "io": IOPerfTest,
20 "pgbench": PgBenchTest,
21 "mysql": MysqlTest,
Yulia Portnovab0c977c2015-12-11 19:23:28 +020022 "omg": OmgTest,
koder aka kdanilov4af1c1d2015-05-18 15:48:58 +030023}
koder aka kdanilov63ad2062015-04-27 13:11:40 +030024
koder aka kdanilov57ce4db2015-04-25 21:25:51 +030025
koder aka kdanilovcff7b2e2015-04-18 20:48:15 +030026logger = logging.getLogger("wally")
koder aka kdanilovcee43342015-04-14 22:52:53 +030027
koder aka kdanilov7acd6bd2015-02-12 14:28:30 -080028
koder aka kdanilov39e449e2016-12-17 15:15:26 +020029class ConnectStage(Stage):
30 """Connect to nodes stage"""
koder aka kdanilove21d7472015-02-14 19:02:04 -080031
koder aka kdanilov39e449e2016-12-17 15:15:26 +020032 priority = StepOrder.CONNECT
koder aka kdanilov0fdaaee2015-06-30 11:10:48 +030033
koder aka kdanilov39e449e2016-12-17 15:15:26 +020034 def run(self, ctx: TestRun) -> None:
koder aka kdanilov73084622016-11-16 21:51:08 +020035 with ctx.get_pool() as pool:
koder aka kdanilov39e449e2016-12-17 15:15:26 +020036 logger.info("Connecting to %s nodes", len(ctx.nodes_info))
koder aka kdanilov4af1c1d2015-05-18 15:48:58 +030037
koder aka kdanilov39e449e2016-12-17 15:15:26 +020038 def connect_ext(node_info: NodeInfo) -> Tuple[bool, Union[IRPCNode, NodeInfo]]:
39 try:
40 ssh_node = connect(node_info, conn_timeout=ctx.config.connect_timeout)
41 # TODO(koder): need to pass all required rpc bytes to this call
42 return True, setup_rpc(ssh_node, b"")
43 except Exception as exc:
44 logger.error("During connect to {}: {!s}".format(node, exc))
45 return False, node_info
koder aka kdanilov0fdaaee2015-06-30 11:10:48 +030046
koder aka kdanilov39e449e2016-12-17 15:15:26 +020047 failed_testnodes = [] # type: List[NodeInfo]
48 failed_nodes = [] # type: List[NodeInfo]
49 ctx.nodes = []
koder aka kdanilov0fdaaee2015-06-30 11:10:48 +030050
koder aka kdanilov39e449e2016-12-17 15:15:26 +020051 for ok, node in pool.map(connect_ext, ctx.nodes_info):
52 if not ok:
53 node = cast(NodeInfo, node)
54 if 'testnode' in node.roles:
55 failed_testnodes.append(node)
56 else:
57 failed_nodes.append(node)
58 else:
59 ctx.nodes.append(cast(IRPCNode, node))
koder aka kdanilov22d134e2016-11-08 11:33:19 +020060
koder aka kdanilov39e449e2016-12-17 15:15:26 +020061 if failed_nodes:
62 msg = "Node(s) {} would be excluded - can't connect"
63 logger.warning(msg.format(",".join(map(str, failed_nodes))))
koder aka kdanilov4af1c1d2015-05-18 15:48:58 +030064
koder aka kdanilov39e449e2016-12-17 15:15:26 +020065 if failed_testnodes:
66 msg = "Can't connect to testnode(s) " + \
67 ",".join(map(str, failed_testnodes))
koder aka kdanilovc368eb62015-04-28 18:22:01 +030068 logger.error(msg)
69 raise utils.StopTestError(msg)
70
koder aka kdanilov39e449e2016-12-17 15:15:26 +020071 if not failed_nodes:
72 logger.info("All nodes connected successfully")
koder aka kdanilovcee43342015-04-14 22:52:53 +030073
koder aka kdanilov39e449e2016-12-17 15:15:26 +020074 def cleanup(self, ctx: TestRun) -> None:
75 # TODO(koder): what next line was for?
76 # ssh_utils.close_all_sessions()
koder aka kdanilov73084622016-11-16 21:51:08 +020077
koder aka kdanilov39e449e2016-12-17 15:15:26 +020078 for node in ctx.nodes:
79 node.disconnect()
koder aka kdanilovcee43342015-04-14 22:52:53 +030080
koder aka kdanilov0fdaaee2015-06-30 11:10:48 +030081
koder aka kdanilov39e449e2016-12-17 15:15:26 +020082class CollectInfoStage(Stage):
83 """Collect node info"""
koder aka kdanilov3d2bc4f2016-11-12 18:31:18 +020084
koder aka kdanilov39e449e2016-12-17 15:15:26 +020085 priority = StepOrder.START_SENSORS - 1
86 config_block = 'collect_info'
87
88 def run(self, ctx: TestRun) -> None:
89 if not ctx.config.collect_info:
90 return
91
92 futures = {} # type: Dict[str, Future]
93
94 with ctx.get_pool() as pool:
95 for node in ctx.nodes:
96 hw_info_path = "hw_info/{}".format(node.info.node_id())
97 if hw_info_path not in ctx.storage:
98 futures[hw_info_path] = pool.submit(hw_info.get_hw_info, node), node
99
100 sw_info_path = "sw_info/{}".format(node.info.node_id())
101 if sw_info_path not in ctx.storage:
102 futures[sw_info_path] = pool.submit(hw_info.get_sw_info, node)
103
104 for path, future in futures.items():
105 ctx.storage[path] = future.result()
106
107
108class ExplicitNodesStage(Stage):
109 """add explicit nodes"""
110
111 priority = StepOrder.DISCOVER
112 config_block = 'nodes'
113
114 def run(self, ctx: TestRun) -> None:
115 explicit_nodes = []
116 for url, roles in ctx.config.get('explicit_nodes', {}).items():
117 creds = ssh_utils.parse_ssh_uri(url)
118 roles = set(roles.split(","))
119 explicit_nodes.append(NodeInfo(creds, roles))
120
121 ctx.nodes_info.extend(explicit_nodes)
122 ctx.storage['explicit_nodes'] = explicit_nodes # type: ignore
123
124
125class SaveNodesStage(Stage):
126 """Save nodes list to file"""
127
128 priority = StepOrder.CONNECT
129
130 def run(self, ctx: TestRun) -> None:
131 ctx.storage['all_nodes'] = ctx.nodes_info # type: ignore
132
133
134class RunTestsStage(Stage):
135
136 priority = StepOrder.TEST
137 config_block = 'tests'
138
139 def run(self, ctx: TestRun) -> None:
140 for test_group in ctx.config.get('tests', []):
koder aka kdanilov3d2bc4f2016-11-12 18:31:18 +0200141 if not ctx.config.no_tests:
koder aka kdanilov39e449e2016-12-17 15:15:26 +0200142 test_nodes = [node for node in ctx.nodes if 'testnode' in node.info.roles]
koder aka kdanilov3d2bc4f2016-11-12 18:31:18 +0200143
koder aka kdanilov39e449e2016-12-17 15:15:26 +0200144 if not test_nodes:
145 logger.error("No test nodes found")
146 return
koder aka kdanilovda45e882015-04-06 02:24:42 +0300147
koder aka kdanilov39e449e2016-12-17 15:15:26 +0200148 for name, params in test_group.items():
149 vm_count = params.get('node_limit', None) # type: Optional[int]
gstepanov023c1e42015-04-08 15:50:19 +0300150
koder aka kdanilov39e449e2016-12-17 15:15:26 +0200151 # select test nodes
152 if vm_count is None:
153 curr_test_nodes = test_nodes
154 else:
155 curr_test_nodes = test_nodes[:vm_count]
koder aka kdanilov70227062016-11-26 23:23:21 +0200156
koder aka kdanilov39e449e2016-12-17 15:15:26 +0200157 if not curr_test_nodes:
158 logger.error("No nodes found for test, skipping it.")
159 continue
koder aka kdanilov70227062016-11-26 23:23:21 +0200160
koder aka kdanilov39e449e2016-12-17 15:15:26 +0200161 test_cls = TOOL_TYPE_MAPPER[name]
162 remote_dir = ctx.config.default_test_local_folder.format(name=name, uuid=ctx.config.run_uuid)
163 test_cfg = TestInputConfig(test_cls.__name__,
164 params=params,
165 run_uuid=ctx.config.run_uuid,
166 nodes=test_nodes,
167 storage=ctx.storage,
168 remote_dir=remote_dir)
koder aka kdanilov70227062016-11-26 23:23:21 +0200169
koder aka kdanilov39e449e2016-12-17 15:15:26 +0200170 test_cls(test_cfg).run()
gstepanov023c1e42015-04-08 15:50:19 +0300171
koder aka kdanilov39e449e2016-12-17 15:15:26 +0200172 @classmethod
173 def validate_config(cls, cfg: ConfigBlock) -> None:
174 pass