blob: caca30ed9c98bf60b1d8bd8783fde924cc71b751 [file] [log] [blame]
Ievgeniia Zadorozhna84023022021-12-30 13:00:41 +02001from io import StringIO
2import logging
3import select
4import utils
5import paramiko
6import time
7import os
8
Ievgeniia Zadorozhnaa080ec02023-12-05 02:08:52 +01009from netaddr import IPNetwork, IPAddress
10
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Keep paramiko quiet: only WARNING and above get through.
logging.getLogger("paramiko").setLevel(logging.WARNING)
15
16
class SSHTransport(object):
    """Run commands and copy files on a remote host with paramiko.

    Every command execution or file transfer opens its own connection to
    ``address`` and closes it again once the operation is finished.
    """

    def __init__(self, address, username, password=None,
                 private_key=None, look_for_keys=False, *args, **kwargs):
        """Store the connection settings and load the private key, if any.

        :param address: host name or IP address to connect to
        :param username: login name
        :param password: login password, or None
        :param private_key: RSA private key given either as the key text or
            as a path to a file containing it; None disables key auth
        :param look_for_keys: stored on the instance as ``look_for_keys``
        :param args: accepted for compatibility, ignored
        :param kwargs: accepted for compatibility, ignored
        """
        self.address = address
        self.username = username
        self.password = password
        if private_key is not None:
            # A path to a key file is replaced by the content of that file.
            if os.path.isfile(private_key):
                with open(private_key, 'r') as key_file:
                    private_key = key_file.read()
            self.private_key = paramiko.RSAKey.from_private_key(
                StringIO(private_key))
        else:
            self.private_key = None

        self.look_for_keys = look_for_keys
        self.buf_size = 1024
        self.channel_timeout = 10.0

    def _get_ssh_connection(self):
        """Open and return a connected ``paramiko.SSHClient``.

        Unknown host keys are accepted automatically. The caller owns the
        returned client and must close it.
        """
        # NOTE(review): self.look_for_keys is stored by __init__ but is not
        # forwarded to connect(), so paramiko's own default applies here.
        ssh = paramiko.SSHClient()
        ssh.set_missing_host_key_policy(
            paramiko.AutoAddPolicy())
        ssh.connect(self.address, username=self.username,
                    password=self.password, pkey=self.private_key,
                    timeout=self.channel_timeout)
        logger.debug("Successfully connected to: {0}".format(self.address))
        return ssh

    def _get_sftp_connection(self):
        """Open and return a ``paramiko.SFTPClient`` on port 22.

        The caller must close both the client and its transport.
        """
        transport = paramiko.Transport((self.address, 22))
        try:
            transport.connect(username=self.username,
                              password=self.password,
                              pkey=self.private_key)
            return paramiko.SFTPClient.from_transport(transport)
        except Exception:
            # Do not leave a half-open transport behind on failure.
            transport.close()
            raise

    @staticmethod
    def _close_sftp(sftp):
        """Close an SFTP client together with its underlying transport."""
        channel = sftp.get_channel()
        transport = channel.get_transport() if channel is not None else None
        sftp.close()
        if transport is not None:
            transport.close()

    def exec_sync(self, cmd):
        """Run ``cmd`` on the remote host and wait until it finishes.

        :param cmd: shell command line to execute
        :returns: tuple ``(exit_status, stdout, stderr)`` where both streams
            are ``bytes`` with surrounding whitespace stripped
        """
        logger.debug("Executing {0} on host {1}".format(cmd, self.address))
        ssh = self._get_ssh_connection()
        try:
            transport = ssh.get_transport()
            channel = transport.open_session()
            channel.fileno()
            channel.exec_command(cmd)
            channel.shutdown_write()
            out_data = []
            err_data = []
            poll = select.poll()
            poll.register(channel, select.POLLIN)

            while True:
                # poll() takes milliseconds, so this only sets how often the
                # loop wakes up; it does not bound the command's run time.
                if not poll.poll(self.channel_timeout):
                    continue
                out_chunk = err_chunk = None
                if channel.recv_ready():
                    out_chunk = channel.recv(self.buf_size)
                    out_data.append(out_chunk)
                if channel.recv_stderr_ready():
                    err_chunk = channel.recv_stderr(self.buf_size)
                    err_data.append(err_chunk)
                if channel.closed and not err_chunk and not out_chunk:
                    break
            exit_status = channel.recv_exit_status()
        finally:
            # One connection per command: always release it.
            ssh.close()
        logger.debug("Command {0} executed with status: {1}"
                     .format(cmd, exit_status))
        # Chunks are arbitrary slices of the stream, so they must be glued
        # back together without any separator.
        return (exit_status, b"".join(out_data).strip(),
                b"".join(err_data).strip())

    def exec_command(self, cmd):
        """Run ``cmd`` remotely and return only its stdout as ``bytes``."""
        exit_status, stdout, stderr = self.exec_sync(cmd)
        return stdout

    def check_call(self, command, error_info=None, expected=None,
                   raise_on_err=True):
        """Execute command and check for return code

        :type command: str
        :type error_info: str
        :type expected: list
        :type raise_on_err: bool
        :rtype: tuple of (exit code, stdout bytes, stderr bytes)
        :raises: SystemExit if the exit code is not in ``expected`` and
            ``raise_on_err`` is true
        """
        if expected is None:
            expected = [0]
        ret = self.exec_sync(command)
        exit_code, stdout_str, stderr_str = ret
        if exit_code not in expected:
            message = (
                "{append}Command '{cmd}' returned exit code {code} while "
                "expected {expected}\n"
                "\tSTDOUT:\n"
                "{stdout}"
                "\n\tSTDERR:\n"
                "{stderr}".format(
                    append=error_info + '\n' if error_info else '',
                    cmd=command,
                    code=exit_code,
                    expected=expected,
                    stdout=stdout_str,
                    stderr=stderr_str
                ))
            logger.error(message)
            if raise_on_err:
                # Terminate with a non-zero status so that the failure is
                # visible to whatever launched the process.
                raise SystemExit(1)
        return ret

    def put_file(self, source_path, destination_path):
        """Upload local ``source_path`` to remote ``destination_path``."""
        sftp = self._get_sftp_connection()
        try:
            sftp.put(source_path, destination_path)
        finally:
            self._close_sftp(sftp)

    def put_iperf3_deb_packages_at_vms(self, source_directory,
                                       destination_directory):
        """Upload the local iperf-related ``*.deb`` files to the remote host.

        A file is uploaded when its name ends with ``.deb`` and contains one
        of ``iperf``, ``iperf3``, ``libiperf0`` or ``libsctp1``.

        :param source_directory: local directory holding the packages
        :param destination_directory: remote directory to upload into
        :raises: utils.exceptions.NoPackageInstalled if no matching package
            is found in ``source_directory``
        """
        required_packages = ['iperf', 'iperf3', 'libiperf0', 'libsctp1']
        iperf_deb_files = [
            pack for pack in os.listdir(source_directory)
            if pack.endswith('.deb')
            and any(req in pack for req in required_packages)]
        if not iperf_deb_files:
            raise utils.exceptions.NoPackageInstalled(
                "iperf3 or iperf *.deb packages are not found locally at path"
                " {}. Please recheck 'iperf_deb_package_dir_path' variable in "
                "global_config.yaml and check *.deb packages are manually "
                "copied there.".format(source_directory))
        # The destination is a remote path: join with '/' and avoid a
        # doubled separator when the directory already ends with one.
        remote_dir = destination_directory.rstrip('/')
        for f in iperf_deb_files:
            source_abs_path = os.path.join(source_directory, f)
            dest_abs_path = "{}/{}".format(remote_dir, f)
            self.put_file(source_abs_path, dest_abs_path)

    def get_file(self, source_path, destination_path):
        """Download remote ``source_path`` to local ``destination_path``."""
        sftp = self._get_sftp_connection()
        try:
            sftp.get(source_path, destination_path)
        finally:
            self._close_sftp(sftp)

    def _is_timed_out(self, start_time, timeout):
        """Return True once more than ``timeout`` seconds have passed since
        ``start_time`` (a ``time.time()`` value)."""
        return (time.time() - timeout) > start_time

    def check_vm_is_reachable_ssh(self, floating_ip, timeout=500, sleep=5):
        """Retry an SSH login to ``floating_ip`` until it succeeds.

        :param floating_ip: address to connect to
        :param timeout: give up after this many seconds
        :param sleep: pause between attempts, in seconds
        :returns: True as soon as a login succeeds
        :raises: TimeoutError if no login succeeded within ``timeout``
        """
        ssh = paramiko.SSHClient()
        ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        start_time = time.time()
        attempts = 0
        while True:
            attempts += 1
            try:
                ssh.connect(floating_ip, username=self.username,
                            password=self.password, pkey=self.private_key,
                            timeout=self.channel_timeout)
            except Exception as e:
                ssh.close()
                if self._is_timed_out(start_time, timeout):
                    logger.info("VM with FIP {} is not reachable via SSH. "
                                "See details: {}".format(floating_ip, e))
                    raise TimeoutError(
                        "\nFailed to establish authenticated ssh connection "
                        "to {} after {} attempts during {} seconds.\n{}"
                        "".format(floating_ip, attempts, timeout, e)) from e
                logger.info("Failed to establish authenticated ssh connection "
                            "to {}. Number attempts: {}. Retry after {} "
                            "seconds.".format(floating_ip, attempts, sleep))
                time.sleep(sleep)
            else:
                # This was only a probe; do not keep the session open.
                ssh.close()
                logger.info("VM with FIP {} is reachable via SSH. Success!"
                            "".format(floating_ip))
                return True

    @staticmethod
    def get_mtu_from_vm(floating_ip, user='ubuntu', password='password',
                        private_key=None):
        """Return the MTU of the first non-loopback interface as a string.

        The interface is the first entry of ``/sys/class/net`` on the remote
        host that is left after filtering out names containing ``lo``.
        """
        transport = SSHTransport(floating_ip, user, password, private_key)
        iface = (transport.exec_command(
            'ls /sys/class/net | grep -v lo | head -n 1')).decode("utf-8")
        mtu = transport.exec_command('cat /sys/class/net/{}/mtu'.format(iface))
        return mtu.decode("utf-8")

    @staticmethod
    def get_node_ip_addresses_from_cidr(ip, cidr, user='mcc-user',
                                        private_key=None):
        """Return the node's global IPv4 address that belongs to ``cidr``.

        :param ip: address used to reach the node over SSH
        :param cidr: network to look the address up in
        :returns: the first matching address as a string, or None when no
            address of the node is inside ``cidr``
        :raises: utils.exceptions.InvalidConfigException if the command
            output or ``cidr`` cannot be parsed
        """
        transport = SSHTransport(
            ip, username=user, private_key=private_key)
        command = "ip -4 addr show scope global"
        output = transport.exec_command(command)
        try:
            network = IPNetwork(cidr)
            for line in output.decode().splitlines():
                fields = line.split()
                # Only "inet <address>/<prefix> ..." lines are of interest.
                if len(fields) < 2 or "/" not in fields[1]:
                    continue
                ip_addr = fields[1].split('/')[0]
                if IPAddress(ip_addr) in network:
                    return ip_addr
        except Exception as e:
            raise utils.exceptions.InvalidConfigException(
                f"Could not find the IP at the interface {cidr} at the node "
                f"with K8S Private IP {ip}. Please check the configuration. "
                f"\nException: {e}") from e
        return None
Ievgeniia Zadorozhna5ed74e22022-07-26 16:56:23 +0300216
Ievgeniia Zadorozhnaa080ec02023-12-05 02:08:52 +0100217
class IperfAtVM(object):
    """Install iperf/iperf3 on a VM and start both servers on it."""

    def __init__(self, fip, user='ubuntu', password='password',
                 private_key=None):
        """Install the iperf packages on the VM and launch the servers.

        The packages come from apt, unless the ``internet_at_vms``
        configuration value is "false"; then local ``*.deb`` files are
        uploaded from ``iperf_deb_package_dir_path`` and installed with dpkg.

        :param fip: address of the VM to connect to
        :param user: SSH login name
        :param password: SSH password
        :param private_key: private key passed on to ``SSHTransport``
        :raises: utils.exceptions.NoPackageInstalled if iperf3 is not
            listed as installed afterwards
        """
        transport = SSHTransport(fip, user, password, private_key)
        config = utils.get_configuration()

        # Install iperf, iperf3 using apt or downloaded deb packages
        offline = self._is_offline(config.get("internet_at_vms"))
        path_to_iperf_deb = (config.get('iperf_deb_package_dir_path') or
                             "/opt/packages/")
        if offline:
            logger.info("Copying offline iperf3 deb packages, installing...")
            home_ubuntu = "/home/ubuntu/"
            transport.put_iperf3_deb_packages_at_vms(path_to_iperf_deb,
                                                     home_ubuntu)
            _, stdout, stderr = transport.exec_sync(
                'sudo dpkg -i {}*.deb'.format(home_ubuntu))
        else:
            logger.info("Installing iperf, iperf3 using apt")
            preparation_cmd = config.get('iperf_prep_string')
            # The preparation command is optional: run it only when set.
            if preparation_cmd:
                transport.exec_command(preparation_cmd)
            _, stdout, stderr = transport.exec_sync(
                'sudo apt-get update && sudo apt-get install -y iperf3 iperf')

        # Log whether iperf is installed with version
        check = transport.exec_command('dpkg -l | grep ii | grep iperf3')
        logger.info(check.decode('utf-8'))
        if not check:
            if offline:
                info = "Could not put offline iperf packages from {} to the " \
                       "VM".format(path_to_iperf_deb)
            else:
                info = "Please check the Internet access at VM"
            raise utils.exceptions.NoPackageInstalled(
                "iperf3 is not installed at VM with FIP {}. {}.\nStdout, "
                "stderr at VM:\n{}\n{}".format(
                    fip, info,
                    stdout.decode('utf-8', errors='replace'),
                    stderr.decode('utf-8', errors='replace')))
        # Starting iperf server
        transport.exec_command('nohup iperf3 -s > file 2>&1 &')
        transport.exec_command('nohup iperf -s > file 2>&1 &')

    @staticmethod
    def _is_offline(internet_at_vms):
        """Tell whether the ``internet_at_vms`` setting means "no Internet".

        Only the value "false" (any letter case, given as text or as a
        boolean) selects the offline installation; everything else,
        including a missing value, selects the apt installation.
        """
        return str(internet_at_vms).strip().lower() == 'false'
Ievgeniia Zadorozhnaa080ec02023-12-05 02:08:52 +0100259
260
class IperfAtNode(object):
    """Make sure iperf/iperf3 are on a node and start both servers there."""

    def __init__(self, ip, iperf_test_ip, user='mcc-user', private_key=None):
        """Install the missing iperf packages and launch the servers.

        ``install_iperf`` is set to True when at least one of the packages
        had to be installed by this call, and to False otherwise.

        :param ip: address of the node to connect to
        :param iperf_test_ip: address the iperf servers are bound to
        :param user: SSH login name
        :param private_key: private key passed on to ``SSHTransport``
        :raises: utils.exceptions.NoPackageInstalled if a package is still
            not found after the installation attempt
        """
        transport = SSHTransport(ip, user, private_key=private_key)
        self.install_iperf = False
        # TODO: to avoid looping, both packages can be installed by one
        # 'install -y iperf iperf3' command. 'which' command can also be
        # executed for multiple packages at once.
        packages = ["iperf", "iperf3"]
        for p in packages:
            # Reset per package so the error below never reports output
            # that belongs to another package (or is undefined).
            stdout = stderr = b""
            check_path = transport.exec_command(f'which {p}')
            if not check_path:
                self.install_iperf = True
                # Install iperf/iperf3 at MOSK nodes
                logger.info(f"Installing {p} at MOSK nodes...")
                _, stdout, stderr = transport.exec_sync(
                    f"sudo apt update && sudo apt install -y {p}")
                # Re-check only when something was actually installed.
                check_path = transport.exec_command(f'which {p}')
            # Log whether iperf is installed
            logger.info(f"{p} package path: {check_path.decode('utf-8')}")
            if not check_path:
                raise utils.exceptions.NoPackageInstalled(
                    f"{p} is not installed at the MOSK node with IP {ip}.\n"
                    f"Stdout, stderr at VM:\n"
                    f"{stdout.decode('utf-8', errors='replace')}\n"
                    f"{stderr.decode('utf-8', errors='replace')}")
            # Starting iperf/iperf3 server
            transport.exec_command(
                f"nohup {p} -s -B {iperf_test_ip} > file 2>&1 &")

    @staticmethod
    def remove_iperf_packages(ip, user='mcc-user', private_key=None):
        """Remove the iperf and iperf3 packages from the node with apt."""
        transport = SSHTransport(ip, user, private_key=private_key)
        logger.info(f"Removing iperf,iperf3 packages from the node {ip}...")
        transport.exec_command("sudo apt remove iperf iperf3 -y")