blob: 20313d0a3ba618eb7c13ac280877c96206e0be9a [file] [log] [blame]
Ievgeniia Zadorozhna84023022021-12-30 13:00:41 +02001from io import StringIO
2import logging
3import select
4import utils
5import paramiko
6import time
7import os
8
Ievgeniia Zadorozhnaa080ec02023-12-05 02:08:52 +01009from netaddr import IPNetwork, IPAddress
10
Ievgeniia Zadorozhna84023022021-12-30 13:00:41 +020011logger = logging.getLogger(__name__)
12
13# Suppress paramiko logging
14logging.getLogger("paramiko").setLevel(logging.WARNING)
15
16
def install_iperf_at_vms_and_get_mtu(vm_info):
    """Install iperf/iperf3 on a VM, start the servers and return its MTU.

    :param vm_info: mapping that provides at least the keys ``'fip'``
        (address used for the SSH connection) and ``'private_key'``
        (key handed to the SSH helpers)
    :type vm_info: dict
    :return: the MTU value read from the VM, or ``None`` when any step
        fails (the failure is logged rather than raised)
    """
    fip = vm_info['fip']
    private_key = vm_info['private_key']
    try:
        # Constructing IperfAtVM performs the installation and starts the
        # iperf servers as a side effect; the instance itself is not needed.
        IperfAtVM(fip, private_key=private_key)
        logger.info("Getting MTU values from VMs...")
        # get_mtu_from_vm is a static method that opens its own connection,
        # so no separate SSHTransport instance has to be built here.
        mtu = SSHTransport.get_mtu_from_vm(fip, private_key=private_key)
        logger.info(f"MTU at VM {fip} is {mtu}")
        return mtu
    except Exception as e:
        # Best-effort by design: report the problem and let the caller
        # carry on with the remaining VMs.
        logger.error(f"Error on VM {fip}: {e}")
        return None
30
31
class SSHTransport(object):
    """Small paramiko wrapper for running commands and copying files.

    Each command execution or file transfer opens its own connection to
    the target host and closes it again once the operation is finished.
    """

    def __init__(self, address, username, password=None,
                 private_key=None, look_for_keys=False, *args, **kwargs):
        """Store the connection settings and load the private key, if any.

        :param address: host name or IP address to connect to
        :param username: login name
        :param password: optional password
        :param private_key: RSA private key, given either as a path to an
            existing file or as the key text itself
        :param look_for_keys: stored on the instance only.
            NOTE(review): nothing in this class forwards it to paramiko.
        :param args: accepted for compatibility and ignored
        :param kwargs: accepted for compatibility and ignored
        """
        self.address = address
        self.username = username
        self.password = password
        if private_key is not None:
            # A value that names an existing file is treated as a path,
            # anything else as the key material itself.
            if os.path.isfile(private_key):
                with open(private_key, 'r') as key_file:
                    private_key = key_file.read()
            self.private_key = paramiko.RSAKey.from_private_key(
                StringIO(private_key))
        else:
            self.private_key = None

        self.look_for_keys = look_for_keys
        # Maximum number of bytes fetched per recv() call.
        self.buf_size = 1024
        # Timeout in seconds.
        self.channel_timeout = 10.0

    def _get_ssh_connection(self):
        """Return a connected ``paramiko.SSHClient``; the caller closes it."""
        ssh = paramiko.SSHClient()
        ssh.set_missing_host_key_policy(
            paramiko.AutoAddPolicy())
        try:
            ssh.connect(self.address, username=self.username,
                        password=self.password, pkey=self.private_key,
                        timeout=self.channel_timeout)
        except Exception:
            # Do not leave a half-open client behind on a failed connect.
            ssh.close()
            raise
        logger.debug("Successfully connected to: {0}".format(self.address))
        return ssh

    def _get_sftp_connection(self):
        """Open an SFTP session on port 22.

        The returned client must be released with ``_close_sftp`` so the
        transport underneath it is closed as well.
        """
        transport = paramiko.Transport((self.address, 22))
        try:
            transport.connect(username=self.username,
                              password=self.password,
                              pkey=self.private_key)
            return paramiko.SFTPClient.from_transport(transport)
        except Exception:
            transport.close()
            raise

    @staticmethod
    def _close_sftp(sftp):
        """Close an SFTP session together with the transport beneath it."""
        channel = sftp.get_channel()
        transport = channel.get_transport() if channel is not None else None
        sftp.close()
        # SFTPClient.close() only closes the channel; the transport created
        # in _get_sftp_connection has to be closed explicitly.
        if transport is not None:
            transport.close()

    def exec_sync(self, cmd):
        """Run ``cmd`` on the host and wait until it has finished.

        :type cmd: str
        :return: ``(exit_status, stdout, stderr)`` where both streams are
            ``bytes`` with leading/trailing whitespace stripped
        :rtype: tuple
        """
        logger.debug("Executing {0} on host {1}".format(cmd, self.address))
        ssh = self._get_ssh_connection()
        try:
            channel = ssh.get_transport().open_session()
            channel.exec_command(cmd)
            channel.shutdown_write()
            out_data = []
            err_data = []
            poll = select.poll()
            poll.register(channel, select.POLLIN)
            # select.poll() expects milliseconds while channel_timeout is
            # kept in seconds (it is also used for SSHClient.connect()).
            poll_timeout_ms = self.channel_timeout * 1000

            while True:
                if not poll.poll(poll_timeout_ms):
                    # Nothing happened within the timeout; keep waiting.
                    continue
                out_chunk = err_chunk = None
                if channel.recv_ready():
                    out_chunk = channel.recv(self.buf_size)
                    out_data.append(out_chunk)
                if channel.recv_stderr_ready():
                    err_chunk = channel.recv_stderr(self.buf_size)
                    err_data.append(err_chunk)
                # Stop only once the channel is closed AND both streams
                # have been drained.
                if channel.closed and not err_chunk and not out_chunk:
                    break
            exit_status = channel.recv_exit_status()
        finally:
            ssh.close()
        logger.debug("Command {0} executed with status: {1}"
                     .format(cmd, exit_status))
        # Chunks are consecutive slices of one stream, so they are joined
        # without a separator; a separator would corrupt any output that
        # arrives in more than one chunk.
        return (exit_status, b"".join(out_data).strip(),
                b"".join(err_data).strip())

    def exec_command(self, cmd):
        """Run ``cmd`` and return only its stdout (stripped ``bytes``).

        The exit status and stderr are discarded.
        """
        _, stdout, _ = self.exec_sync(cmd)
        return stdout

    def check_call(self, command, error_info=None, expected=None,
                   raise_on_err=True):
        """Execute command and check for return code

        :type command: str
        :param error_info: optional text put in front of the error message
        :type error_info: str
        :param expected: acceptable exit codes, ``[0]`` when omitted
        :type expected: list
        :param raise_on_err: terminate on an unexpected exit code
        :type raise_on_err: bool
        :return: ``(exit_code, stdout, stderr)`` as given by ``exec_sync``
        :rtype: tuple
        :raises: SystemExit with status 1 when the exit code is not in
            ``expected`` and ``raise_on_err`` is true
        """
        if expected is None:
            expected = [0]
        ret = self.exec_sync(command)
        exit_code, stdout_str, stderr_str = ret
        if exit_code not in expected:
            message = (
                "{append}Command '{cmd}' returned exit code {code} while "
                "expected {expected}\n"
                "\tSTDOUT:\n"
                "{stdout}"
                "\n\tSTDERR:\n"
                "{stderr}".format(
                    append=error_info + '\n' if error_info else '',
                    cmd=command,
                    code=exit_code,
                    expected=expected,
                    stdout=stdout_str,
                    stderr=stderr_str
                ))
            logger.error(message)
            if raise_on_err:
                # A bare exit() would end the process with status 0 and
                # make the failure look like a success to the caller's
                # shell; keep SystemExit but report a non-zero status.
                raise SystemExit(1)
        return ret

    def put_file(self, source_path, destination_path):
        """Upload a local file to ``destination_path`` on the host."""
        sftp = self._get_sftp_connection()
        try:
            sftp.put(source_path, destination_path)
        finally:
            self._close_sftp(sftp)

    def put_iperf3_deb_packages_at_vms(self, source_directory,
                                       destination_directory):
        """Upload the local iperf-related ``*.deb`` files to the host.

        :param source_directory: local directory that is searched for
            ``.deb`` files whose name contains one of the required
            package names
        :param destination_directory: remote directory to upload into
        :raises utils.exceptions.NoPackageInstalled: when no matching
            package file is found locally
        """
        required_packages = ['iperf', 'iperf3', 'libiperf0', 'libsctp1']
        iperf_deb_files = [
            pack for pack in os.listdir(source_directory)
            if pack.endswith('.deb')
            and any(req in pack for req in required_packages)]
        if not iperf_deb_files:
            raise utils.exceptions.NoPackageInstalled(
                "iperf3 or iperf *.deb packages are not found locally at path"
                " {}. Please recheck 'iperf_deb_package_dir_path' variable in "
                "global_config.yaml and check *.deb packages are manually "
                "copied there.".format(source_directory))
        # One SFTP session is enough for all the files.
        sftp = self._get_sftp_connection()
        try:
            for f in iperf_deb_files:
                source_abs_path = "{}/{}".format(source_directory, f)
                dest_abs_path = "{}/{}".format(destination_directory, f)
                sftp.put(source_abs_path, dest_abs_path)
        finally:
            self._close_sftp(sftp)

    def get_file(self, source_path, destination_path):
        """Download ``source_path`` from the host to a local path."""
        sftp = self._get_sftp_connection()
        try:
            sftp.get(source_path, destination_path)
        finally:
            self._close_sftp(sftp)

    def _is_timed_out(self, start_time, timeout):
        """Tell whether more than ``timeout`` seconds passed since start."""
        return (time.time() - timeout) > start_time

    def check_vm_is_reachable_ssh(self, floating_ip, timeout=500, sleep=5):
        """Retry an SSH login to ``floating_ip`` until it succeeds.

        :param floating_ip: address to connect to (the instance's own
            ``address`` is not used here)
        :param timeout: overall time budget in seconds
        :param sleep: pause between two attempts in seconds
        :return: ``True`` once a connection was established
        :raises TimeoutError: when no attempt succeeded within ``timeout``
        """
        ssh = paramiko.SSHClient()
        ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        _start_time = time.time()
        attempts = 0
        while True:
            try:
                ssh.connect(floating_ip, username=self.username,
                            password=self.password, pkey=self.private_key,
                            timeout=self.channel_timeout)
            except Exception as e:
                ssh.close()
                if self._is_timed_out(_start_time, timeout):
                    logger.info("VM with FIP {} is not reachable via SSH. "
                                "See details: {}".format(floating_ip, e))
                    raise TimeoutError(
                        "\nFailed to establish authenticated ssh connection "
                        "to {} after {} attempts during {} seconds.\n{}"
                        "".format(floating_ip, attempts, timeout, e))
                attempts += 1
                logger.info("Failed to establish authenticated ssh connection "
                            "to {}. Number attempts: {}. Retry after {} "
                            "seconds.".format(floating_ip, attempts, sleep))
                time.sleep(sleep)
            else:
                # The connection was only a probe; release it right away.
                ssh.close()
                logger.info("VM with FIP {} is reachable via SSH. Success!"
                            "".format(floating_ip))
                return True

    @staticmethod
    def get_mtu_from_vm(floating_ip, user='ubuntu', password='password',
                        private_key=None):
        """Return the MTU of the first non-loopback interface of a VM.

        The interface is the first entry of ``/sys/class/net`` that
        remains after filtering out lines containing ``lo``.

        :return: content of the interface's ``mtu`` file
        :rtype: str
        """
        transport = SSHTransport(floating_ip, user, password, private_key)
        iface = (transport.exec_command(
            'ls /sys/class/net | grep -v lo | head -n 1')).decode("utf-8")
        mtu = transport.exec_command('cat /sys/class/net/{}/mtu'.format(iface))
        return mtu.decode("utf-8")

    @staticmethod
    def get_node_ip_addresses_from_cidr(ip, cidr, user='mcc-user',
                                        private_key=None):
        """Find the node's global IPv4 address that belongs to ``cidr``.

        :param ip: address used to reach the node over SSH
        :param cidr: network the wanted address has to belong to
        :return: the first matching address without its prefix length, or
            ``None`` when no address of the node is inside ``cidr``
        :raises utils.exceptions.InvalidConfigException: when the command
            output or ``cidr`` cannot be processed
        """
        transport = SSHTransport(
            ip, username=user, private_key=private_key)
        command = "ip -4 addr show scope global"
        output = transport.exec_command(command)
        try:
            for line in output.decode().splitlines():
                fields = line.split()
                # The address is the second field ("inet <addr>/<prefix>");
                # lines without such a field carry no address.
                if len(fields) < 2 or "/" not in fields[1]:
                    continue
                ip_addr = fields[1].split('/')[0]
                if IPAddress(ip_addr) in IPNetwork(cidr):
                    return ip_addr
        except Exception as e:
            raise utils.exceptions.InvalidConfigException(
                f"Could not find the IP at the interface {cidr} at the node "
                f"with K8S Private IP {ip}. Please check the configuration. "
                f"\nException: {e}") from e
        # NOTE(review): no matching address falls through and returns None
        # without raising; callers have to handle that.
        return None
Ievgeniia Zadorozhna5ed74e22022-07-26 16:56:23 +0300231
Ievgeniia Zadorozhnaa080ec02023-12-05 02:08:52 +0100232
class IperfAtVM(object):
    """Install iperf and iperf3 on a VM and start both servers.

    All the work happens in the constructor; the instance keeps no state.
    """

    def __init__(self, fip, user='ubuntu', password='password',
                 private_key=None):
        """Install the packages over SSH and launch the iperf servers.

        Configuration keys read from ``utils.get_configuration()``:
        ``internet_at_vms``, ``iperf_deb_package_dir_path`` and
        ``iperf_prep_string``.

        :param fip: address of the VM used for the SSH connection
        :param user: SSH login name
        :param password: SSH password
        :param private_key: private key handed to ``SSHTransport``
        :raises utils.exceptions.NoPackageInstalled: when iperf3 is not
            listed as installed after the installation attempt
        """
        transport = SSHTransport(fip, user, password, private_key)
        config = utils.get_configuration()

        # The option is not guaranteed to be a string (it may be a bool or
        # missing), so normalise it before comparing. Only an explicit
        # "false" selects the offline installation.
        offline = str(config.get("internet_at_vms")).lower() == 'false'
        path_to_iperf_deb = (config.get('iperf_deb_package_dir_path') or
                             "/opt/packages/")

        # Install iperf, iperf3 using apt or downloaded deb packages
        if offline:
            logger.info("Copying offline iperf3 deb packages, installing...")
            home_ubuntu = "/home/ubuntu/"
            transport.put_iperf3_deb_packages_at_vms(path_to_iperf_deb,
                                                     home_ubuntu)
            _, stdout, stderr = transport.exec_sync(
                'sudo dpkg -i {}*.deb'.format(home_ubuntu))
        else:
            logger.info("Installing iperf, iperf3 using apt")
            preparation_cmd = config.get('iperf_prep_string')
            # The preparation command is optional; run it only when one
            # is actually configured.
            if preparation_cmd:
                transport.exec_command(preparation_cmd)
            _, stdout, stderr = transport.exec_sync(
                'sudo apt-get update && sudo apt-get install -y iperf3 iperf')

        # Log whether iperf is installed with version
        check = transport.exec_command('dpkg -l | grep ii | grep iperf3')
        logger.info(check.decode('utf-8'))
        if not check:
            if offline:
                info = "Could not put offline iperf packages from {} to the " \
                       "VM".format(path_to_iperf_deb)
            else:
                info = "Please check the Internet access at VM"
            raise utils.exceptions.NoPackageInstalled(
                "iperf3 is not installed at VM with FIP {}. {}.\nStdout, "
                "stderr at VM:\n{}\n{}".format(fip, info, stdout, stderr))

        # Starting iperf servers
        transport.exec_command('nohup iperf3 -s > file 2>&1 &')
        transport.exec_command('nohup iperf -s > file 2>&1 &')
Ievgeniia Zadorozhnaa080ec02023-12-05 02:08:52 +0100274
275
class IperfAtNode(object):
    """Make sure iperf and iperf3 run as servers on a node.

    The constructor installs missing packages with apt and starts one
    server per package bound to ``iperf_test_ip``.
    """

    def __init__(self, ip, iperf_test_ip, user='mcc-user', private_key=None):
        """Install missing iperf packages and start the servers.

        :param ip: address of the node used for the SSH connection
        :param iperf_test_ip: address the servers are bound to (``-B``)
        :param user: SSH login name
        :param private_key: private key handed to ``SSHTransport``
        :raises utils.exceptions.NoPackageInstalled: when a package is
            still not found after the installation attempt
        """
        transport = SSHTransport(ip, user, private_key=private_key)
        # TODO: to avoid looping, both packages can be installed by one
        #  'install -y iperf iperf3' command. 'which' command can also be
        #  executed for multiple packages at once.
        packages = ["iperf", "iperf3"]
        for p in packages:
            # Defined up front so the error message below never refers to
            # unbound names or to the output of a previous package.
            stdout = stderr = b''
            check_path = transport.exec_command(f'which {p}')
            # NOTE(review): the flag is overwritten on every iteration, so
            # after the loop it only reflects the last package checked.
            if not check_path:
                self.install_iperf = True
                # Install iperf/iperf3 at MOSK nodes
                logger.info(f"Installing {p} at MOSK nodes...")
                _, stdout, stderr = transport.exec_sync(
                    f"sudo apt update && sudo apt install -y {p}")
                # Look the binary up again only after an installation;
                # otherwise the first lookup is still valid.
                check_path = transport.exec_command(f'which {p}')
            else:
                self.install_iperf = False
            # Log whether iperf is installed
            logger.info(f"{p} package path: {check_path.decode('utf-8')}")
            if not check_path:
                raise utils.exceptions.NoPackageInstalled(
                    f"{p} is not installed at the MOSK node with IP {ip}.\n"
                    f"Stdout, stderr at VM:\n{stdout}\n{stderr}")
            # Starting iperf/iperf3 server
            transport.exec_command(
                f"nohup {p} -s -B {iperf_test_ip} > file 2>&1 &")

    @staticmethod
    def remove_iperf_packages(ip, user='mcc-user', private_key=None):
        """Remove the iperf and iperf3 packages from the node with apt.

        :param ip: address of the node used for the SSH connection
        :param user: SSH login name
        :param private_key: private key handed to ``SSHTransport``
        """
        transport = SSHTransport(ip, user, private_key=private_key)
        logger.info(f"Removing iperf,iperf3 packages from the node {ip}...")
        transport.exec_command("sudo apt remove iperf iperf3 -y")