Fix issue with floating ips for test_share_basic_ops
As far as Nova extensions are deprecated we should replace
compute_floating_ips_client on floating_ips_client
Change-Id: I8484a4186609f79b8c501121a7adbffe75f15e30
Closes-Issue: https://mirantis.jira.com/browse/PROD-20608
(cherry picked from commit 58c93ef3fd9ba73c39671cef6ac3da684c424018)
(cherry picked from commit cc581cc6f9befb21725206b5ec6e1bdcc5155fa5)
diff --git a/manila_tempest_tests/tests/scenario/test_share_basic_ops.py b/manila_tempest_tests/tests/scenario/test_share_basic_ops.py
index 32fd735..af91cfb 100644
--- a/manila_tempest_tests/tests/scenario/test_share_basic_ops.py
+++ b/manila_tempest_tests/tests/scenario/test_share_basic_ops.py
@@ -16,7 +16,10 @@
import ddt
from oslo_log import log as logging
+from tempest.common import waiters
from tempest import config
+from tempest.lib.common.utils import data_utils
+from tempest.lib.common.utils import test_utils
from tempest.lib import exceptions
import testtools
from testtools import testcase as tc
@@ -27,6 +30,7 @@
from manila_tempest_tests import utils
CONF = config.CONF
+
LOG = logging.getLogger(__name__)
@@ -43,6 +47,218 @@
* Mount share
* Terminate the instance
"""
+ protocol = None
+ ip_version = 4
+
+ @property
+ def use_ipv6(self):
+ return self.ip_version == 6
+
+ def setUp(self):
+ super(ShareBasicOpsBase, self).setUp()
+ if self.use_ipv6 and not CONF.share.run_ipv6_tests:
+ raise self.skipException("IPv6 tests are disabled")
+ base.verify_test_has_appropriate_tags(self)
+ self.image_ref = None
+ # Setup image and flavor the test instance
+ # Support both configured and injected values
+ self.floatings = {}
+ if self.protocol not in CONF.share.enable_protocols:
+ message = "%s tests are disabled" % self.protocol
+ raise self.skipException(message)
+ if self.protocol not in CONF.share.enable_ip_rules_for_protocols:
+ message = ("%s tests for access rules other than IP are disabled" %
+ self.protocol)
+ raise self.skipException(message)
+ if not hasattr(self, 'flavor_ref'):
+ self.flavor_ref = CONF.share.client_vm_flavor_ref
+ if CONF.share.image_with_share_tools:
+ images = self.compute_images_client.list_images()["images"]
+ for img in images:
+ if img["name"] == CONF.share.image_with_share_tools:
+ self.image_ref = img['id']
+ break
+ if not self.image_ref:
+ msg = ("Image %s not found" %
+ CONF.share.image_with_share_tools)
+ raise exceptions.InvalidConfiguration(message=msg)
+ self.ssh_user = CONF.share.image_username
+ LOG.debug('Starting test for i:{image}, f:{flavor}. '
+ 'user: {ssh_user}'.format(
+ image=self.image_ref, flavor=self.flavor_ref,
+ ssh_user=self.ssh_user))
+ self.security_group = self._create_security_group()
+ self.create_share_network()
+
+ def boot_instance(self, wait_until="ACTIVE"):
+ self.keypair = self.create_keypair()
+ security_groups = [{'name': self.security_group['name']}]
+ create_kwargs = {
+ 'key_name': self.keypair['name'],
+ 'security_groups': security_groups,
+ 'wait_until': wait_until,
+ 'networks': [{'uuid': self.net['id']}, ],
+ }
+ instance = self.create_server(
+ image_id=self.image_ref, flavor=self.flavor_ref, **create_kwargs)
+ return instance
+
+ def init_ssh(self, instance):
+ if self.use_ipv6:
+ server_ip = self._get_ipv6_server_ip(instance)
+ else:
+ # Obtain a floating IP
+ floating_ip = (
+ self.floating_ips_client.create_floatingip(
+ floating_network_id=CONF.network.public_network_id)
+ ['floatingip'])
+ self.floatings[instance['id']] = floating_ip
+ self.addCleanup(
+ test_utils.call_and_ignore_notfound_exc,
+ self.floating_ips_client.delete_floatingip,
+ floating_ip['id'])
+ # Attach a floating IP
+ self.compute_floating_ips_client.associate_floating_ip_to_server(
+ floating_ip['floating_ip_address'], instance['id'])
+ server_ip = floating_ip['floating_ip_address']
+ self.assertIsNotNone(server_ip)
+ # Check ssh
+ ssh_client = self.get_remote_client(
+ server_or_ip=server_ip,
+ username=self.ssh_user,
+ private_key=self.keypair['private_key'])
+
+ # NOTE(u_glide): Workaround for bug #1465682
+ ssh_client = ssh_client.ssh_client
+
+ self.share = self.shares_client.get_share(self.share['id'])
+ return ssh_client
+
+ def mount_share(self, location, ssh_client, target_dir=None):
+ raise NotImplementedError
+
+ def umount_share(self, ssh_client, target_dir=None):
+ target_dir = target_dir or "/mnt"
+ ssh_client.exec_command("sudo umount %s" % target_dir)
+
+ def write_data(self, data, ssh_client):
+ ssh_client.exec_command("echo \"%s\" | sudo tee /mnt/t1 && sudo sync" %
+ data)
+
+ def read_data(self, ssh_client):
+ data = ssh_client.exec_command("sudo cat /mnt/t1")
+ return data.rstrip()
+
+ def migrate_share(self, share_id, dest_host, status, force_host_assisted):
+ share = self._migrate_share(
+ share_id, dest_host, status, force_host_assisted,
+ self.shares_admin_v2_client)
+ return share
+
+ def migration_complete(self, share_id, dest_host):
+ return self._migration_complete(share_id, dest_host)
+
+ def create_share_network(self):
+ self.net = self._create_network(namestart="manila-share")
+ self.subnet = self._create_subnet(
+ network=self.net,
+ namestart="manila-share-sub",
+ ip_version=self.ip_version,
+ use_default_subnetpool=self.use_ipv6)
+ router = self._get_router()
+ self._create_router_interface(subnet_id=self.subnet['id'],
+ router_id=router['id'])
+ self.share_net = self._create_share_network(
+ neutron_net_id=self.net['id'],
+ neutron_subnet_id=self.subnet['id'],
+ name=data_utils.rand_name("sn-name"))
+
+ def _get_ipv6_server_ip(self, instance):
+ for net_list in instance['addresses'].values():
+ for net_data in net_list:
+ if net_data['version'] == 6:
+ return net_data['addr']
+
+ def _get_share_type(self):
+ if CONF.share.default_share_type_name:
+ return self.shares_client.get_share_type(
+ CONF.share.default_share_type_name)['share_type']
+ return self._create_share_type(
+ data_utils.rand_name("share_type"),
+ extra_specs={
+ 'snapshot_support': CONF.share.capability_snapshot_support,
+ 'driver_handles_share_servers': CONF.share.multitenancy_enabled
+ },)['share_type']
+
+ def create_share(self, **kwargs):
+ kwargs.update({
+ 'share_protocol': self.protocol,
+ })
+ if not ('share_type_id' in kwargs or 'snapshot_id' in kwargs):
+ kwargs.update({'share_type_id': self._get_share_type()['id']})
+ if CONF.share.multitenancy_enabled:
+ kwargs.update({'share_network_id': self.share_net['id']})
+ self.share = self._create_share(**kwargs)
+ return self.share
+
+ def allow_access_ip(self, share_id, ip=None, instance=None, cleanup=True,
+ snapshot=None):
+ if instance and not ip:
+ try:
+ net_addresses = instance['addresses']
+ first_address = net_addresses.values()[0][0]
+ ip = first_address['addr']
+ except Exception:
+ LOG.debug("Instance: %s", instance)
+ # In case on an error ip will be still none
+ LOG.exception("Instance does not have a valid IP address."
+ "Falling back to default")
+ if not ip:
+ ip = '0.0.0.0/0'
+
+ if snapshot:
+ self._allow_access_snapshot(snapshot['id'], access_type='ip',
+ access_to=ip, cleanup=cleanup)
+ else:
+ self._allow_access(share_id, access_type='ip', access_to=ip,
+ cleanup=cleanup, client=self.shares_v2_client)
+
+ def provide_access_to_auxiliary_instance(self, instance, share=None,
+ snapshot=None):
+ share = share or self.share
+ if self.protocol.lower() == 'cifs':
+ self.allow_access_ip(
+ share['id'], instance=instance, cleanup=False,
+ snapshot=snapshot)
+ elif not CONF.share.multitenancy_enabled:
+ if self.use_ipv6:
+ server_ip = self._get_ipv6_server_ip(instance)
+ else:
+ server_ip = (CONF.share.override_ip_for_nfs_access or
+ self.floatings[instance['id']]
+ ['floating_ip_address'])
+ self.assertIsNotNone(server_ip)
+ self.allow_access_ip(
+ share['id'], ip=server_ip,
+ instance=instance, cleanup=False, snapshot=snapshot)
+ elif (CONF.share.multitenancy_enabled and
+ self.protocol.lower() == 'nfs'):
+ self.allow_access_ip(
+ share['id'], instance=instance, cleanup=False,
+ snapshot=snapshot)
+
+ def wait_for_active_instance(self, instance_id):
+ waiters.wait_for_server_status(
+ self.os_primary.servers_client, instance_id, "ACTIVE")
+ return self.os_primary.servers_client.show_server(
+ instance_id)["server"]
+
+ def _ping_export_location(self, export, ssh_client):
+ ip, version = self.get_ip_and_version_from_export_location(export)
+ if version == 6:
+ ssh_client.exec_command("ping6 -c 1 %s" % ip)
+ else:
+ ssh_client.exec_command("ping -c 1 %s" % ip)
def get_ip_and_version_from_export_location(self, export):
export = export.replace('[', '').replace(']', '')
@@ -58,13 +274,6 @@
raise self.skipException(message)
return ip, version
- def _ping_host_from_export_location(self, export, remote_client):
- ip, version = self.get_ip_and_version_from_export_location(export)
- if version == 6:
- remote_client.exec_command("ping6 -c 1 %s" % ip)
- else:
- remote_client.exec_command("ping -c 1 %s" % ip)
-
def _get_export_locations_according_to_ip_version(
self, all_locations, error_on_invalid_ip_version):
locations = [
@@ -78,21 +287,6 @@
raise self.skipException(message)
return locations
- def _get_user_export_locations(self, share=None, snapshot=None,
- error_on_invalid_ip_version=False):
- locations = None
- if share:
- locations = self._get_share_export_locations(share)
- elif snapshot:
- locations = self._get_snapshot_export_locations(snapshot)
-
- self.assertNotEmpty(locations)
- locations = self._get_export_locations_according_to_ip_version(
- locations, error_on_invalid_ip_version)
- self.assertNotEmpty(locations)
-
- return locations
-
def _get_share_export_locations(self, share):
if utils.is_microversion_lt(CONF.share.max_api_microversion, "2.9"):
@@ -104,51 +298,25 @@
return locations
- def _get_snapshot_export_locations(self, snapshot):
- exports = (self.shares_v2_client.
- list_snapshot_export_locations(snapshot['id']))
- locations = [x['path'] for x in exports]
-
- return locations
-
@tc.attr(base.TAG_POSITIVE, base.TAG_BACKEND)
def test_mount_share_one_vm(self):
instance = self.boot_instance(wait_until="BUILD")
self.create_share()
locations = self._get_user_export_locations(self.share)
instance = self.wait_for_active_instance(instance["id"])
- remote_client = self.init_remote_client(instance)
+ ssh_client = self.init_ssh(instance)
self.provide_access_to_auxiliary_instance(instance)
for location in locations:
- self.mount_share(location, remote_client)
- self.umount_share(remote_client)
+ self.mount_share(location, ssh_client)
+ self.umount_share(ssh_client)
- @tc.attr(base.TAG_NEGATIVE, base.TAG_BACKEND)
- def test_write_with_ro_access(self):
- '''Test if an instance with ro access can write on the share.'''
- test_data = "Some test data to write"
+ def _get_snapshot_export_locations(self, snapshot):
+ exports = (self.shares_v2_client.
+ list_snapshot_export_locations(snapshot['id']))
+ locations = [x['path'] for x in exports]
- instance = self.boot_instance(wait_until="BUILD")
- self.create_share()
- location = self._get_user_export_locations(self.share)[0]
- instance = self.wait_for_active_instance(instance["id"])
-
- remote_client_inst = self.init_remote_client(instance)
-
- # First, check if write works RW access.
- acc_rule_id = self.provide_access_to_auxiliary_instance(instance)['id']
- self.mount_share(location, remote_client_inst)
- self.write_data_to_mounted_share(test_data, remote_client_inst)
- self.deny_access(self.share['id'], acc_rule_id)
-
- self.provide_access_to_auxiliary_instance(instance, access_level='ro')
- self.addCleanup(self.umount_share, remote_client_inst)
-
- # Test if write with RO access fails.
- self.assertRaises(exceptions.SSHExecCommandFailed,
- self.write_data_to_mounted_share,
- test_data, remote_client_inst)
+ return locations
@tc.attr(base.TAG_POSITIVE, base.TAG_BACKEND)
def test_read_write_two_vms(self):
@@ -164,23 +332,23 @@
instance2 = self.wait_for_active_instance(instance2["id"])
# Write data to first VM
- remote_client_inst1 = self.init_remote_client(instance1)
+ ssh_client_inst1 = self.init_ssh(instance1)
self.provide_access_to_auxiliary_instance(instance1)
- self.mount_share(location, remote_client_inst1)
+ self.mount_share(location, ssh_client_inst1)
self.addCleanup(self.umount_share,
- remote_client_inst1)
- self.write_data_to_mounted_share(test_data, remote_client_inst1)
+ ssh_client_inst1)
+ self.write_data(test_data, ssh_client_inst1)
# Read from second VM
- remote_client_inst2 = self.init_remote_client(instance2)
- if not CONF.share.override_ip_for_nfs_access or self.ipv6_enabled:
+ ssh_client_inst2 = self.init_ssh(instance2)
+ if not CONF.share.override_ip_for_nfs_access or self.use_ipv6:
self.provide_access_to_auxiliary_instance(instance2)
- self.mount_share(location, remote_client_inst2)
+ self.mount_share(location, ssh_client_inst2)
self.addCleanup(self.umount_share,
- remote_client_inst2)
- data = self.read_data_from_mounted_share(remote_client_inst2)
+ ssh_client_inst2)
+ data = self.read_data(ssh_client_inst2)
self.assertEqual(test_data, data)
@tc.attr(base.TAG_POSITIVE, base.TAG_BACKEND)
@@ -204,10 +372,6 @@
raise self.skipException("Only NFS protocol supported "
"at this moment.")
- if self.ipv6_enabled:
- raise self.skipException("Share Migration using IPv6 is not "
- "supported at this moment.")
-
pools = self.shares_admin_v2_client.list_pools(detail=True)['pools']
if len(pools) < 2:
@@ -231,29 +395,29 @@
dest_pool = dest_pool['name']
- remote_client = self.init_remote_client(instance)
+ ssh_client = self.init_ssh(instance)
self.provide_access_to_auxiliary_instance(instance)
- self.mount_share(exports[0], remote_client)
+ self.mount_share(exports[0], ssh_client)
- remote_client.exec_command("sudo mkdir -p /mnt/f1")
- remote_client.exec_command("sudo mkdir -p /mnt/f2")
- remote_client.exec_command("sudo mkdir -p /mnt/f3")
- remote_client.exec_command("sudo mkdir -p /mnt/f4")
- remote_client.exec_command("sudo mkdir -p /mnt/f1/ff1")
- remote_client.exec_command("sleep 1")
- remote_client.exec_command(
+ ssh_client.exec_command("sudo mkdir -p /mnt/f1")
+ ssh_client.exec_command("sudo mkdir -p /mnt/f2")
+ ssh_client.exec_command("sudo mkdir -p /mnt/f3")
+ ssh_client.exec_command("sudo mkdir -p /mnt/f4")
+ ssh_client.exec_command("sudo mkdir -p /mnt/f1/ff1")
+ ssh_client.exec_command("sleep 1")
+ ssh_client.exec_command(
"sudo dd if=/dev/zero of=/mnt/f1/1m1.bin bs=1M count=1")
- remote_client.exec_command(
+ ssh_client.exec_command(
"sudo dd if=/dev/zero of=/mnt/f2/1m2.bin bs=1M count=1")
- remote_client.exec_command(
+ ssh_client.exec_command(
"sudo dd if=/dev/zero of=/mnt/f3/1m3.bin bs=1M count=1")
- remote_client.exec_command(
+ ssh_client.exec_command(
"sudo dd if=/dev/zero of=/mnt/f4/1m4.bin bs=1M count=1")
- remote_client.exec_command(
+ ssh_client.exec_command(
"sudo dd if=/dev/zero of=/mnt/f1/ff1/1m5.bin bs=1M count=1")
- remote_client.exec_command("sudo chmod -R 555 /mnt/f3")
- remote_client.exec_command("sudo chmod -R 777 /mnt/f4")
+ ssh_client.exec_command("sudo chmod -R 555 /mnt/f3")
+ ssh_client.exec_command("sudo chmod -R 777 /mnt/f4")
task_state = (constants.TASK_STATE_DATA_COPYING_COMPLETED
if force_host_assisted
@@ -265,10 +429,10 @@
if force_host_assisted:
self.assertRaises(
exceptions.SSHExecCommandFailed,
- remote_client.exec_command,
+ ssh_client.exec_command,
"dd if=/dev/zero of=/mnt/f1/1m6.bin bs=1M count=1")
- self.umount_share(remote_client)
+ self.umount_share(ssh_client)
self.share = self.migration_complete(self.share['id'], dest_pool)
@@ -279,11 +443,11 @@
self.assertEqual(constants.TASK_STATE_MIGRATION_SUCCESS,
self.share['task_state'])
- self.mount_share(new_exports[0], remote_client)
+ self.mount_share(new_exports[0], ssh_client)
- output = remote_client.exec_command("ls -lRA --ignore=lost+found /mnt")
+ output = ssh_client.exec_command("ls -lRA --ignore=lost+found /mnt")
- self.umount_share(remote_client)
+ self.umount_share(ssh_client)
self.assertIn('1m1.bin', output)
self.assertIn('1m2.bin', output)
@@ -291,6 +455,21 @@
self.assertIn('1m4.bin', output)
self.assertIn('1m5.bin', output)
+ def _get_user_export_locations(self, share=None, snapshot=None,
+ error_on_invalid_ip_version=False):
+ locations = None
+ if share:
+ locations = self._get_share_export_locations(share)
+ elif snapshot:
+ locations = self._get_snapshot_export_locations(snapshot)
+
+ self.assertNotEmpty(locations)
+ locations = self._get_export_locations_according_to_ip_version(
+ locations, error_on_invalid_ip_version)
+ self.assertNotEmpty(locations)
+
+ return locations
+
@tc.attr(base.TAG_POSITIVE, base.TAG_BACKEND)
@testtools.skipUnless(
CONF.share.run_snapshot_tests, "Snapshot tests are disabled.")
@@ -308,7 +487,7 @@
self.addCleanup(self.servers_client.delete_server, instance['id'])
# 3 - SSH to UVM, ok, connected
- remote_client = self.init_remote_client(instance)
+ ssh_client = self.init_ssh(instance)
# 4 - Provide RW access to S1, ok, provided
self.provide_access_to_auxiliary_instance(instance, parent_share)
@@ -316,20 +495,20 @@
# 5 - Try mount S1 to UVM, ok, mounted
user_export_location = self._get_user_export_locations(parent_share)[0]
parent_share_dir = "/mnt/parent"
- remote_client.exec_command("sudo mkdir -p %s" % parent_share_dir)
+ ssh_client.exec_command("sudo mkdir -p %s" % parent_share_dir)
- self.mount_share(user_export_location, remote_client, parent_share_dir)
- self.addCleanup(self.umount_share, remote_client, parent_share_dir)
+ self.mount_share(user_export_location, ssh_client, parent_share_dir)
+ self.addCleanup(self.umount_share, ssh_client, parent_share_dir)
# 6 - Create "file1", ok, created
- remote_client.exec_command("sudo touch %s/file1" % parent_share_dir)
+ ssh_client.exec_command("sudo touch %s/file1" % parent_share_dir)
# 7 - Create snapshot SS1 from S1, ok, created
snapshot = self._create_snapshot(parent_share['id'])
# 8 - Create "file2" in share S1 - ok, created. We expect that
# snapshot will not contain any data created after snapshot creation.
- remote_client.exec_command("sudo touch %s/file2" % parent_share_dir)
+ ssh_client.exec_command("sudo touch %s/file2" % parent_share_dir)
# 9 - Create share S2 from SS1, ok, created
child_share = self.create_share(snapshot_id=snapshot["id"])
@@ -338,40 +517,37 @@
# did not get access rules from parent share.
user_export_location = self._get_user_export_locations(child_share)[0]
child_share_dir = "/mnt/child"
- remote_client.exec_command("sudo mkdir -p %s" % child_share_dir)
+ ssh_client.exec_command("sudo mkdir -p %s" % child_share_dir)
self.assertRaises(
exceptions.SSHExecCommandFailed,
self.mount_share,
- user_export_location, remote_client, child_share_dir,
+ user_export_location, ssh_client, child_share_dir,
)
# 11 - Provide RW access to S2, ok, provided
self.provide_access_to_auxiliary_instance(instance, child_share)
# 12 - Try mount S2, ok, mounted
- self.mount_share(user_export_location, remote_client, child_share_dir)
- self.addCleanup(self.umount_share, remote_client, child_share_dir)
+ self.mount_share(user_export_location, ssh_client, child_share_dir)
+ self.addCleanup(self.umount_share, ssh_client, child_share_dir)
# 13 - List files on S2, only "file1" exists
- output = remote_client.exec_command(
- "sudo ls -lRA %s" % child_share_dir)
+ output = ssh_client.exec_command("sudo ls -lRA %s" % child_share_dir)
self.assertIn('file1', output)
self.assertNotIn('file2', output)
# 14 - Create file3 on S2, ok, file created
- remote_client.exec_command("sudo touch %s/file3" % child_share_dir)
+ ssh_client.exec_command("sudo touch %s/file3" % child_share_dir)
# 15 - List files on S1, two files exist - "file1" and "file2"
- output = remote_client.exec_command(
- "sudo ls -lRA %s" % parent_share_dir)
+ output = ssh_client.exec_command("sudo ls -lRA %s" % parent_share_dir)
self.assertIn('file1', output)
self.assertIn('file2', output)
self.assertNotIn('file3', output)
# 16 - List files on S2, two files exist - "file1" and "file3"
- output = remote_client.exec_command(
- "sudo ls -lRA %s" % child_share_dir)
+ output = ssh_client.exec_command("sudo ls -lRA %s" % child_share_dir)
self.assertIn('file1', output)
self.assertNotIn('file2', output)
self.assertIn('file3', output)
@@ -396,7 +572,7 @@
self.addCleanup(self.servers_client.delete_server, instance['id'])
# 3 - SSH to UVM, ok, connected
- remote_client = self.init_remote_client(instance)
+ ssh_client = self.init_ssh(instance)
# 4 - Provide RW access to S1, ok, provided
self.provide_access_to_auxiliary_instance(instance, parent_share)
@@ -405,21 +581,21 @@
user_export_location = self._get_user_export_locations(parent_share)[0]
parent_share_dir = "/mnt/parent"
snapshot_dir = "/mnt/snapshot_dir"
- remote_client.exec_command("sudo mkdir -p %s" % parent_share_dir)
- remote_client.exec_command("sudo mkdir -p %s" % snapshot_dir)
+ ssh_client.exec_command("sudo mkdir -p %s" % parent_share_dir)
+ ssh_client.exec_command("sudo mkdir -p %s" % snapshot_dir)
- self.mount_share(user_export_location, remote_client, parent_share_dir)
- self.addCleanup(self.umount_share, remote_client, parent_share_dir)
+ self.mount_share(user_export_location, ssh_client, parent_share_dir)
+ self.addCleanup(self.umount_share, ssh_client, parent_share_dir)
# 6 - Create "file1", ok, created
- remote_client.exec_command("sudo touch %s/file1" % parent_share_dir)
+ ssh_client.exec_command("sudo touch %s/file1" % parent_share_dir)
# 7 - Create snapshot SS1 from S1, ok, created
snapshot = self._create_snapshot(parent_share['id'])
# 8 - Create "file2" in share S1 - ok, created. We expect that
# snapshot will not contain any data created after snapshot creation.
- remote_client.exec_command("sudo touch %s/file2" % parent_share_dir)
+ ssh_client.exec_command("sudo touch %s/file2" % parent_share_dir)
# 9 - Allow access to SS1
self.provide_access_to_auxiliary_instance(instance, snapshot=snapshot)
@@ -427,45 +603,45 @@
# 10 - Mount SS1
user_export_location = self._get_user_export_locations(
snapshot=snapshot)[0]
- self.mount_share(user_export_location, remote_client, snapshot_dir)
- self.addCleanup(self.umount_share, remote_client, snapshot_dir)
+ self.mount_share(user_export_location, ssh_client, snapshot_dir)
+ self.addCleanup(self.umount_share, ssh_client, snapshot_dir)
# 11 - List files on SS1, only "file1" exists
# NOTE(lseki): using ls without recursion to avoid permission denied
# error while listing lost+found directory on LVM volumes
- output = remote_client.exec_command("sudo ls -lA %s" % snapshot_dir)
+ output = ssh_client.exec_command("sudo ls -lA %s" % snapshot_dir)
self.assertIn('file1', output)
self.assertNotIn('file2', output)
# 12 - Try to create a file on SS1, should fail
self.assertRaises(
exceptions.SSHExecCommandFailed,
- remote_client.exec_command,
+ ssh_client.exec_command,
"sudo touch %s/file3" % snapshot_dir)
class TestShareBasicOpsNFS(ShareBasicOpsBase):
protocol = "nfs"
- def mount_share(self, location, remote_client, target_dir=None):
+ def mount_share(self, location, ssh_client, target_dir=None):
- self._ping_host_from_export_location(location, remote_client)
+ self._ping_export_location(location, ssh_client)
target_dir = target_dir or "/mnt"
- remote_client.exec_command(
+ ssh_client.exec_command(
"sudo mount -vt nfs \"%s\" %s" % (location, target_dir))
class TestShareBasicOpsCIFS(ShareBasicOpsBase):
protocol = "cifs"
- def mount_share(self, location, remote_client, target_dir=None):
+ def mount_share(self, location, ssh_client, target_dir=None):
- self._ping_host_from_export_location(location, remote_client)
+ self._ping_export_location(location, ssh_client)
location = location.replace("\\", "/")
target_dir = target_dir or "/mnt"
- remote_client.exec_command(
+ ssh_client.exec_command(
"sudo mount.cifs \"%s\" %s -o guest" % (location, target_dir)
)