Prune local copy of tempest.scenario.manager.py

Remove unused code from manager.py to better identify ironic
dependencies on this tempest interface. This is a follow-up
patch to https://review.openstack.org/#/c/439252/.

Change-Id: I3ca7cf3af42e0e55431d5affa36e567f147aa850
diff --git a/ironic_tempest_plugin/manager.py b/ironic_tempest_plugin/manager.py
index cc8862e..5344f93 100644
--- a/ironic_tempest_plugin/manager.py
+++ b/ironic_tempest_plugin/manager.py
@@ -21,13 +21,10 @@
 
 import subprocess
 
-import netaddr
 from oslo_log import log
-from oslo_serialization import jsonutils as json
 from oslo_utils import netutils
 
 from tempest.common import compute
-from tempest.common import image as common_image
 from tempest.common.utils.linux import remote_client
 from tempest.common.utils import net_utils
 from tempest.common import waiters
@@ -219,115 +216,6 @@
         server = clients.servers_client.show_server(body['id'])['server']
         return server
 
-    def create_volume(self, size=None, name=None, snapshot_id=None,
-                      imageRef=None, volume_type=None):
-        if size is None:
-            size = CONF.volume.volume_size
-        if imageRef:
-            image = self.compute_images_client.show_image(imageRef)['image']
-            min_disk = image.get('minDisk')
-            size = max(size, min_disk)
-        if name is None:
-            name = data_utils.rand_name(self.__class__.__name__ + "-volume")
-        kwargs = {'display_name': name,
-                  'snapshot_id': snapshot_id,
-                  'imageRef': imageRef,
-                  'volume_type': volume_type,
-                  'size': size}
-        volume = self.volumes_client.create_volume(**kwargs)['volume']
-
-        self.addCleanup(self.volumes_client.wait_for_resource_deletion,
-                        volume['id'])
-        self.addCleanup(test_utils.call_and_ignore_notfound_exc,
-                        self.volumes_client.delete_volume, volume['id'])
-
-        # NOTE(e0ne): Cinder API v2 uses name instead of display_name
-        if 'display_name' in volume:
-            self.assertEqual(name, volume['display_name'])
-        else:
-            self.assertEqual(name, volume['name'])
-        waiters.wait_for_volume_resource_status(self.volumes_client,
-                                                volume['id'], 'available')
-        # The volume retrieved on creation has a non-up-to-date status.
-        # Retrieval after it becomes active ensures correct details.
-        volume = self.volumes_client.show_volume(volume['id'])['volume']
-        return volume
-
-    def create_volume_type(self, client=None, name=None, backend_name=None):
-        if not client:
-            client = self.admin_volume_types_client
-        if not name:
-            class_name = self.__class__.__name__
-            name = data_utils.rand_name(class_name + '-volume-type')
-        randomized_name = data_utils.rand_name('scenario-type-' + name)
-
-        LOG.debug("Creating a volume type: %s on backend %s",
-                  randomized_name, backend_name)
-        extra_specs = {}
-        if backend_name:
-            extra_specs = {"volume_backend_name": backend_name}
-
-        body = client.create_volume_type(name=randomized_name,
-                                         extra_specs=extra_specs)
-        volume_type = body['volume_type']
-        self.assertIn('id', volume_type)
-        self.addCleanup(client.delete_volume_type, volume_type['id'])
-        return volume_type
-
-    def _create_loginable_secgroup_rule(self, secgroup_id=None):
-        _client = self.compute_security_groups_client
-        _client_rules = self.compute_security_group_rules_client
-        if secgroup_id is None:
-            sgs = _client.list_security_groups()['security_groups']
-            for sg in sgs:
-                if sg['name'] == 'default':
-                    secgroup_id = sg['id']
-
-        # These rules are intended to permit inbound ssh and icmp
-        # traffic from all sources, so no group_id is provided.
-        # Setting a group_id would only permit traffic from ports
-        # belonging to the same security group.
-        rulesets = [
-            {
-                # ssh
-                'ip_protocol': 'tcp',
-                'from_port': 22,
-                'to_port': 22,
-                'cidr': '0.0.0.0/0',
-            },
-            {
-                # ping
-                'ip_protocol': 'icmp',
-                'from_port': -1,
-                'to_port': -1,
-                'cidr': '0.0.0.0/0',
-            }
-        ]
-        rules = list()
-        for ruleset in rulesets:
-            sg_rule = _client_rules.create_security_group_rule(
-                parent_group_id=secgroup_id, **ruleset)['security_group_rule']
-            rules.append(sg_rule)
-        return rules
-
-    def _create_security_group(self):
-        # Create security group
-        sg_name = data_utils.rand_name(self.__class__.__name__)
-        sg_desc = sg_name + " description"
-        secgroup = self.compute_security_groups_client.create_security_group(
-            name=sg_name, description=sg_desc)['security_group']
-        self.assertEqual(secgroup['name'], sg_name)
-        self.assertEqual(secgroup['description'], sg_desc)
-        self.addCleanup(
-            test_utils.call_and_ignore_notfound_exc,
-            self.compute_security_groups_client.delete_security_group,
-            secgroup['id'])
-
-        # Add rules to the security group
-        self._create_loginable_secgroup_rule(secgroup['id'])
-
-        return secgroup
-
     def get_remote_client(self, ip_address, username=None, private_key=None):
         """Get a SSH client to a remote server
 
@@ -367,65 +255,6 @@
 
         return linux_client
 
-    def _image_create(self, name, fmt, path,
-                      disk_format=None, properties=None):
-        if properties is None:
-            properties = {}
-        name = data_utils.rand_name('%s-' % name)
-        params = {
-            'name': name,
-            'container_format': fmt,
-            'disk_format': disk_format or fmt,
-        }
-        if CONF.image_feature_enabled.api_v1:
-            params['is_public'] = 'False'
-            params['properties'] = properties
-            params = {'headers': common_image.image_meta_to_headers(**params)}
-        else:
-            params['visibility'] = 'private'
-            # Additional properties are flattened out in the v2 API.
-            params.update(properties)
-        body = self.image_client.create_image(**params)
-        image = body['image'] if 'image' in body else body
-        self.addCleanup(self.image_client.delete_image, image['id'])
-        self.assertEqual("queued", image['status'])
-        with open(path, 'rb') as image_file:
-            if CONF.image_feature_enabled.api_v1:
-                self.image_client.update_image(image['id'], data=image_file)
-            else:
-                self.image_client.store_image_file(image['id'], image_file)
-        return image['id']
-
-    def glance_image_create(self):
-        img_path = CONF.scenario.img_dir + "/" + CONF.scenario.img_file
-        aki_img_path = CONF.scenario.img_dir + "/" + CONF.scenario.aki_img_file
-        ari_img_path = CONF.scenario.img_dir + "/" + CONF.scenario.ari_img_file
-        ami_img_path = CONF.scenario.img_dir + "/" + CONF.scenario.ami_img_file
-        img_container_format = CONF.scenario.img_container_format
-        img_disk_format = CONF.scenario.img_disk_format
-        img_properties = CONF.scenario.img_properties
-        LOG.debug("paths: img: %s, container_format: %s, disk_format: %s, "
-                  "properties: %s, ami: %s, ari: %s, aki: %s",
-                  img_path, img_container_format, img_disk_format,
-                  img_properties, ami_img_path, ari_img_path, aki_img_path)
-        try:
-            image = self._image_create('scenario-img',
-                                       img_container_format,
-                                       img_path,
-                                       disk_format=img_disk_format,
-                                       properties=img_properties)
-        except IOError:
-            LOG.debug("A qcow2 image was not found. Try to get a uec image.")
-            kernel = self._image_create('scenario-aki', 'aki', aki_img_path)
-            ramdisk = self._image_create('scenario-ari', 'ari', ari_img_path)
-            properties = {'kernel_id': kernel, 'ramdisk_id': ramdisk}
-            image = self._image_create('scenario-ami', 'ami',
-                                       path=ami_img_path,
-                                       properties=properties)
-        LOG.debug("image:%s", image)
-
-        return image
-
     def _log_console_output(self, servers=None):
         if not CONF.compute_feature_enabled.console_output:
             LOG.debug('Console output not supported, cannot log')
@@ -443,77 +272,6 @@
                 LOG.debug("Server %s disappeared(deleted) while looking "
                           "for the console log", server['id'])
 
-    def _log_net_info(self, exc):
-        # network debug is called as part of ssh init
-        if not isinstance(exc, lib_exc.SSHTimeout):
-            LOG.debug('Network information on a devstack host')
-
-    def create_server_snapshot(self, server, name=None):
-        # Glance client
-        _image_client = self.image_client
-        # Compute client
-        _images_client = self.compute_images_client
-        if name is None:
-            name = data_utils.rand_name(self.__class__.__name__ + 'snapshot')
-        LOG.debug("Creating a snapshot image for server: %s", server['name'])
-        image = _images_client.create_image(server['id'], name=name)
-        image_id = image.response['location'].split('images/')[1]
-        waiters.wait_for_image_status(_image_client, image_id, 'active')
-
-        self.addCleanup(_image_client.wait_for_resource_deletion,
-                        image_id)
-        self.addCleanup(test_utils.call_and_ignore_notfound_exc,
-                        _image_client.delete_image, image_id)
-
-        if CONF.image_feature_enabled.api_v1:
-            # In glance v1 the additional properties are stored in the headers.
-            resp = _image_client.check_image(image_id)
-            snapshot_image = common_image.get_image_meta_from_headers(resp)
-            image_props = snapshot_image.get('properties', {})
-        else:
-            # In glance v2 the additional properties are flattened.
-            snapshot_image = _image_client.show_image(image_id)
-            image_props = snapshot_image
-
-        bdm = image_props.get('block_device_mapping')
-        if bdm:
-            bdm = json.loads(bdm)
-            if bdm and 'snapshot_id' in bdm[0]:
-                snapshot_id = bdm[0]['snapshot_id']
-                self.addCleanup(
-                    self.snapshots_client.wait_for_resource_deletion,
-                    snapshot_id)
-                self.addCleanup(test_utils.call_and_ignore_notfound_exc,
-                                self.snapshots_client.delete_snapshot,
-                                snapshot_id)
-                waiters.wait_for_volume_resource_status(self.snapshots_client,
-                                                        snapshot_id,
-                                                        'available')
-        image_name = snapshot_image['name']
-        self.assertEqual(name, image_name)
-        LOG.debug("Created snapshot image %s for server %s",
-                  image_name, server['name'])
-        return snapshot_image
-
-    def nova_volume_attach(self, server, volume_to_attach):
-        volume = self.servers_client.attach_volume(
-            server['id'], volumeId=volume_to_attach['id'], device='/dev/%s'
-            % CONF.compute.volume_device_name)['volumeAttachment']
-        self.assertEqual(volume_to_attach['id'], volume['id'])
-        waiters.wait_for_volume_resource_status(self.volumes_client,
-                                                volume['id'], 'in-use')
-
-        # Return the updated volume after the attachment
-        return self.volumes_client.show_volume(volume['id'])['volume']
-
-    def nova_volume_detach(self, server, volume):
-        self.servers_client.detach_volume(server['id'], volume['id'])
-        waiters.wait_for_volume_resource_status(self.volumes_client,
-                                                volume['id'], 'available')
-
-        volume = self.volumes_client.show_volume(volume['id'])['volume']
-        self.assertEqual('available', volume['status'])
-
     def rebuild_server(self, server_id, image=None,
                        preserve_ephemeral=False, wait=True,
                        rebuild_kwargs=None):
@@ -599,27 +357,6 @@
             # no need to check ssh for negative connectivity
             self.get_remote_client(ip_address, username, private_key)
 
-    def check_public_network_connectivity(self, ip_address, username,
-                                          private_key, should_connect=True,
-                                          msg=None, servers=None, mtu=None):
-        # The target login is assumed to have been configured for
-        # key-based authentication by cloud-init.
-        LOG.debug('checking network connections to IP %s with user: %s',
-                  ip_address, username)
-        try:
-            self.check_vm_connectivity(ip_address,
-                                       username,
-                                       private_key,
-                                       should_connect=should_connect,
-                                       mtu=mtu)
-        except Exception:
-            ex_msg = 'Public network connectivity check failed'
-            if msg:
-                ex_msg += ": " + msg
-            LOG.exception(ex_msg)
-            self._log_console_output(servers)
-            raise
-
     def create_floating_ip(self, thing, pool_name=None):
         """Create a floating IP and associates to a server on Nova"""
 
@@ -650,18 +387,6 @@
             ssh_client.exec_command('sudo umount %s' % mount_path)
         return timestamp
 
-    def get_timestamp(self, ip_address, dev_name=None, mount_path='/mnt',
-                      private_key=None):
-        ssh_client = self.get_remote_client(ip_address,
-                                            private_key=private_key)
-        if dev_name is not None:
-            ssh_client.mount(dev_name, mount_path)
-        timestamp = ssh_client.exec_command('sudo cat %s/timestamp'
-                                            % mount_path)
-        if dev_name is not None:
-            ssh_client.exec_command('sudo umount %s' % mount_path)
-        return timestamp
-
     def get_server_ip(self, server):
         """Get the server fixed or floating IP.
 
@@ -737,72 +462,6 @@
                         network['id'])
         return network
 
-    def _create_subnet(self, network, subnets_client=None,
-                       routers_client=None, namestart='subnet-smoke',
-                       **kwargs):
-        """Create a subnet for the given network
-
-        within the cidr block configured for tenant networks.
-        """
-        if not subnets_client:
-            subnets_client = self.subnets_client
-        if not routers_client:
-            routers_client = self.routers_client
-
-        def cidr_in_use(cidr, tenant_id):
-            """Check cidr existence
-
-            :returns: True if subnet with cidr already exist in tenant
-                  False else
-            """
-            cidr_in_use = self.admin_manager.subnets_client.list_subnets(
-                tenant_id=tenant_id, cidr=cidr)['subnets']
-            return len(cidr_in_use) != 0
-
-        ip_version = kwargs.pop('ip_version', 4)
-
-        if ip_version == 6:
-            tenant_cidr = netaddr.IPNetwork(
-                CONF.network.project_network_v6_cidr)
-            num_bits = CONF.network.project_network_v6_mask_bits
-        else:
-            tenant_cidr = netaddr.IPNetwork(CONF.network.project_network_cidr)
-            num_bits = CONF.network.project_network_mask_bits
-
-        result = None
-        str_cidr = None
-        # Repeatedly attempt subnet creation with sequential cidr
-        # blocks until an unallocated block is found.
-        for subnet_cidr in tenant_cidr.subnet(num_bits):
-            str_cidr = str(subnet_cidr)
-            if cidr_in_use(str_cidr, tenant_id=network['tenant_id']):
-                continue
-
-            subnet = dict(
-                name=data_utils.rand_name(namestart),
-                network_id=network['id'],
-                tenant_id=network['tenant_id'],
-                cidr=str_cidr,
-                ip_version=ip_version,
-                **kwargs
-            )
-            try:
-                result = subnets_client.create_subnet(**subnet)
-                break
-            except lib_exc.Conflict as e:
-                is_overlapping_cidr = 'overlaps with another subnet' in str(e)
-                if not is_overlapping_cidr:
-                    raise
-        self.assertIsNotNone(result, 'Unable to allocate tenant network')
-
-        subnet = result['subnet']
-        self.assertEqual(subnet['cidr'], str_cidr)
-
-        self.addCleanup(test_utils.call_and_ignore_notfound_exc,
-                        subnets_client.delete_subnet, subnet['id'])
-
-        return subnet
-
     def _get_server_port_id_and_ip4(self, server, ip_addr=None):
         ports = self.admin_manager.ports_client.list_ports(
             device_id=server['id'], fixed_ip=ip_addr)['ports']
@@ -833,13 +492,6 @@
                          % port_map)
         return port_map[0]
 
-    def _get_network_by_name(self, network_name):
-        net = self.admin_manager.networks_client.list_networks(
-            name=network_name)['networks']
-        self.assertNotEqual(len(net), 0,
-                            "Unable to get network by name: %s" % network_name)
-        return net[0]
-
     def create_floating_ip(self, thing, external_network_id=None,
                            port_id=None, client=None):
         """Create a floating IP and associates to a resource/port on Neutron"""
@@ -862,515 +514,3 @@
                         client.delete_floatingip,
                         floating_ip['id'])
         return floating_ip
-
-    def _associate_floating_ip(self, floating_ip, server):
-        port_id, _ = self._get_server_port_id_and_ip4(server)
-        kwargs = dict(port_id=port_id)
-        floating_ip = self.floating_ips_client.update_floatingip(
-            floating_ip['id'], **kwargs)['floatingip']
-        self.assertEqual(port_id, floating_ip['port_id'])
-        return floating_ip
-
-    def _disassociate_floating_ip(self, floating_ip):
-        """:param floating_ip: floating_ips_client.create_floatingip"""
-        kwargs = dict(port_id=None)
-        floating_ip = self.floating_ips_client.update_floatingip(
-            floating_ip['id'], **kwargs)['floatingip']
-        self.assertIsNone(floating_ip['port_id'])
-        return floating_ip
-
-    def check_floating_ip_status(self, floating_ip, status):
-        """Verifies floatingip reaches the given status
-
-        :param dict floating_ip: floating IP dict to check status
-        :param status: target status
-        :raises: AssertionError if status doesn't match
-        """
-        floatingip_id = floating_ip['id']
-
-        def refresh():
-            result = (self.floating_ips_client.
-                      show_floatingip(floatingip_id)['floatingip'])
-            return status == result['status']
-
-        test_utils.call_until_true(refresh,
-                                   CONF.network.build_timeout,
-                                   CONF.network.build_interval)
-        floating_ip = self.floating_ips_client.show_floatingip(
-            floatingip_id)['floatingip']
-        self.assertEqual(status, floating_ip['status'],
-                         message="FloatingIP: {fp} is at status: {cst}. "
-                                 "failed  to reach status: {st}"
-                         .format(fp=floating_ip, cst=floating_ip['status'],
-                                 st=status))
-        LOG.info("FloatingIP: {fp} is at status: {st}"
-                 .format(fp=floating_ip, st=status))
-
-    def _check_tenant_network_connectivity(self, server,
-                                           username,
-                                           private_key,
-                                           should_connect=True,
-                                           servers_for_debug=None):
-        if not CONF.network.project_networks_reachable:
-            msg = 'Tenant networks not configured to be reachable.'
-            LOG.info(msg)
-            return
-        # The target login is assumed to have been configured for
-        # key-based authentication by cloud-init.
-        try:
-            for ip_addresses in server['addresses'].values():
-                for ip_address in ip_addresses:
-                    self.check_vm_connectivity(ip_address['addr'],
-                                               username,
-                                               private_key,
-                                               should_connect=should_connect)
-        except Exception as e:
-            LOG.exception('Tenant network connectivity check failed')
-            self._log_console_output(servers_for_debug)
-            self._log_net_info(e)
-            raise
-
-    def _check_remote_connectivity(self, source, dest, should_succeed=True,
-                                   nic=None):
-        """assert ping server via source ssh connection
-
-        Note: This is an internal method.  Use check_remote_connectivity
-        instead.
-
-        :param source: RemoteClient: an ssh connection from which to ping
-        :param dest: and IP to ping against
-        :param should_succeed: boolean should ping succeed or not
-        :param nic: specific network interface to ping from
-        """
-        def ping_remote():
-            try:
-                source.ping_host(dest, nic=nic)
-            except lib_exc.SSHExecCommandFailed:
-                LOG.warning('Failed to ping IP: %s via a ssh connection '
-                            'from: %s.', dest, source.ssh_client.host)
-                return not should_succeed
-            return should_succeed
-
-        return test_utils.call_until_true(ping_remote,
-                                          CONF.validation.ping_timeout,
-                                          1)
-
-    def check_remote_connectivity(self, source, dest, should_succeed=True,
-                                  nic=None):
-        """assert ping server via source ssh connection
-
-        :param source: RemoteClient: an ssh connection from which to ping
-        :param dest: and IP to ping against
-        :param should_succeed: boolean should ping succeed or not
-        :param nic: specific network interface to ping from
-        """
-        result = self._check_remote_connectivity(source, dest, should_succeed,
-                                                 nic)
-        source_host = source.ssh_client.host
-        if should_succeed:
-            msg = "Timed out waiting for %s to become reachable from %s" \
-                % (dest, source_host)
-        else:
-            msg = "%s is reachable from %s" % (dest, source_host)
-        self.assertTrue(result, msg)
-
-    def _create_security_group(self, security_group_rules_client=None,
-                               tenant_id=None,
-                               namestart='secgroup-smoke',
-                               security_groups_client=None):
-        if security_group_rules_client is None:
-            security_group_rules_client = self.security_group_rules_client
-        if security_groups_client is None:
-            security_groups_client = self.security_groups_client
-        if tenant_id is None:
-            tenant_id = security_groups_client.tenant_id
-        secgroup = self._create_empty_security_group(
-            namestart=namestart, client=security_groups_client,
-            tenant_id=tenant_id)
-
-        # Add rules to the security group
-        rules = self._create_loginable_secgroup_rule(
-            security_group_rules_client=security_group_rules_client,
-            secgroup=secgroup,
-            security_groups_client=security_groups_client)
-        for rule in rules:
-            self.assertEqual(tenant_id, rule['tenant_id'])
-            self.assertEqual(secgroup['id'], rule['security_group_id'])
-        return secgroup
-
-    def _create_empty_security_group(self, client=None, tenant_id=None,
-                                     namestart='secgroup-smoke'):
-        """Create a security group without rules.
-
-        Default rules will be created:
-         - IPv4 egress to any
-         - IPv6 egress to any
-
-        :param tenant_id: secgroup will be created in this tenant
-        :returns: the created security group
-        """
-        if client is None:
-            client = self.security_groups_client
-        if not tenant_id:
-            tenant_id = client.tenant_id
-        sg_name = data_utils.rand_name(namestart)
-        sg_desc = sg_name + " description"
-        sg_dict = dict(name=sg_name,
-                       description=sg_desc)
-        sg_dict['tenant_id'] = tenant_id
-        result = client.create_security_group(**sg_dict)
-
-        secgroup = result['security_group']
-        self.assertEqual(secgroup['name'], sg_name)
-        self.assertEqual(tenant_id, secgroup['tenant_id'])
-        self.assertEqual(secgroup['description'], sg_desc)
-
-        self.addCleanup(test_utils.call_and_ignore_notfound_exc,
-                        client.delete_security_group, secgroup['id'])
-        return secgroup
-
-    def _default_security_group(self, client=None, tenant_id=None):
-        """Get default secgroup for given tenant_id.
-
-        :returns: default secgroup for given tenant
-        """
-        if client is None:
-            client = self.security_groups_client
-        if not tenant_id:
-            tenant_id = client.tenant_id
-        sgs = [
-            sg for sg in list(client.list_security_groups().values())[0]
-            if sg['tenant_id'] == tenant_id and sg['name'] == 'default'
-        ]
-        msg = "No default security group for tenant %s." % (tenant_id)
-        self.assertGreater(len(sgs), 0, msg)
-        return sgs[0]
-
-    def _create_security_group_rule(self, secgroup=None,
-                                    sec_group_rules_client=None,
-                                    tenant_id=None,
-                                    security_groups_client=None, **kwargs):
-        """Create a rule from a dictionary of rule parameters.
-
-        Create a rule in a secgroup. if secgroup not defined will search for
-        default secgroup in tenant_id.
-
-        :param secgroup: the security group.
-        :param tenant_id: if secgroup not passed -- the tenant in which to
-            search for default secgroup
-        :param kwargs: a dictionary containing rule parameters:
-            for example, to allow incoming ssh:
-            rule = {
-                    direction: 'ingress'
-                    protocol:'tcp',
-                    port_range_min: 22,
-                    port_range_max: 22
-                    }
-        """
-        if sec_group_rules_client is None:
-            sec_group_rules_client = self.security_group_rules_client
-        if security_groups_client is None:
-            security_groups_client = self.security_groups_client
-        if not tenant_id:
-            tenant_id = security_groups_client.tenant_id
-        if secgroup is None:
-            secgroup = self._default_security_group(
-                client=security_groups_client, tenant_id=tenant_id)
-
-        ruleset = dict(security_group_id=secgroup['id'],
-                       tenant_id=secgroup['tenant_id'])
-        ruleset.update(kwargs)
-
-        sg_rule = sec_group_rules_client.create_security_group_rule(**ruleset)
-        sg_rule = sg_rule['security_group_rule']
-
-        self.assertEqual(secgroup['tenant_id'], sg_rule['tenant_id'])
-        self.assertEqual(secgroup['id'], sg_rule['security_group_id'])
-
-        return sg_rule
-
-    def _create_loginable_secgroup_rule(self, security_group_rules_client=None,
-                                        secgroup=None,
-                                        security_groups_client=None):
-        """Create loginable security group rule
-
-        This function will create:
-        1. egress and ingress tcp port 22 allow rule in order to allow ssh
-        access for ipv4.
-        2. egress and ingress ipv6 icmp allow rule, in order to allow icmpv6.
-        3. egress and ingress ipv4 icmp allow rule, in order to allow icmpv4.
-        """
-
-        if security_group_rules_client is None:
-            security_group_rules_client = self.security_group_rules_client
-        if security_groups_client is None:
-            security_groups_client = self.security_groups_client
-        rules = []
-        rulesets = [
-            dict(
-                # ssh
-                protocol='tcp',
-                port_range_min=22,
-                port_range_max=22,
-            ),
-            dict(
-                # ping
-                protocol='icmp',
-            ),
-            dict(
-                # ipv6-icmp for ping6
-                protocol='icmp',
-                ethertype='IPv6',
-            )
-        ]
-        sec_group_rules_client = security_group_rules_client
-        for ruleset in rulesets:
-            for r_direction in ['ingress', 'egress']:
-                ruleset['direction'] = r_direction
-                try:
-                    sg_rule = self._create_security_group_rule(
-                        sec_group_rules_client=sec_group_rules_client,
-                        secgroup=secgroup,
-                        security_groups_client=security_groups_client,
-                        **ruleset)
-                except lib_exc.Conflict as ex:
-                    # if rule already exist - skip rule and continue
-                    msg = 'Security group rule already exists'
-                    if msg not in ex._error_string:
-                        raise ex
-                else:
-                    self.assertEqual(r_direction, sg_rule['direction'])
-                    rules.append(sg_rule)
-
-        return rules
-
-    def _get_router(self, client=None, tenant_id=None):
-        """Retrieve a router for the given tenant id.
-
-        If a public router has been configured, it will be returned.
-
-        If a public router has not been configured, but a public
-        network has, a tenant router will be created and returned that
-        routes traffic to the public network.
-        """
-        if not client:
-            client = self.routers_client
-        if not tenant_id:
-            tenant_id = client.tenant_id
-        router_id = CONF.network.public_router_id
-        network_id = CONF.network.public_network_id
-        if router_id:
-            body = client.show_router(router_id)
-            return body['router']
-        elif network_id:
-            router = self._create_router(client, tenant_id)
-            kwargs = {'external_gateway_info': dict(network_id=network_id)}
-            router = client.update_router(router['id'], **kwargs)['router']
-            return router
-        else:
-            raise Exception("Neither of 'public_router_id' or "
-                            "'public_network_id' has been defined.")
-
-    def _create_router(self, client=None, tenant_id=None,
-                       namestart='router-smoke'):
-        if not client:
-            client = self.routers_client
-        if not tenant_id:
-            tenant_id = client.tenant_id
-        name = data_utils.rand_name(namestart)
-        result = client.create_router(name=name,
-                                      admin_state_up=True,
-                                      tenant_id=tenant_id)
-        router = result['router']
-        self.assertEqual(router['name'], name)
-        self.addCleanup(test_utils.call_and_ignore_notfound_exc,
-                        client.delete_router,
-                        router['id'])
-        return router
-
-    def _update_router_admin_state(self, router, admin_state_up):
-        kwargs = dict(admin_state_up=admin_state_up)
-        router = self.routers_client.update_router(
-            router['id'], **kwargs)['router']
-        self.assertEqual(admin_state_up, router['admin_state_up'])
-
-    def create_networks(self, networks_client=None,
-                        routers_client=None, subnets_client=None,
-                        tenant_id=None, dns_nameservers=None,
-                        port_security_enabled=True):
-        """Create a network with a subnet connected to a router.
-
-        The baremetal driver is a special case since all nodes are
-        on the same shared network.
-
-        :param tenant_id: id of tenant to create resources in.
-        :param dns_nameservers: list of dns servers to send to subnet.
-        :returns: network, subnet, router
-        """
-        if CONF.network.shared_physical_network:
-            # NOTE(Shrews): This exception is for environments where tenant
-            # credential isolation is available, but network separation is
-            # not (the current baremetal case). Likely can be removed when
-            # test account mgmt is reworked:
-            # https://blueprints.launchpad.net/tempest/+spec/test-accounts
-            if not CONF.compute.fixed_network_name:
-                m = 'fixed_network_name must be specified in config'
-                raise lib_exc.InvalidConfiguration(m)
-            network = self._get_network_by_name(
-                CONF.compute.fixed_network_name)
-            router = None
-            subnet = None
-        else:
-            network = self._create_network(
-                networks_client=networks_client,
-                tenant_id=tenant_id,
-                port_security_enabled=port_security_enabled)
-            router = self._get_router(client=routers_client,
-                                      tenant_id=tenant_id)
-            subnet_kwargs = dict(network=network,
-                                 subnets_client=subnets_client,
-                                 routers_client=routers_client)
-            # use explicit check because empty list is a valid option
-            if dns_nameservers is not None:
-                subnet_kwargs['dns_nameservers'] = dns_nameservers
-            subnet = self._create_subnet(**subnet_kwargs)
-            if not routers_client:
-                routers_client = self.routers_client
-            router_id = router['id']
-            routers_client.add_router_interface(router_id,
-                                                subnet_id=subnet['id'])
-
-            # save a cleanup job to remove this association between
-            # router and subnet
-            self.addCleanup(test_utils.call_and_ignore_notfound_exc,
-                            routers_client.remove_router_interface, router_id,
-                            subnet_id=subnet['id'])
-        return network, subnet, router
-
-
-class EncryptionScenarioTest(ScenarioTest):
-    """Base class for encryption scenario tests"""
-
-    credentials = ['primary', 'admin']
-
-    @classmethod
-    def setup_clients(cls):
-        super(EncryptionScenarioTest, cls).setup_clients()
-        if CONF.volume_feature_enabled.api_v2:
-            cls.admin_volume_types_client = cls.os_adm.volume_types_v2_client
-            cls.admin_encryption_types_client =\
-                cls.os_adm.encryption_types_v2_client
-        else:
-            cls.admin_volume_types_client = cls.os_adm.volume_types_client
-            cls.admin_encryption_types_client =\
-                cls.os_adm.encryption_types_client
-
-    def create_encryption_type(self, client=None, type_id=None, provider=None,
-                               key_size=None, cipher=None,
-                               control_location=None):
-        if not client:
-            client = self.admin_encryption_types_client
-        if not type_id:
-            volume_type = self.create_volume_type()
-            type_id = volume_type['id']
-        LOG.debug("Creating an encryption type for volume type: %s", type_id)
-        client.create_encryption_type(
-            type_id, provider=provider, key_size=key_size, cipher=cipher,
-            control_location=control_location)['encryption']
-
-
-class ObjectStorageScenarioTest(ScenarioTest):
-    """Provide harness to do Object Storage scenario tests.
-
-    Subclasses implement the tests that use the methods provided by this
-    class.
-    """
-
-    @classmethod
-    def skip_checks(cls):
-        super(ObjectStorageScenarioTest, cls).skip_checks()
-        if not CONF.service_available.swift:
-            skip_msg = ("%s skipped as swift is not available" %
-                        cls.__name__)
-            raise cls.skipException(skip_msg)
-
-    @classmethod
-    def setup_credentials(cls):
-        cls.set_network_resources()
-        super(ObjectStorageScenarioTest, cls).setup_credentials()
-        operator_role = CONF.object_storage.operator_role
-        cls.os_operator = cls.get_client_manager(roles=[operator_role])
-
-    @classmethod
-    def setup_clients(cls):
-        super(ObjectStorageScenarioTest, cls).setup_clients()
-        # Clients for Swift
-        cls.account_client = cls.os_operator.account_client
-        cls.container_client = cls.os_operator.container_client
-        cls.object_client = cls.os_operator.object_client
-
-    def get_swift_stat(self):
-        """get swift status for our user account."""
-        self.account_client.list_account_containers()
-        LOG.debug('Swift status information obtained successfully')
-
-    def create_container(self, container_name=None):
-        name = container_name or data_utils.rand_name(
-            'swift-scenario-container')
-        self.container_client.create_container(name)
-        # look for the container to assure it is created
-        self.list_and_check_container_objects(name)
-        LOG.debug('Container %s created', name)
-        self.addCleanup(test_utils.call_and_ignore_notfound_exc,
-                        self.container_client.delete_container,
-                        name)
-        return name
-
-    def delete_container(self, container_name):
-        self.container_client.delete_container(container_name)
-        LOG.debug('Container %s deleted', container_name)
-
-    def upload_object_to_container(self, container_name, obj_name=None):
-        obj_name = obj_name or data_utils.rand_name('swift-scenario-object')
-        obj_data = data_utils.random_bytes()
-        self.object_client.create_object(container_name, obj_name, obj_data)
-        self.addCleanup(test_utils.call_and_ignore_notfound_exc,
-                        self.object_client.delete_object,
-                        container_name,
-                        obj_name)
-        return obj_name, obj_data
-
-    def delete_object(self, container_name, filename):
-        self.object_client.delete_object(container_name, filename)
-        self.list_and_check_container_objects(container_name,
-                                              not_present_obj=[filename])
-
-    def list_and_check_container_objects(self, container_name,
-                                         present_obj=None,
-                                         not_present_obj=None):
-        # List objects for a given container and assert which are present and
-        # which are not.
-        if present_obj is None:
-            present_obj = []
-        if not_present_obj is None:
-            not_present_obj = []
-        _, object_list = self.container_client.list_container_contents(
-            container_name)
-        if present_obj:
-            for obj in present_obj:
-                self.assertIn(obj, object_list)
-        if not_present_obj:
-            for obj in not_present_obj:
-                self.assertNotIn(obj, object_list)
-
-    def change_container_acl(self, container_name, acl):
-        metadata_param = {'metadata_prefix': 'x-container-',
-                          'metadata': {'read': acl}}
-        self.container_client.update_container_metadata(container_name,
-                                                        **metadata_param)
-        resp, _ = self.container_client.list_container_metadata(container_name)
-        self.assertEqual(resp['x-container-read'], acl)
-
-    def download_and_verify(self, container_name, obj_name, expected_data):
-        _, obj = self.object_client.get_object(container_name, obj_name)
-        self.assertEqual(obj, expected_data)