# blob: 20313d0a3ba618eb7c13ac280877c96206e0be9a [file] [log] [blame]
from io import StringIO
import logging
import select
import utils
import paramiko
import time
import os
from netaddr import IPNetwork, IPAddress
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Suppress paramiko logging: only WARNING and above from the "paramiko"
# logger is let through.
logging.getLogger("paramiko").setLevel(logging.WARNING)
def install_iperf_at_vms_and_get_mtu(vm_info):
    """Install iperf/iperf3 on one VM and return the MTU read from it.

    The work is best-effort: any exception raised while installing iperf or
    reading the MTU is logged and turned into a ``None`` result.

    :param vm_info: mapping holding the VM floating IP under ``'fip'`` and
        the SSH private key under ``'private_key'``
    :return: the MTU as returned by ``SSHTransport.get_mtu_from_vm``, or
        ``None`` when any step failed
    :raises KeyError: if ``vm_info`` lacks ``'fip'`` or ``'private_key'``
    """
    fip = vm_info['fip']
    private_key = vm_info['private_key']
    try:
        IperfAtVM(fip, private_key=private_key)
        logger.info("Getting MTU values from VMs...")
        # get_mtu_from_vm is a static method that opens its own connection,
        # so no SSHTransport instance is needed here.
        mtu = SSHTransport.get_mtu_from_vm(fip, private_key=private_key)
        logger.info(f"MTU at VM {fip} is {mtu}")
        return mtu
    except Exception as e:
        logger.error(f"Error on VM {fip}: {e}")
        return None
class SSHTransport(object):
    """Run commands and copy files on a remote host through paramiko.

    Every command and every file transfer opens its own connection to
    ``address`` and closes it again before returning.
    """

    def __init__(self, address, username, password=None,
                 private_key=None, look_for_keys=False, *args, **kwargs):
        """Store the connection parameters; no connection is opened here.

        :param address: host name or IP address of the remote machine
        :param username: login name
        :param password: optional password
        :param private_key: optional RSA private key, either a path to an
            existing key file or the key text itself
        :param look_for_keys: stored as ``self.look_for_keys``
        :param args: accepted and ignored
        :param kwargs: accepted and ignored
        """
        self.address = address
        self.username = username
        self.password = password
        if private_key is not None:
            # A path to an existing file is replaced by the file content;
            # anything else is treated as the key text itself.
            if os.path.isfile(private_key):
                with open(private_key, 'r') as key_file:
                    private_key = key_file.read()
            self.private_key = paramiko.RSAKey.from_private_key(
                StringIO(private_key))
        else:
            self.private_key = None
        # NOTE(review): this flag is stored but never forwarded to
        # paramiko's connect() - confirm whether that is intended.
        self.look_for_keys = look_for_keys
        self.buf_size = 1024
        self.channel_timeout = 10.0

    def _get_ssh_connection(self):
        """Open and return a connected ``paramiko.SSHClient``.

        Unknown host keys are accepted automatically. The caller owns the
        returned client and must close it.
        """
        ssh = paramiko.SSHClient()
        ssh.set_missing_host_key_policy(
            paramiko.AutoAddPolicy())
        ssh.connect(self.address, username=self.username,
                    password=self.password, pkey=self.private_key,
                    timeout=self.channel_timeout)
        logger.debug("Successfully connected to: {0}".format(self.address))
        return ssh

    def _get_sftp_connection(self):
        """Open an SFTP session to ``address`` on port 22 and return it.

        The caller owns the returned client together with its underlying
        transport and must close both (see ``_close_sftp``).
        """
        transport = paramiko.Transport((self.address, 22))
        try:
            transport.connect(username=self.username,
                              password=self.password,
                              pkey=self.private_key)
            return paramiko.SFTPClient.from_transport(transport)
        except Exception:
            # Do not leave a half-open transport behind.
            transport.close()
            raise

    @staticmethod
    def _close_sftp(sftp):
        """Close an SFTP client and the transport it was created on."""
        channel = sftp.get_channel()
        transport = channel.get_transport() if channel is not None else None
        sftp.close()
        if transport is not None:
            transport.close()

    def exec_sync(self, cmd):
        """Run ``cmd`` on the remote host and wait until it finishes.

        :param cmd: command line to execute
        :return: tuple ``(exit_status, stdout, stderr)``; both streams are
            ``bytes`` with surrounding whitespace stripped
        """
        logger.debug("Executing {0} on host {1}".format(cmd, self.address))
        ssh = self._get_ssh_connection()
        try:
            transport = ssh.get_transport()
            channel = transport.open_session()
            channel.fileno()
            channel.exec_command(cmd)
            channel.shutdown_write()
            out_data = []
            err_data = []
            poll = select.poll()
            poll.register(channel, select.POLLIN)
            while True:
                # poll() takes its timeout in milliseconds, so this wakes
                # up every ``channel_timeout`` ms while nothing is ready.
                if not poll.poll(self.channel_timeout):
                    continue
                out_chunk = err_chunk = None
                if channel.recv_ready():
                    out_chunk = channel.recv(self.buf_size)
                    out_data.append(out_chunk)
                if channel.recv_stderr_ready():
                    err_chunk = channel.recv_stderr(self.buf_size)
                    err_data.append(err_chunk)
                if channel.closed and not err_chunk and not out_chunk:
                    break
            exit_status = channel.recv_exit_status()
        finally:
            ssh.close()
        logger.debug("Command {0} executed with status: {1}"
                     .format(cmd, exit_status))
        # Chunks are consecutive slices of one stream: join them without a
        # separator so no byte is inserted at chunk boundaries.
        return (exit_status, b"".join(out_data).strip(),
                b"".join(err_data).strip())

    def exec_command(self, cmd):
        """Run ``cmd`` remotely and return only its stripped stdout bytes."""
        exit_status, stdout, stderr = self.exec_sync(cmd)
        return stdout

    def check_call(self, command, error_info=None, expected=None,
                   raise_on_err=True):
        """Execute command and check for return code

        :type command: str
        :type error_info: str
        :param error_info: optional text put in front of the error message
        :type expected: list
        :param expected: accepted exit codes, ``[0]`` when omitted
        :type raise_on_err: bool
        :param raise_on_err: terminate on an unexpected exit code instead
            of only logging it
        :rtype: tuple
        :return: ``(exit_code, stdout, stderr)`` as given by ``exec_sync``
        :raises: SystemExit with status 1 when the exit code is not expected
            and ``raise_on_err`` is true
        """
        if expected is None:
            expected = [0]
        ret = self.exec_sync(command)
        exit_code, stdout_str, stderr_str = ret
        if exit_code not in expected:
            message = (
                "{append}Command '{cmd}' returned exit code {code} while "
                "expected {expected}\n"
                "\tSTDOUT:\n"
                "{stdout}"
                "\n\tSTDERR:\n"
                "{stderr}".format(
                    append=error_info + '\n' if error_info else '',
                    cmd=command,
                    code=exit_code,
                    expected=expected,
                    stdout=stdout_str,
                    stderr=stderr_str
                ))
            logger.error(message)
            if raise_on_err:
                # Non-zero status: a failed command must not make the
                # process exit as if it had succeeded.
                raise SystemExit(1)
        return ret

    def put_file(self, source_path, destination_path):
        """Upload local ``source_path`` to remote ``destination_path``."""
        sftp = self._get_sftp_connection()
        try:
            sftp.put(source_path, destination_path)
        finally:
            self._close_sftp(sftp)

    def put_iperf3_deb_packages_at_vms(self, source_directory,
                                       destination_directory):
        """Upload the local iperf ``*.deb`` packages to the remote host.

        A file is selected when its name ends with ``.deb`` and contains
        one of ``iperf``, ``iperf3``, ``libiperf0`` or ``libsctp1``.

        :param source_directory: local directory holding the packages
        :param destination_directory: remote directory to upload into
        :raises utils.exceptions.NoPackageInstalled: if no matching package
            exists in ``source_directory``
        """
        required_packages = ['iperf', 'iperf3', 'libiperf0', 'libsctp1']
        iperf_deb_files = [pack for pack in os.listdir(source_directory) if
                           any(req in pack for req in required_packages) if
                           pack.endswith('.deb')]
        if not iperf_deb_files:
            raise utils.exceptions.NoPackageInstalled(
                "iperf3 or iperf *.deb packages are not found locally at path"
                " {}. Please recheck 'iperf_deb_package_dir_path' variable in "
                "global_config.yaml and check *.deb packages are manually "
                "copied there.".format(source_directory))
        for f in iperf_deb_files:
            source_abs_path = "{}/{}".format(source_directory, f)
            dest_abs_path = "{}/{}".format(destination_directory, f)
            self.put_file(source_abs_path, dest_abs_path)

    def get_file(self, source_path, destination_path):
        """Download remote ``source_path`` to local ``destination_path``."""
        sftp = self._get_sftp_connection()
        try:
            sftp.get(source_path, destination_path)
        finally:
            self._close_sftp(sftp)

    def _is_timed_out(self, start_time, timeout):
        """Return True once more than ``timeout`` seconds have passed
        since ``start_time`` (a ``time.time()`` value)."""
        return (time.time() - timeout) > start_time

    def check_vm_is_reachable_ssh(self, floating_ip, timeout=500, sleep=5):
        """Retry an SSH login to ``floating_ip`` until it succeeds.

        :param floating_ip: address to connect to
        :param timeout: overall limit in seconds
        :param sleep: pause between attempts in seconds
        :return: ``True`` as soon as one login succeeds
        :raises TimeoutError: if a login attempt fails after ``timeout``
            seconds have elapsed
        """
        bsleep = sleep
        ssh = paramiko.SSHClient()
        ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        _start_time = time.time()
        attempts = 0
        while True:
            try:
                ssh.connect(floating_ip, username=self.username,
                            password=self.password, pkey=self.private_key,
                            timeout=self.channel_timeout)
            except Exception as e:
                ssh.close()
                if self._is_timed_out(_start_time, timeout):
                    logger.info("VM with FIP {} is not reachable via SSH. "
                                "See details: {}".format(floating_ip, e))
                    raise TimeoutError(
                        "\nFailed to establish authenticated ssh connection "
                        "to {} after {} attempts during {} seconds.\n{}"
                        "".format(floating_ip, attempts, timeout, e)) from e
                attempts += 1
                logger.info("Failed to establish authenticated ssh connection "
                            "to {}. Number attempts: {}. Retry after {} "
                            "seconds.".format(floating_ip, attempts, bsleep))
                time.sleep(bsleep)
            else:
                # The connection only served as a probe; release it.
                ssh.close()
                logger.info("VM with FIP {} is reachable via SSH. Success!"
                            "".format(floating_ip))
                return True

    @staticmethod
    def get_mtu_from_vm(floating_ip, user='ubuntu', password='password',
                        private_key=None):
        """Return the MTU of a network interface of the VM as a string.

        The interface is the first entry of ``/sys/class/net`` whose name
        does not contain ``lo``.
        """
        transport = SSHTransport(floating_ip, user, password, private_key)
        iface = (transport.exec_command(
            'ls /sys/class/net | grep -v lo | head -n 1')).decode("utf-8")
        mtu = transport.exec_command('cat /sys/class/net/{}/mtu'.format(iface))
        return mtu.decode("utf-8")

    @staticmethod
    def get_node_ip_addresses_from_cidr(ip, cidr, user='mcc-user',
                                        private_key=None):
        """Return the node's first global IPv4 address that lies in ``cidr``.

        :param ip: address used to reach the node over SSH
        :param cidr: network the wanted address must belong to
        :param user: login name
        :param private_key: RSA key text or path to a key file
        :return: the matching address as a string, or ``None`` when no
            address of the node belongs to ``cidr``
        :raises utils.exceptions.InvalidConfigException: if the command
            output or ``cidr`` cannot be parsed
        """
        transport = SSHTransport(
            ip, username=user, private_key=private_key)
        command = "ip -4 addr show scope global"
        output = transport.exec_command(command)
        try:
            # Keep the second field of every output line that looks like
            # "address/prefix".
            all_ip_addresses = [line.split()[1] for line in
                                output.decode().splitlines()
                                if "/" in line.split()[1]]
            for address in all_ip_addresses:
                ip_addr = address.split('/')[0]
                if IPAddress(ip_addr) in IPNetwork(cidr):
                    return ip_addr
        except Exception as e:
            raise utils.exceptions.InvalidConfigException(
                f"Could not find the IP at the interface {cidr} at the node "
                f"with K8S Private IP {ip}. Please check the configuration. "
                f"\nException: {e}") from e
        return None
class IperfAtVM(object):
    """Install iperf and iperf3 on a VM and start both in server mode.

    All the work happens in the constructor.
    """

    def __init__(self, fip, user='ubuntu', password='password',
                 private_key=None):
        """Install the packages on the VM and launch the servers.

        When the ``internet_at_vms`` setting is ``false`` the packages are
        copied from the local ``iperf_deb_package_dir_path`` directory
        (``/opt/packages/`` when unset) and installed with ``dpkg``;
        otherwise they are installed with ``apt-get``.

        :param fip: floating IP of the VM
        :param user: login name
        :param password: login password
        :param private_key: RSA key text or path to a key file
        :raises utils.exceptions.NoPackageInstalled: if iperf3 is not
            listed as installed by ``dpkg`` afterwards
        """
        transport = SSHTransport(fip, user, password, private_key)
        config = utils.get_configuration()
        # Install iperf, iperf3 using apt or downloaded deb packages.
        # The setting is compared as text; str() keeps a boolean or a
        # missing value from failing on .lower().
        internet_at_vms = str(config.get("internet_at_vms")).lower()
        path_to_iperf_deb = (config.get('iperf_deb_package_dir_path') or
                             "/opt/packages/")
        if internet_at_vms == 'false':
            logger.info("Copying offline iperf3 deb packages, installing...")
            home_ubuntu = "/home/ubuntu/"
            transport.put_iperf3_deb_packages_at_vms(path_to_iperf_deb,
                                                     home_ubuntu)
            _, stdout, stderr = transport.exec_sync(
                'sudo dpkg -i {}*.deb'.format(home_ubuntu))
        else:
            logger.info("Installing iperf, iperf3 using apt")
            # The preparation command is optional: run it only when one
            # is configured.
            preparation_cmd = config.get('iperf_prep_string')
            if preparation_cmd:
                transport.exec_command(preparation_cmd)
            _, stdout, stderr = transport.exec_sync(
                'sudo apt-get update && sudo apt-get install -y iperf3 iperf')
        # Log whether iperf is installed with version
        check = transport.exec_command('dpkg -l | grep ii | grep iperf3')
        logger.info(check.decode('utf-8'))
        if not check:
            if internet_at_vms == 'true':
                info = "Please check the Internet access at VM"
            else:
                info = "Could not put offline iperf packages from {} to the " \
                       "VM".format(path_to_iperf_deb)
            raise utils.exceptions.NoPackageInstalled(
                "iperf3 is not installed at VM with FIP {}. {}.\nStdout, "
                "stderr at VM:\n{}\n{}".format(fip, info, stdout, stderr))
        # Starting iperf servers in the background
        transport.exec_command('nohup iperf3 -s > file 2>&1 &')
        transport.exec_command('nohup iperf -s > file 2>&1 &')
class IperfAtNode(object):
    """Make sure iperf and iperf3 run in server mode on a MOSK node.

    All the work happens in the constructor.
    """

    def __init__(self, ip, iperf_test_ip, user='mcc-user', private_key=None):
        """Install missing packages and start both servers on the node.

        :param ip: address used to reach the node over SSH
        :param iperf_test_ip: address the servers are bound to (``-B``)
        :param user: login name
        :param private_key: RSA key text or path to a key file
        :raises utils.exceptions.NoPackageInstalled: if a package is still
            not found by ``which`` after the installation attempt
        """
        transport = SSHTransport(ip, user, private_key=private_key)
        # TODO: to avoid looping, both packages can be installed by one
        #  'install -y iperf iperf3' command. 'which' command can also be
        #  executed for multiple packages at once.
        packages = ["iperf", "iperf3"]
        for p in packages:
            # Reset per package so the error below never reports the
            # output of a previous iteration (or an unbound name).
            stdout = stderr = b""
            check_path = transport.exec_command(f'which {p}')
            # NOTE(review): the flag is overwritten on every iteration, so
            # it reflects the last package only - confirm this is intended.
            self.install_iperf = not check_path
            if self.install_iperf:
                # Install iperf/iperf3 at MOSK nodes
                logger.info(f"Installing {p} at MOSK nodes...")
                _, stdout, stderr = transport.exec_sync(
                    f"sudo apt update && sudo apt install -y {p}")
                # Re-check only after an installation attempt.
                check_path = transport.exec_command(f'which {p}')
            # Log whether iperf is installed
            logger.info(f"{p} package path: {check_path.decode('utf-8')}")
            if not check_path:
                raise utils.exceptions.NoPackageInstalled(
                    f"{p} is not installed at the MOSK node with IP {ip}.\n"
                    f"Stdout, stderr at VM:\n{stdout}\n{stderr}")
            # Starting iperf/iperf3 server
            transport.exec_command(
                f"nohup {p} -s -B {iperf_test_ip} > file 2>&1 &")

    @staticmethod
    def remove_iperf_packages(ip, user='mcc-user', private_key=None):
        """Remove the iperf and iperf3 packages from the node with apt."""
        transport = SSHTransport(ip, user, private_key=private_key)
        logger.info(f"Removing iperf,iperf3 packages from the node {ip}...")
        transport.exec_command("sudo apt remove iperf iperf3 -y")