Refactored Neutron tempest plugin directory structure

* switch from neutron.tests.tempest to neutron_tempest_plugin
* Cleaned up README.rst and setup.cfg
* Use neutron_tempest_plugin as a tempest plugin package
* Fixed gitreview
* Keeping flake8 Ignores in tox.ini as tempest plugin is
  imported from neutron codebase.

Change-Id: I42d389836e72813fdeebc797a577f4a8ac2ee603
diff --git a/neutron_tempest_plugin/api/base.py b/neutron_tempest_plugin/api/base.py
new file mode 100644
index 0000000..b122ce8
--- /dev/null
+++ b/neutron_tempest_plugin/api/base.py
@@ -0,0 +1,809 @@
+# Copyright 2012 OpenStack Foundation
+# All Rights Reserved.
+#
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+
+import functools
+import math
+
+import netaddr
+from tempest.lib.common.utils import data_utils
+from tempest.lib import exceptions as lib_exc
+from tempest import test
+
+from neutron_tempest_plugin.api import clients
+from neutron_tempest_plugin.common import constants
+from neutron_tempest_plugin.common import utils
+from neutron_tempest_plugin import config
+from neutron_tempest_plugin import exceptions
+
+CONF = config.CONF
+
+
+class BaseNetworkTest(test.BaseTestCase):
+
+    """
+    Base class for the Neutron tests that use the Tempest Neutron REST client
+
+    Per the Neutron API Guide, API v1.x was removed from the source code tree
+    (docs.openstack.org/api/openstack-network/2.0/content/Overview-d1e71.html)
+    Therefore, v2.x of the Neutron API is assumed. It is also assumed that the
+    following options are defined in the [network] section of etc/tempest.conf:
+
+        project_network_cidr with a block of cidr's from which smaller blocks
+        can be allocated for tenant networks
+
+        project_network_mask_bits with the mask bits to be used to partition
+        the block defined by tenant-network_cidr
+
+    Finally, it is assumed that the following option is defined in the
+    [service_available] section of etc/tempest.conf
+
+        neutron as True
+    """
+
+    force_tenant_isolation = False
+    credentials = ['primary']
+
+    # Default to ipv4.
+    _ip_version = 4
+
+    @classmethod
+    def get_client_manager(cls, credential_type=None, roles=None,
+                           force_new=None):
+        manager = super(BaseNetworkTest, cls).get_client_manager(
+            credential_type=credential_type,
+            roles=roles,
+            force_new=force_new
+        )
+        # Neutron uses a different clients manager than the one in the Tempest
+        return clients.Manager(manager.credentials)
+
+    @classmethod
+    def skip_checks(cls):
+        super(BaseNetworkTest, cls).skip_checks()
+        if not CONF.service_available.neutron:
+            raise cls.skipException("Neutron support is required")
+        if cls._ip_version == 6 and not CONF.network_feature_enabled.ipv6:
+            raise cls.skipException("IPv6 Tests are disabled.")
+        for req_ext in getattr(cls, 'required_extensions', []):
+            if not test.is_extension_enabled(req_ext, 'network'):
+                msg = "%s extension not enabled." % req_ext
+                raise cls.skipException(msg)
+
+    @classmethod
+    def setup_credentials(cls):
+        # Create no network resources for these test.
+        cls.set_network_resources()
+        super(BaseNetworkTest, cls).setup_credentials()
+
+    @classmethod
+    def setup_clients(cls):
+        super(BaseNetworkTest, cls).setup_clients()
+        cls.client = cls.os_primary.network_client
+
+    @classmethod
+    def resource_setup(cls):
+        super(BaseNetworkTest, cls).resource_setup()
+
+        cls.networks = []
+        cls.admin_networks = []
+        cls.subnets = []
+        cls.admin_subnets = []
+        cls.ports = []
+        cls.routers = []
+        cls.floating_ips = []
+        cls.metering_labels = []
+        cls.service_profiles = []
+        cls.flavors = []
+        cls.metering_label_rules = []
+        cls.qos_rules = []
+        cls.qos_policies = []
+        cls.ethertype = "IPv" + str(cls._ip_version)
+        cls.address_scopes = []
+        cls.admin_address_scopes = []
+        cls.subnetpools = []
+        cls.admin_subnetpools = []
+        cls.security_groups = []
+
+    @classmethod
+    def resource_cleanup(cls):
+        if CONF.service_available.neutron:
+            # Clean up floating IPs
+            for floating_ip in cls.floating_ips:
+                cls._try_delete_resource(cls.client.delete_floatingip,
+                                         floating_ip['id'])
+            # Clean up routers
+            for router in cls.routers:
+                cls._try_delete_resource(cls.delete_router,
+                                         router)
+            # Clean up metering label rules
+            for metering_label_rule in cls.metering_label_rules:
+                cls._try_delete_resource(
+                    cls.admin_client.delete_metering_label_rule,
+                    metering_label_rule['id'])
+            # Clean up metering labels
+            for metering_label in cls.metering_labels:
+                cls._try_delete_resource(
+                    cls.admin_client.delete_metering_label,
+                    metering_label['id'])
+            # Clean up flavors
+            for flavor in cls.flavors:
+                cls._try_delete_resource(
+                    cls.admin_client.delete_flavor,
+                    flavor['id'])
+            # Clean up service profiles
+            for service_profile in cls.service_profiles:
+                cls._try_delete_resource(
+                    cls.admin_client.delete_service_profile,
+                    service_profile['id'])
+            # Clean up ports
+            for port in cls.ports:
+                cls._try_delete_resource(cls.client.delete_port,
+                                         port['id'])
+            # Clean up subnets
+            for subnet in cls.subnets:
+                cls._try_delete_resource(cls.client.delete_subnet,
+                                         subnet['id'])
+            # Clean up admin subnets
+            for subnet in cls.admin_subnets:
+                cls._try_delete_resource(cls.admin_client.delete_subnet,
+                                         subnet['id'])
+            # Clean up networks
+            for network in cls.networks:
+                cls._try_delete_resource(cls.client.delete_network,
+                                         network['id'])
+
+            # Clean up admin networks
+            for network in cls.admin_networks:
+                cls._try_delete_resource(cls.admin_client.delete_network,
+                                         network['id'])
+
+            # Clean up security groups
+            for secgroup in cls.security_groups:
+                cls._try_delete_resource(cls.client.delete_security_group,
+                                         secgroup['id'])
+
+            for subnetpool in cls.subnetpools:
+                cls._try_delete_resource(cls.client.delete_subnetpool,
+                                         subnetpool['id'])
+
+            for subnetpool in cls.admin_subnetpools:
+                cls._try_delete_resource(cls.admin_client.delete_subnetpool,
+                                         subnetpool['id'])
+
+            for address_scope in cls.address_scopes:
+                cls._try_delete_resource(cls.client.delete_address_scope,
+                                         address_scope['id'])
+
+            for address_scope in cls.admin_address_scopes:
+                cls._try_delete_resource(
+                    cls.admin_client.delete_address_scope,
+                    address_scope['id'])
+
+            # Clean up QoS rules
+            for qos_rule in cls.qos_rules:
+                cls._try_delete_resource(cls.admin_client.delete_qos_rule,
+                                         qos_rule['id'])
+            # Clean up QoS policies
+            # as all networks and ports are already removed, QoS policies
+            # shouldn't be "in use"
+            for qos_policy in cls.qos_policies:
+                cls._try_delete_resource(cls.admin_client.delete_qos_policy,
+                                         qos_policy['id'])
+
+        super(BaseNetworkTest, cls).resource_cleanup()
+
+    @classmethod
+    def _try_delete_resource(cls, delete_callable, *args, **kwargs):
+        """Cleanup resources in case of test-failure
+
+        Some resources are explicitly deleted by the test.
+        If the test failed to delete a resource, this method will execute
+        the appropriate delete methods. Otherwise, the method ignores NotFound
+        exceptions thrown for resources that were correctly deleted by the
+        test.
+
+        :param delete_callable: delete method
+        :param args: arguments for delete method
+        :param kwargs: keyword arguments for delete method
+        """
+        try:
+            delete_callable(*args, **kwargs)
+        # if resource is not found, this means it was deleted in the test
+        except lib_exc.NotFound:
+            pass
+
+    @classmethod
+    def create_network(cls, network_name=None, client=None, **kwargs):
+        """Wrapper utility that returns a test network."""
+        network_name = network_name or data_utils.rand_name('test-network-')
+
+        client = client or cls.client
+        body = client.create_network(name=network_name, **kwargs)
+        network = body['network']
+        if client is cls.client:
+            cls.networks.append(network)
+        else:
+            cls.admin_networks.append(network)
+        return network
+
+    @classmethod
+    def create_shared_network(cls, network_name=None, **post_body):
+        network_name = network_name or data_utils.rand_name('sharednetwork-')
+        post_body.update({'name': network_name, 'shared': True})
+        body = cls.admin_client.create_network(**post_body)
+        network = body['network']
+        cls.admin_networks.append(network)
+        return network
+
+    @classmethod
+    def create_network_keystone_v3(cls, network_name=None, project_id=None,
+                                   tenant_id=None, client=None):
+        """Wrapper utility that creates a test network with project_id."""
+        client = client or cls.client
+        network_name = network_name or data_utils.rand_name(
+            'test-network-with-project_id')
+        project_id = cls.client.tenant_id
+        body = client.create_network_keystone_v3(network_name, project_id,
+            tenant_id)
+        network = body['network']
+        if client is cls.client:
+            cls.networks.append(network)
+        else:
+            cls.admin_networks.append(network)
+        return network
+
+    @classmethod
+    def create_subnet(cls, network, gateway='', cidr=None, mask_bits=None,
+                      ip_version=None, client=None, **kwargs):
+        """Wrapper utility that returns a test subnet."""
+
+        # allow tests to use admin client
+        if not client:
+            client = cls.client
+
+        # The cidr and mask_bits depend on the ip version.
+        ip_version = ip_version if ip_version is not None else cls._ip_version
+        gateway_not_set = gateway == ''
+        if ip_version == 4:
+            cidr = cidr or netaddr.IPNetwork(
+                config.safe_get_config_value(
+                    'network', 'project_network_cidr'))
+            mask_bits = (
+                mask_bits or config.safe_get_config_value(
+                    'network', 'project_network_mask_bits'))
+        elif ip_version == 6:
+            cidr = (
+                cidr or netaddr.IPNetwork(
+                    config.safe_get_config_value(
+                        'network', 'project_network_v6_cidr')))
+            mask_bits = (
+                mask_bits or config.safe_get_config_value(
+                    'network', 'project_network_v6_mask_bits'))
+        # Find a cidr that is not in use yet and create a subnet with it
+        for subnet_cidr in cidr.subnet(mask_bits):
+            if gateway_not_set:
+                gateway_ip = str(netaddr.IPAddress(subnet_cidr) + 1)
+            else:
+                gateway_ip = gateway
+            try:
+                body = client.create_subnet(
+                    network_id=network['id'],
+                    cidr=str(subnet_cidr),
+                    ip_version=ip_version,
+                    gateway_ip=gateway_ip,
+                    **kwargs)
+                break
+            except lib_exc.BadRequest as e:
+                is_overlapping_cidr = 'overlaps with another subnet' in str(e)
+                if not is_overlapping_cidr:
+                    raise
+        else:
+            message = 'Available CIDR for subnet creation could not be found'
+            raise ValueError(message)
+        subnet = body['subnet']
+        if client is cls.client:
+            cls.subnets.append(subnet)
+        else:
+            cls.admin_subnets.append(subnet)
+        return subnet
+
+    @classmethod
+    def create_port(cls, network, **kwargs):
+        """Wrapper utility that returns a test port."""
+        body = cls.client.create_port(network_id=network['id'],
+                                      **kwargs)
+        port = body['port']
+        cls.ports.append(port)
+        return port
+
+    @classmethod
+    def update_port(cls, port, **kwargs):
+        """Wrapper utility that updates a test port."""
+        body = cls.client.update_port(port['id'],
+                                      **kwargs)
+        return body['port']
+
+    @classmethod
+    def _create_router_with_client(
+        cls, client, router_name=None, admin_state_up=False,
+        external_network_id=None, enable_snat=None, **kwargs
+    ):
+        ext_gw_info = {}
+        if external_network_id:
+            ext_gw_info['network_id'] = external_network_id
+        if enable_snat is not None:
+            ext_gw_info['enable_snat'] = enable_snat
+        body = client.create_router(
+            router_name, external_gateway_info=ext_gw_info,
+            admin_state_up=admin_state_up, **kwargs)
+        router = body['router']
+        cls.routers.append(router)
+        return router
+
+    @classmethod
+    def create_router(cls, *args, **kwargs):
+        return cls._create_router_with_client(cls.client, *args, **kwargs)
+
+    @classmethod
+    def create_admin_router(cls, *args, **kwargs):
+        return cls._create_router_with_client(cls.os_admin.network_client,
+                                              *args, **kwargs)
+
+    @classmethod
+    def create_floatingip(cls, external_network_id):
+        """Wrapper utility that returns a test floating IP."""
+        body = cls.client.create_floatingip(
+            floating_network_id=external_network_id)
+        fip = body['floatingip']
+        cls.floating_ips.append(fip)
+        return fip
+
+    @classmethod
+    def create_router_interface(cls, router_id, subnet_id):
+        """Wrapper utility that returns a router interface."""
+        interface = cls.client.add_router_interface_with_subnet_id(
+            router_id, subnet_id)
+        return interface
+
+    @classmethod
+    def get_supported_qos_rule_types(cls):
+        body = cls.client.list_qos_rule_types()
+        return [rule_type['type'] for rule_type in body['rule_types']]
+
+    @classmethod
+    def create_qos_policy(cls, name, description=None, shared=False,
+                          tenant_id=None, is_default=False):
+        """Wrapper utility that returns a test QoS policy."""
+        body = cls.admin_client.create_qos_policy(
+            name, description, shared, tenant_id, is_default)
+        qos_policy = body['policy']
+        cls.qos_policies.append(qos_policy)
+        return qos_policy
+
+    @classmethod
+    def create_qos_bandwidth_limit_rule(cls, policy_id, max_kbps,
+                                        max_burst_kbps,
+                                        direction=constants.EGRESS_DIRECTION):
+        """Wrapper utility that returns a test QoS bandwidth limit rule."""
+        body = cls.admin_client.create_bandwidth_limit_rule(
+            policy_id, max_kbps, max_burst_kbps, direction)
+        qos_rule = body['bandwidth_limit_rule']
+        cls.qos_rules.append(qos_rule)
+        return qos_rule
+
+    @classmethod
+    def delete_router(cls, router):
+        body = cls.client.list_router_interfaces(router['id'])
+        interfaces = body['ports']
+        for i in interfaces:
+            try:
+                cls.client.remove_router_interface_with_subnet_id(
+                    router['id'], i['fixed_ips'][0]['subnet_id'])
+            except lib_exc.NotFound:
+                pass
+        cls.client.delete_router(router['id'])
+
+    @classmethod
+    def create_address_scope(cls, name, is_admin=False, **kwargs):
+        if is_admin:
+            body = cls.admin_client.create_address_scope(name=name, **kwargs)
+            cls.admin_address_scopes.append(body['address_scope'])
+        else:
+            body = cls.client.create_address_scope(name=name, **kwargs)
+            cls.address_scopes.append(body['address_scope'])
+        return body['address_scope']
+
+    @classmethod
+    def create_subnetpool(cls, name, is_admin=False, **kwargs):
+        if is_admin:
+            body = cls.admin_client.create_subnetpool(name, **kwargs)
+            cls.admin_subnetpools.append(body['subnetpool'])
+        else:
+            body = cls.client.create_subnetpool(name, **kwargs)
+            cls.subnetpools.append(body['subnetpool'])
+        return body['subnetpool']
+
+
+class BaseAdminNetworkTest(BaseNetworkTest):
+
+    credentials = ['primary', 'admin']
+
+    @classmethod
+    def setup_clients(cls):
+        super(BaseAdminNetworkTest, cls).setup_clients()
+        cls.admin_client = cls.os_admin.network_client
+        cls.identity_admin_client = cls.os_admin.projects_client
+
+    @classmethod
+    def create_metering_label(cls, name, description):
+        """Wrapper utility that returns a test metering label."""
+        body = cls.admin_client.create_metering_label(
+            description=description,
+            name=data_utils.rand_name("metering-label"))
+        metering_label = body['metering_label']
+        cls.metering_labels.append(metering_label)
+        return metering_label
+
+    @classmethod
+    def create_metering_label_rule(cls, remote_ip_prefix, direction,
+                                   metering_label_id):
+        """Wrapper utility that returns a test metering label rule."""
+        body = cls.admin_client.create_metering_label_rule(
+            remote_ip_prefix=remote_ip_prefix, direction=direction,
+            metering_label_id=metering_label_id)
+        metering_label_rule = body['metering_label_rule']
+        cls.metering_label_rules.append(metering_label_rule)
+        return metering_label_rule
+
+    @classmethod
+    def create_flavor(cls, name, description, service_type):
+        """Wrapper utility that returns a test flavor."""
+        body = cls.admin_client.create_flavor(
+            description=description, service_type=service_type,
+            name=name)
+        flavor = body['flavor']
+        cls.flavors.append(flavor)
+        return flavor
+
+    @classmethod
+    def create_service_profile(cls, description, metainfo, driver):
+        """Wrapper utility that returns a test service profile."""
+        body = cls.admin_client.create_service_profile(
+            driver=driver, metainfo=metainfo, description=description)
+        service_profile = body['service_profile']
+        cls.service_profiles.append(service_profile)
+        return service_profile
+
+    @classmethod
+    def get_unused_ip(cls, net_id, ip_version=None):
+        """Get an unused ip address in a allocation pool of net"""
+        body = cls.admin_client.list_ports(network_id=net_id)
+        ports = body['ports']
+        used_ips = []
+        for port in ports:
+            used_ips.extend(
+                [fixed_ip['ip_address'] for fixed_ip in port['fixed_ips']])
+        body = cls.admin_client.list_subnets(network_id=net_id)
+        subnets = body['subnets']
+
+        for subnet in subnets:
+            if ip_version and subnet['ip_version'] != ip_version:
+                continue
+            cidr = subnet['cidr']
+            allocation_pools = subnet['allocation_pools']
+            iterators = []
+            if allocation_pools:
+                for allocation_pool in allocation_pools:
+                    iterators.append(netaddr.iter_iprange(
+                        allocation_pool['start'], allocation_pool['end']))
+            else:
+                net = netaddr.IPNetwork(cidr)
+
+                def _iterip():
+                    for ip in net:
+                        if ip not in (net.network, net.broadcast):
+                            yield ip
+                iterators.append(iter(_iterip()))
+
+            for iterator in iterators:
+                for ip in iterator:
+                    if str(ip) not in used_ips:
+                        return str(ip)
+
+        message = (
+            "net(%s) has no usable IP address in allocation pools" % net_id)
+        raise exceptions.InvalidConfiguration(message)
+
+
+def require_qos_rule_type(rule_type):
+    def decorator(f):
+        @functools.wraps(f)
+        def wrapper(self, *func_args, **func_kwargs):
+            if rule_type not in self.get_supported_qos_rule_types():
+                raise self.skipException(
+                    "%s rule type is required." % rule_type)
+            return f(self, *func_args, **func_kwargs)
+        return wrapper
+    return decorator
+
+
+def _require_sorting(f):
+    @functools.wraps(f)
+    def inner(self, *args, **kwargs):
+        if not test.is_extension_enabled("sorting", "network"):
+            self.skipTest('Sorting feature is required')
+        return f(self, *args, **kwargs)
+    return inner
+
+
+def _require_pagination(f):
+    @functools.wraps(f)
+    def inner(self, *args, **kwargs):
+        if not test.is_extension_enabled("pagination", "network"):
+            self.skipTest('Pagination feature is required')
+        return f(self, *args, **kwargs)
+    return inner
+
+
+class BaseSearchCriteriaTest(BaseNetworkTest):
+
+    # This should be defined by subclasses to reflect resource name to test
+    resource = None
+
+    field = 'name'
+
+    # NOTE(ihrachys): some names, like those starting with an underscore (_)
+    # are sorted differently depending on whether the plugin implements native
+    # sorting support, or not. So we avoid any such cases here, sticking to
+    # alphanumeric. Also test a case when there are multiple resources with the
+    # same name
+    resource_names = ('test1', 'abc1', 'test10', '123test') + ('test1',)
+
+    force_tenant_isolation = True
+
+    list_kwargs = {}
+
+    list_as_admin = False
+
+    def assertSameOrder(self, original, actual):
+        # gracefully handle iterators passed
+        original = list(original)
+        actual = list(actual)
+        self.assertEqual(len(original), len(actual))
+        for expected, res in zip(original, actual):
+            self.assertEqual(expected[self.field], res[self.field])
+
+    @utils.classproperty
+    def plural_name(self):
+        return '%ss' % self.resource
+
+    @property
+    def list_client(self):
+        return self.admin_client if self.list_as_admin else self.client
+
+    def list_method(self, *args, **kwargs):
+        method = getattr(self.list_client, 'list_%s' % self.plural_name)
+        kwargs.update(self.list_kwargs)
+        return method(*args, **kwargs)
+
+    def get_bare_url(self, url):
+        base_url = self.client.base_url
+        self.assertTrue(url.startswith(base_url))
+        return url[len(base_url):]
+
+    @classmethod
+    def _extract_resources(cls, body):
+        return body[cls.plural_name]
+
+    def _test_list_sorts(self, direction):
+        sort_args = {
+            'sort_dir': direction,
+            'sort_key': self.field
+        }
+        body = self.list_method(**sort_args)
+        resources = self._extract_resources(body)
+        self.assertNotEmpty(
+            resources, "%s list returned is empty" % self.resource)
+        retrieved_names = [res[self.field] for res in resources]
+        expected = sorted(retrieved_names)
+        if direction == constants.SORT_DIRECTION_DESC:
+            expected = list(reversed(expected))
+        self.assertEqual(expected, retrieved_names)
+
+    @_require_sorting
+    def _test_list_sorts_asc(self):
+        self._test_list_sorts(constants.SORT_DIRECTION_ASC)
+
+    @_require_sorting
+    def _test_list_sorts_desc(self):
+        self._test_list_sorts(constants.SORT_DIRECTION_DESC)
+
+    @_require_pagination
+    def _test_list_pagination(self):
+        for limit in range(1, len(self.resource_names) + 1):
+            pagination_args = {
+                'limit': limit,
+            }
+            body = self.list_method(**pagination_args)
+            resources = self._extract_resources(body)
+            self.assertEqual(limit, len(resources))
+
+    @_require_pagination
+    def _test_list_no_pagination_limit_0(self):
+        pagination_args = {
+            'limit': 0,
+        }
+        body = self.list_method(**pagination_args)
+        resources = self._extract_resources(body)
+        self.assertGreaterEqual(len(resources), len(self.resource_names))
+
+    def _test_list_pagination_iteratively(self, lister):
+        # first, collect all resources for later comparison
+        sort_args = {
+            'sort_dir': constants.SORT_DIRECTION_ASC,
+            'sort_key': self.field
+        }
+        body = self.list_method(**sort_args)
+        expected_resources = self._extract_resources(body)
+        self.assertNotEmpty(expected_resources)
+
+        resources = lister(
+            len(expected_resources), sort_args
+        )
+
+        # finally, compare that the list retrieved in one go is identical to
+        # the one containing pagination results
+        self.assertSameOrder(expected_resources, resources)
+
+    def _list_all_with_marker(self, niterations, sort_args):
+        # paginate resources one by one, using last fetched resource as a
+        # marker
+        resources = []
+        for i in range(niterations):
+            pagination_args = sort_args.copy()
+            pagination_args['limit'] = 1
+            if resources:
+                pagination_args['marker'] = resources[-1]['id']
+            body = self.list_method(**pagination_args)
+            resources_ = self._extract_resources(body)
+            self.assertEqual(1, len(resources_))
+            resources.extend(resources_)
+        return resources
+
+    @_require_pagination
+    @_require_sorting
+    def _test_list_pagination_with_marker(self):
+        self._test_list_pagination_iteratively(self._list_all_with_marker)
+
+    def _list_all_with_hrefs(self, niterations, sort_args):
+        # paginate resources one by one, using next href links
+        resources = []
+        prev_links = {}
+
+        for i in range(niterations):
+            if prev_links:
+                uri = self.get_bare_url(prev_links['next'])
+            else:
+                sort_args.update(self.list_kwargs)
+                uri = self.list_client.build_uri(
+                    self.plural_name, limit=1, **sort_args)
+            prev_links, body = self.list_client.get_uri_with_links(
+                self.plural_name, uri
+            )
+            resources_ = self._extract_resources(body)
+            self.assertEqual(1, len(resources_))
+            resources.extend(resources_)
+
+        # The last element is empty and does not contain 'next' link
+        uri = self.get_bare_url(prev_links['next'])
+        prev_links, body = self.client.get_uri_with_links(
+            self.plural_name, uri
+        )
+        self.assertNotIn('next', prev_links)
+
+        # Now walk backwards and compare results
+        resources2 = []
+        for i in range(niterations):
+            uri = self.get_bare_url(prev_links['previous'])
+            prev_links, body = self.list_client.get_uri_with_links(
+                self.plural_name, uri
+            )
+            resources_ = self._extract_resources(body)
+            self.assertEqual(1, len(resources_))
+            resources2.extend(resources_)
+
+        self.assertSameOrder(resources, reversed(resources2))
+
+        return resources
+
+    @_require_pagination
+    @_require_sorting
+    def _test_list_pagination_with_href_links(self):
+        self._test_list_pagination_iteratively(self._list_all_with_hrefs)
+
+    @_require_pagination
+    @_require_sorting
+    def _test_list_pagination_page_reverse_with_href_links(
+            self, direction=constants.SORT_DIRECTION_ASC):
+        pagination_args = {
+            'sort_dir': direction,
+            'sort_key': self.field,
+        }
+        body = self.list_method(**pagination_args)
+        expected_resources = self._extract_resources(body)
+
+        page_size = 2
+        pagination_args['limit'] = page_size
+
+        prev_links = {}
+        resources = []
+        num_resources = len(expected_resources)
+        niterations = int(math.ceil(float(num_resources) / page_size))
+        for i in range(niterations):
+            if prev_links:
+                uri = self.get_bare_url(prev_links['previous'])
+            else:
+                pagination_args.update(self.list_kwargs)
+                uri = self.list_client.build_uri(
+                    self.plural_name, page_reverse=True, **pagination_args)
+            prev_links, body = self.list_client.get_uri_with_links(
+                self.plural_name, uri
+            )
+            resources_ = self._extract_resources(body)
+            self.assertGreaterEqual(page_size, len(resources_))
+            resources.extend(reversed(resources_))
+
+        self.assertSameOrder(expected_resources, reversed(resources))
+
+    @_require_pagination
+    @_require_sorting
+    def _test_list_pagination_page_reverse_asc(self):
+        self._test_list_pagination_page_reverse(
+            direction=constants.SORT_DIRECTION_ASC)
+
+    @_require_pagination
+    @_require_sorting
+    def _test_list_pagination_page_reverse_desc(self):
+        self._test_list_pagination_page_reverse(
+            direction=constants.SORT_DIRECTION_DESC)
+
+    def _test_list_pagination_page_reverse(self, direction):
+        pagination_args = {
+            'sort_dir': direction,
+            'sort_key': self.field,
+            'limit': 3,
+        }
+        body = self.list_method(**pagination_args)
+        expected_resources = self._extract_resources(body)
+
+        pagination_args['limit'] -= 1
+        pagination_args['marker'] = expected_resources[-1]['id']
+        pagination_args['page_reverse'] = True
+        body = self.list_method(**pagination_args)
+
+        self.assertSameOrder(
+            # the last entry is not included in 2nd result when used as a
+            # marker
+            expected_resources[:-1],
+            self._extract_resources(body))
+
+    def _test_list_validation_filters(self):
+        validation_args = {
+            'unknown_filter': 'value',
+        }
+        body = self.list_method(**validation_args)
+        resources = self._extract_resources(body)
+        for resource in resources:
+            self.assertIn(resource['name'], self.resource_names)