blob: cb0a25880812265a83222cbde736daaa69343a1b [file] [log] [blame]
Ievgeniia Zadorozhna84023022021-12-30 13:00:41 +02001from io import StringIO
2import logging
3import select
4import utils
5import paramiko
6import time
7import os
8
Ievgeniia Zadorozhnaa080ec02023-12-05 02:08:52 +01009from netaddr import IPNetwork, IPAddress
10
# Module-level logger shared by the helpers and classes in this file.
logger = logging.getLogger(__name__)

# Suppress paramiko logging below WARNING so its messages do not flood
# the output of this module's own logger.
logging.getLogger("paramiko").setLevel(logging.WARNING)
15
16
def install_iperf_at_vm(vm_info):
    """Install and start iperf on one VM, reporting failures to stdout.

    Instantiating ``IperfAtVM`` performs the whole installation, so the
    created object is intentionally discarded.

    :param vm_info: mapping that provides the ``'fip'`` and
        ``'private_key'`` entries of the target VM
    :return: always ``None``; any error raised while installing is printed
        instead of being propagated
    """
    try:
        IperfAtVM(fip=vm_info['fip'], private_key=vm_info['private_key'])
    except Exception as err:
        # Best effort: report the problem and let the caller carry on.
        print(f"Error on VM {vm_info['fip']}: {err}")
    return None
23
24
def get_mtu_at_vm(vm_info):
    """Return the MTU of a VM's first non-loopback interface, or ``None``.

    ``SSHTransport.get_mtu_from_vm`` is a static method that opens its own
    connection, so it is called directly on the class. Doing everything
    inside the ``try`` also means that a failure while preparing the
    transport (for example an unreadable private key) is reported the same
    way as a failure of the remote command.

    :param vm_info: mapping that provides the ``'fip'`` and
        ``'private_key'`` entries of the target VM
    :return: the MTU as a string, or ``None`` when it could not be read
    """
    try:
        logger.info("Getting MTU values from VMs...")
        mtu = SSHTransport.get_mtu_from_vm(
            vm_info['fip'], private_key=vm_info['private_key'])
        logger.info(f"MTU at VM {vm_info['fip']} is {mtu}")
        return mtu
    except Exception as e:
        # Best effort: report the problem and let the caller carry on.
        print(f"Error while getting MTU at VM {vm_info['fip']}: {e}")
        return None
37
38
class SSHTransport(object):
    """Run commands and copy files on a remote host over SSH (paramiko).

    Every operation opens its own connection and closes it when done.
    Unknown host keys are accepted automatically (``AutoAddPolicy``).
    """

    def __init__(self, address, username, password=None,
                 private_key=None, look_for_keys=False, *args, **kwargs):
        """Store the credentials used by later connections.

        :param address: host name or IP address of the remote host
        :param username: SSH user name
        :param password: optional password
        :param private_key: optional RSA private key, given either as a
            path to a key file or as the key text itself
        :param look_for_keys: stored on the instance only.
            NOTE(review): this value is not passed to ``connect()`` - confirm
            whether that is intended.
        """
        self.address = address
        self.username = username
        self.password = password
        if private_key is not None:
            # Accept both a path to the key file and the key content.
            if os.path.isfile(private_key):
                with open(private_key, 'r') as key_file:
                    private_key = key_file.read()
            self.private_key = paramiko.RSAKey.from_private_key(
                StringIO(private_key))
        else:
            self.private_key = None

        self.look_for_keys = look_for_keys
        self.buf_size = 1024  # bytes read from the channel per recv call
        self.channel_timeout = 10.0  # seconds

    def _get_ssh_connection(self):
        """Return a connected ``paramiko.SSHClient``; the caller closes it."""
        ssh = paramiko.SSHClient()
        ssh.set_missing_host_key_policy(
            paramiko.AutoAddPolicy())
        try:
            ssh.connect(self.address, username=self.username,
                        password=self.password, pkey=self.private_key,
                        timeout=self.channel_timeout)
        except Exception:
            # Do not leave a half-open client behind on failure.
            ssh.close()
            raise
        logger.debug("Successfully connected to: {0}".format(self.address))
        return ssh

    def _get_sftp_connection(self):
        """Return an SFTP client on port 22; close it with ``_close_sftp``."""
        transport = paramiko.Transport((self.address, 22))
        try:
            transport.connect(username=self.username,
                              password=self.password,
                              pkey=self.private_key)
            return paramiko.SFTPClient.from_transport(transport)
        except Exception:
            transport.close()
            raise

    @staticmethod
    def _close_sftp(sftp):
        """Close an SFTP client together with its underlying transport."""
        # Fetch the transport first: closing the client closes the channel.
        transport = sftp.get_channel().get_transport()
        sftp.close()
        transport.close()

    def exec_sync(self, cmd):
        """Run ``cmd`` remotely and wait until it finishes.

        :param cmd: shell command line to execute
        :return: tuple ``(exit_status, stdout, stderr)`` where both streams
            are ``bytes`` with surrounding whitespace stripped
        """
        logger.debug("Executing {0} on host {1}".format(cmd, self.address))
        ssh = self._get_ssh_connection()
        try:
            transport = ssh.get_transport()
            channel = transport.open_session()
            # Ask for the channel's descriptor before starting the command
            # so that poll() below has something to wait on.
            channel.fileno()
            channel.exec_command(cmd)
            channel.shutdown_write()
            out_data = []
            err_data = []
            poll = select.poll()
            poll.register(channel, select.POLLIN)
            # poll() expects milliseconds, channel_timeout is in seconds.
            poll_timeout_ms = self.channel_timeout * 1000

            while True:
                if not poll.poll(poll_timeout_ms):
                    # Nothing happened within the timeout; keep waiting.
                    continue
                out_chunk = err_chunk = None
                if channel.recv_ready():
                    out_chunk = channel.recv(self.buf_size)
                    out_data.append(out_chunk)
                if channel.recv_stderr_ready():
                    err_chunk = channel.recv_stderr(self.buf_size)
                    err_data.append(err_chunk)
                if channel.closed and not err_chunk and not out_chunk:
                    break
            exit_status = channel.recv_exit_status()
        finally:
            ssh.close()
        logger.debug("Command {0} executed with status: {1}"
                     .format(cmd, exit_status))
        # Chunks are arbitrary slices of the stream, so they must be glued
        # together without any separator.
        return (exit_status, b"".join(out_data).strip(),
                b"".join(err_data).strip())

    def exec_command(self, cmd):
        """Run ``cmd`` remotely and return only its stdout as ``bytes``."""
        _, stdout, _ = self.exec_sync(cmd)
        return stdout

    def check_call(self, command, error_info=None, expected=None,
                   raise_on_err=True):
        """Execute command and check for return code

        :type command: str
        :type error_info: str
        :type expected: list
        :type raise_on_err: bool
        :rtype: tuple
        :returns: ``(exit_code, stdout, stderr)`` as given by ``exec_sync``
        :raises: SystemExit with status 1 when the exit code is not in
            ``expected`` and ``raise_on_err`` is true
        """
        if expected is None:
            expected = [0]
        ret = self.exec_sync(command)
        exit_code, stdout_str, stderr_str = ret
        if exit_code not in expected:
            message = (
                "{append}Command '{cmd}' returned exit code {code} while "
                "expected {expected}\n"
                "\tSTDOUT:\n"
                "{stdout}"
                "\n\tSTDERR:\n"
                "{stderr}".format(
                    append=error_info + '\n' if error_info else '',
                    cmd=command,
                    code=exit_code,
                    expected=expected,
                    stdout=stdout_str,
                    stderr=stderr_str
                ))
            logger.error(message)
            if raise_on_err:
                # Terminate with a failing status; the interactive exit()
                # helper would report success to the calling process.
                raise SystemExit(1)
        return ret

    def put_file(self, source_path, destination_path):
        """Upload a local file to ``destination_path`` on the remote host."""
        sftp = self._get_sftp_connection()
        try:
            sftp.put(source_path, destination_path)
        finally:
            self._close_sftp(sftp)

    def put_iperf3_deb_packages_at_vms(self, source_directory,
                                       destination_directory):
        """Upload the local iperf-related ``*.deb`` files to the remote host.

        :param source_directory: local directory holding the packages
        :param destination_directory: remote directory to copy them into
        :raises utils.exceptions.NoPackageInstalled: when no matching
            package is found in ``source_directory``
        """
        required_packages = ['iperf', 'iperf3', 'libiperf0', 'libsctp1']
        iperf_deb_files = [
            pack for pack in sorted(os.listdir(source_directory))
            if pack.endswith('.deb')
            and any(req in pack for req in required_packages)]
        if not iperf_deb_files:
            raise utils.exceptions.NoPackageInstalled(
                "iperf3 or iperf *.deb packages are not found locally at path"
                " {}. Please recheck 'iperf_deb_package_dir_path' variable in "
                "global_config.yaml and check *.deb packages are manually "
                "copied there.".format(source_directory))
        for f in iperf_deb_files:
            source_abs_path = "{}/{}".format(source_directory, f)
            dest_abs_path = "{}/{}".format(destination_directory, f)
            self.put_file(source_abs_path, dest_abs_path)

    def get_file(self, source_path, destination_path):
        """Download a remote file to ``destination_path`` on this machine."""
        sftp = self._get_sftp_connection()
        try:
            sftp.get(source_path, destination_path)
        finally:
            self._close_sftp(sftp)

    def _is_timed_out(self, start_time, timeout):
        """Tell whether more than ``timeout`` seconds passed since start."""
        return (time.time() - timeout) > start_time

    def check_vm_is_reachable_ssh(self, floating_ip, timeout=500, sleep=5):
        """Retry an SSH login to ``floating_ip`` until it works or times out.

        :param floating_ip: address to connect to
        :param timeout: overall time budget in seconds
        :param sleep: pause between attempts in seconds
        :return: ``True`` once a login succeeded
        :raises TimeoutError: when no login succeeded within ``timeout``
        """
        ssh = paramiko.SSHClient()
        ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        _start_time = time.time()
        attempts = 0
        while True:
            try:
                ssh.connect(floating_ip, username=self.username,
                            password=self.password, pkey=self.private_key,
                            timeout=self.channel_timeout)
            except Exception as e:
                ssh.close()
                # Count the attempt that has just failed as well.
                attempts += 1
                if self._is_timed_out(_start_time, timeout):
                    logger.info("VM with FIP {} is not reachable via SSH. "
                                "See details: {}".format(floating_ip, e))
                    raise TimeoutError(
                        "\nFailed to establish authenticated ssh connection "
                        "to {} after {} attempts during {} seconds.\n{}"
                        "".format(floating_ip, attempts, timeout, e)) from e
                logger.info("Failed to establish authenticated ssh connection "
                            "to {}. Number attempts: {}. Retry after {} "
                            "seconds.".format(floating_ip, attempts, sleep))
                time.sleep(sleep)
            else:
                # The connection was only a probe; release it right away.
                ssh.close()
                logger.info("VM with FIP {} is reachable via SSH. Success!"
                            "".format(floating_ip))
                return True

    @staticmethod
    def get_mtu_from_vm(floating_ip, user='ubuntu', password='password',
                        private_key=None):
        """Return the MTU of the first non-``lo`` interface as a string.

        The interface is the first entry of ``/sys/class/net`` that does
        not match ``lo``.
        """
        transport = SSHTransport(floating_ip, user, password, private_key)
        iface = (transport.exec_command(
            'ls /sys/class/net | grep -v lo | head -n 1')).decode("utf-8")
        mtu = transport.exec_command('cat /sys/class/net/{}/mtu'.format(iface))
        return mtu.decode("utf-8")

    @staticmethod
    def get_node_ip_addresses_from_cidr(ip, cidr, user='mcc-user',
                                        private_key=None):
        """Return the node's first address that belongs to ``cidr``.

        The addresses are taken from the output of ``ip a`` on the node.

        :param ip: address used to reach the node over SSH
        :param cidr: network in CIDR notation to search an address for
        :return: the matching address as a string, or ``None`` when the node
            has no address in ``cidr``
        :raises utils.exceptions.InvalidConfigException: when the output or
            ``cidr`` cannot be processed
        """
        transport = SSHTransport(
            ip, username=user, private_key=private_key)
        command = "ip a"
        output = transport.exec_command(command)
        try:
            network = IPNetwork(cidr)
            for line in output.decode().splitlines():
                fields = line.split()
                # Address lines look like "inet 10.0.0.5/24 ..."; skip
                # everything whose second field is not "<address>/<prefix>".
                if len(fields) < 2 or "/" not in fields[1]:
                    continue
                ip_addr = fields[1].split('/')[0]
                if IPAddress(ip_addr) in network:
                    return ip_addr
        except Exception as e:
            raise utils.exceptions.InvalidConfigException(
                f"Could not find the IP at the interface {cidr} at the node "
                f"with K8S Private IP {ip}. Please check the configuration. "
                f"\nException: {e}") from e
        return None
Ievgeniia Zadorozhna5ed74e22022-07-26 16:56:23 +0300238
Ievgeniia Zadorozhnaa080ec02023-12-05 02:08:52 +0100239
class IperfAtVM(object):
    """Install iperf and iperf3 on a VM and start both in server mode.

    All the work happens in the constructor; the instance keeps no state.
    """

    def __init__(self, fip, user='ubuntu', password='password',
                 private_key=None):
        """Install the packages on the VM and start the servers.

        The packages come either from apt or, when the ``internet_at_vms``
        configuration value is ``'false'``, from local ``*.deb`` files that
        are copied to the VM first.

        :param fip: address of the VM to connect to
        :param user: SSH user name
        :param password: SSH password
        :param private_key: RSA private key (path or key text)
        :raises utils.exceptions.NoPackageInstalled: when iperf3 is not
            listed as installed on the VM afterwards
        """
        transport = SSHTransport(fip, user, password, private_key)
        config = utils.get_configuration()

        # str() keeps the comparison working when the value is missing or
        # is not a string.
        internet_at_vms = str(config.get("internet_at_vms")).lower()
        offline_install = internet_at_vms == 'false'
        path_to_iperf_deb = (config.get('iperf_deb_package_dir_path') or
                             "/opt/packages/")

        # Install iperf, iperf3 using apt or downloaded deb packages
        if offline_install:
            logger.info("Copying offline iperf3 deb packages, installing...")
            home_ubuntu = "/home/ubuntu/"
            transport.put_iperf3_deb_packages_at_vms(path_to_iperf_deb,
                                                     home_ubuntu)
            exit_status, stdout, stderr = transport.exec_sync(
                'sudo dpkg -i {}*.deb'.format(home_ubuntu))
        else:
            logger.info("Installing iperf, iperf3 using apt")
            # The preparation command is optional; run it only when set.
            preparation_cmd = config.get('iperf_prep_string')
            if preparation_cmd:
                transport.exec_command(preparation_cmd)
            exit_status, stdout, stderr = transport.exec_sync(
                'sudo apt-get update && sudo apt-get install -y iperf3 iperf')

        # Log whether iperf is installed with version
        check = transport.exec_command('dpkg -l | grep ii | grep iperf3')
        logger.info(check.decode('utf-8'))
        if not check:
            # Point at the installation path that was actually taken.
            if offline_install:
                info = "Could not put offline iperf packages from {} to the " \
                       "VM".format(path_to_iperf_deb)
            else:
                info = "Please check the Internet access at VM"
            raise utils.exceptions.NoPackageInstalled(
                "iperf3 is not installed at VM with FIP {}. {}.\nStdout, "
                "stderr at VM:\n{}\n{}".format(fip, info, stdout, stderr))

        # Starting the iperf3 and iperf servers in the background
        transport.exec_command('nohup iperf3 -s > file 2>&1 &')
        transport.exec_command('nohup iperf -s > file 2>&1 &')
Ievgeniia Zadorozhnaa080ec02023-12-05 02:08:52 +0100281
282
class IperfAtNode(object):
    """Make sure iperf3 and iperf run as servers on a node.

    The constructor installs any missing package with apt and starts each
    tool in server mode bound to the given test address.
    """

    def __init__(self, ip, iperf_test_ip, user='mcc-user', private_key=None):
        """Install the missing packages and start the servers.

        :param ip: address used to reach the node over SSH
        :param iperf_test_ip: address the servers are bound to (``-B``)
        :param user: SSH user name
        :param private_key: RSA private key (path or key text)
        :raises utils.exceptions.NoPackageInstalled: when a package is still
            not found on the node after the installation attempt
        """
        transport = SSHTransport(ip, user, private_key=private_key)
        # TODO: to avoid looping, both packages can be installed by one
        #  'install -y iperf iperf3' command. 'which' command can also be
        #  executed for multiple packages at once.
        packages = ["iperf3", "iperf"]
        for p in packages:
            # Output of the installation of this very package; stays empty
            # when nothing had to be installed.
            stdout = stderr = b""
            check_path = transport.exec_command(f'which {p}')
            if not check_path:
                # NOTE(review): this flag is overwritten on every iteration,
                # so it ends up describing the last package only - confirm
                # that callers expect this.
                self.install_iperf = True
                # Install iperf/iperf3 at MOSK nodes
                logger.info(f"Installing {p} at MOSK nodes...")
                _, stdout, stderr = transport.exec_sync(
                    f"sudo apt update && sudo apt install -y {p}")
                # Look the binary up again now that apt has run.
                check_path = transport.exec_command(f'which {p}')
            else:
                self.install_iperf = False
            # Log whether iperf is installed
            logger.info(f"{p} package path: {check_path.decode('utf-8')}")
            if not check_path:
                raise utils.exceptions.NoPackageInstalled(
                    f"{p} is not installed at the MOSK node with IP {ip}.\n"
                    f"Stdout, stderr at VM:\n{stdout}\n{stderr}")
            # Starting the iperf/iperf3 server in the background
            transport.exec_command(
                f"nohup {p} -s -B {iperf_test_ip} > file 2>&1 &")

    @staticmethod
    def remove_iperf_packages(ip, user='mcc-user', private_key=None):
        """Remove the iperf and iperf3 packages from the node with apt."""
        transport = SSHTransport(ip, user, private_key=private_key)
        logger.info(f"Removing iperf,iperf3 packages from the node {ip}...")
        transport.exec_command("sudo apt remove iperf iperf3 -y")