blob: 993e356e244f1e89203b73966b9dbe53198186a6 [file] [log] [blame]
Daniel Mellado3c0aeab2016-01-29 11:30:25 +00001# Copyright 2012 OpenStack Foundation
2# All Rights Reserved.
3#
4# Licensed under the Apache License, Version 2.0 (the "License"); you may
5# not use this file except in compliance with the License. You may obtain
6# a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13# License for the specific language governing permissions and limitations
14# under the License.
15
Ihar Hrachyshka59382252016-04-05 15:54:33 +020016import functools
Ihar Hrachyshkaaeb03a02016-05-18 20:03:18 +020017import math
Ihar Hrachyshka59382252016-04-05 15:54:33 +020018
Daniel Mellado3c0aeab2016-01-29 11:30:25 +000019import netaddr
20from tempest.lib.common.utils import data_utils
21from tempest.lib import exceptions as lib_exc
22from tempest import test
23
Ihar Hrachyshka59382252016-04-05 15:54:33 +020024from neutron.common import constants
Ihar Hrachyshkaaeb03a02016-05-18 20:03:18 +020025from neutron.common import utils
Daniel Mellado3c0aeab2016-01-29 11:30:25 +000026from neutron.tests.tempest.api import clients
27from neutron.tests.tempest import config
28from neutron.tests.tempest import exceptions
29
30CONF = config.CONF
31
32
33class BaseNetworkTest(test.BaseTestCase):
34
35 """
36 Base class for the Neutron tests that use the Tempest Neutron REST client
37
38 Per the Neutron API Guide, API v1.x was removed from the source code tree
39 (docs.openstack.org/api/openstack-network/2.0/content/Overview-d1e71.html)
40 Therefore, v2.x of the Neutron API is assumed. It is also assumed that the
41 following options are defined in the [network] section of etc/tempest.conf:
42
43 project_network_cidr with a block of cidr's from which smaller blocks
44 can be allocated for tenant networks
45
46 project_network_mask_bits with the mask bits to be used to partition
47 the block defined by tenant-network_cidr
48
49 Finally, it is assumed that the following option is defined in the
50 [service_available] section of etc/tempest.conf
51
52 neutron as True
53 """
54
55 force_tenant_isolation = False
56 credentials = ['primary']
57
58 # Default to ipv4.
59 _ip_version = 4
60
61 @classmethod
62 def get_client_manager(cls, credential_type=None, roles=None,
63 force_new=None):
64 manager = test.BaseTestCase.get_client_manager(
65 credential_type=credential_type,
66 roles=roles,
67 force_new=force_new)
68 # Neutron uses a different clients manager than the one in the Tempest
69 return clients.Manager(manager.credentials)
70
71 @classmethod
72 def skip_checks(cls):
73 super(BaseNetworkTest, cls).skip_checks()
74 if not CONF.service_available.neutron:
75 raise cls.skipException("Neutron support is required")
76 if cls._ip_version == 6 and not CONF.network_feature_enabled.ipv6:
77 raise cls.skipException("IPv6 Tests are disabled.")
78
79 @classmethod
80 def setup_credentials(cls):
81 # Create no network resources for these test.
82 cls.set_network_resources()
83 super(BaseNetworkTest, cls).setup_credentials()
84
85 @classmethod
86 def setup_clients(cls):
87 super(BaseNetworkTest, cls).setup_clients()
88 cls.client = cls.os.network_client
89
90 @classmethod
91 def resource_setup(cls):
92 super(BaseNetworkTest, cls).resource_setup()
93
94 cls.networks = []
95 cls.shared_networks = []
96 cls.subnets = []
97 cls.ports = []
98 cls.routers = []
99 cls.floating_ips = []
100 cls.metering_labels = []
101 cls.service_profiles = []
102 cls.flavors = []
103 cls.metering_label_rules = []
104 cls.qos_rules = []
105 cls.qos_policies = []
106 cls.ethertype = "IPv" + str(cls._ip_version)
107 cls.address_scopes = []
108 cls.admin_address_scopes = []
109 cls.subnetpools = []
110 cls.admin_subnetpools = []
111
112 @classmethod
113 def resource_cleanup(cls):
114 if CONF.service_available.neutron:
115 # Clean up QoS rules
116 for qos_rule in cls.qos_rules:
117 cls._try_delete_resource(cls.admin_client.delete_qos_rule,
118 qos_rule['id'])
119 # Clean up QoS policies
120 for qos_policy in cls.qos_policies:
121 cls._try_delete_resource(cls.admin_client.delete_qos_policy,
122 qos_policy['id'])
123 # Clean up floating IPs
124 for floating_ip in cls.floating_ips:
125 cls._try_delete_resource(cls.client.delete_floatingip,
126 floating_ip['id'])
127 # Clean up routers
128 for router in cls.routers:
129 cls._try_delete_resource(cls.delete_router,
130 router)
131 # Clean up metering label rules
132 for metering_label_rule in cls.metering_label_rules:
133 cls._try_delete_resource(
134 cls.admin_client.delete_metering_label_rule,
135 metering_label_rule['id'])
136 # Clean up metering labels
137 for metering_label in cls.metering_labels:
138 cls._try_delete_resource(
139 cls.admin_client.delete_metering_label,
140 metering_label['id'])
141 # Clean up flavors
142 for flavor in cls.flavors:
143 cls._try_delete_resource(
144 cls.admin_client.delete_flavor,
145 flavor['id'])
146 # Clean up service profiles
147 for service_profile in cls.service_profiles:
148 cls._try_delete_resource(
149 cls.admin_client.delete_service_profile,
150 service_profile['id'])
151 # Clean up ports
152 for port in cls.ports:
153 cls._try_delete_resource(cls.client.delete_port,
154 port['id'])
155 # Clean up subnets
156 for subnet in cls.subnets:
157 cls._try_delete_resource(cls.client.delete_subnet,
158 subnet['id'])
159 # Clean up networks
160 for network in cls.networks:
161 cls._try_delete_resource(cls.client.delete_network,
162 network['id'])
163
164 # Clean up shared networks
165 for network in cls.shared_networks:
166 cls._try_delete_resource(cls.admin_client.delete_network,
167 network['id'])
168
169 for subnetpool in cls.subnetpools:
170 cls._try_delete_resource(cls.client.delete_subnetpool,
171 subnetpool['id'])
172
173 for subnetpool in cls.admin_subnetpools:
174 cls._try_delete_resource(cls.admin_client.delete_subnetpool,
175 subnetpool['id'])
176
177 for address_scope in cls.address_scopes:
178 cls._try_delete_resource(cls.client.delete_address_scope,
179 address_scope['id'])
180
181 for address_scope in cls.admin_address_scopes:
182 cls._try_delete_resource(
183 cls.admin_client.delete_address_scope,
184 address_scope['id'])
185
186 super(BaseNetworkTest, cls).resource_cleanup()
187
188 @classmethod
189 def _try_delete_resource(cls, delete_callable, *args, **kwargs):
190 """Cleanup resources in case of test-failure
191
192 Some resources are explicitly deleted by the test.
193 If the test failed to delete a resource, this method will execute
194 the appropriate delete methods. Otherwise, the method ignores NotFound
195 exceptions thrown for resources that were correctly deleted by the
196 test.
197
198 :param delete_callable: delete method
199 :param args: arguments for delete method
200 :param kwargs: keyword arguments for delete method
201 """
202 try:
203 delete_callable(*args, **kwargs)
204 # if resource is not found, this means it was deleted in the test
205 except lib_exc.NotFound:
206 pass
207
208 @classmethod
209 def create_network(cls, network_name=None, **kwargs):
210 """Wrapper utility that returns a test network."""
211 network_name = network_name or data_utils.rand_name('test-network-')
212
213 body = cls.client.create_network(name=network_name, **kwargs)
214 network = body['network']
215 cls.networks.append(network)
216 return network
217
218 @classmethod
219 def create_shared_network(cls, network_name=None, **post_body):
220 network_name = network_name or data_utils.rand_name('sharednetwork-')
221 post_body.update({'name': network_name, 'shared': True})
222 body = cls.admin_client.create_network(**post_body)
223 network = body['network']
224 cls.shared_networks.append(network)
225 return network
226
227 @classmethod
228 def create_subnet(cls, network, gateway='', cidr=None, mask_bits=None,
229 ip_version=None, client=None, **kwargs):
230 """Wrapper utility that returns a test subnet."""
231
232 # allow tests to use admin client
233 if not client:
234 client = cls.client
235
236 # The cidr and mask_bits depend on the ip version.
237 ip_version = ip_version if ip_version is not None else cls._ip_version
238 gateway_not_set = gateway == ''
239 if ip_version == 4:
240 cidr = cidr or netaddr.IPNetwork(
241 config.safe_get_config_value(
242 'network', 'project_network_cidr'))
243 mask_bits = (
244 mask_bits or config.safe_get_config_value(
245 'network', 'project_network_mask_bits'))
246 elif ip_version == 6:
247 cidr = (
248 cidr or netaddr.IPNetwork(
249 config.safe_get_config_value(
250 'network', 'project_network_v6_cidr')))
251 mask_bits = (
252 mask_bits or config.safe_get_config_value(
253 'network', 'project_network_v6_mask_bits'))
254 # Find a cidr that is not in use yet and create a subnet with it
255 for subnet_cidr in cidr.subnet(mask_bits):
256 if gateway_not_set:
257 gateway_ip = str(netaddr.IPAddress(subnet_cidr) + 1)
258 else:
259 gateway_ip = gateway
260 try:
261 body = client.create_subnet(
262 network_id=network['id'],
263 cidr=str(subnet_cidr),
264 ip_version=ip_version,
265 gateway_ip=gateway_ip,
266 **kwargs)
267 break
268 except lib_exc.BadRequest as e:
269 is_overlapping_cidr = 'overlaps with another subnet' in str(e)
270 if not is_overlapping_cidr:
271 raise
272 else:
273 message = 'Available CIDR for subnet creation could not be found'
274 raise ValueError(message)
275 subnet = body['subnet']
276 cls.subnets.append(subnet)
277 return subnet
278
279 @classmethod
280 def create_port(cls, network, **kwargs):
281 """Wrapper utility that returns a test port."""
282 body = cls.client.create_port(network_id=network['id'],
283 **kwargs)
284 port = body['port']
285 cls.ports.append(port)
286 return port
287
288 @classmethod
289 def update_port(cls, port, **kwargs):
290 """Wrapper utility that updates a test port."""
291 body = cls.client.update_port(port['id'],
292 **kwargs)
293 return body['port']
294
295 @classmethod
296 def create_router(cls, router_name=None, admin_state_up=False,
297 external_network_id=None, enable_snat=None,
298 **kwargs):
299 ext_gw_info = {}
300 if external_network_id:
301 ext_gw_info['network_id'] = external_network_id
302 if enable_snat:
303 ext_gw_info['enable_snat'] = enable_snat
304 body = cls.client.create_router(
305 router_name, external_gateway_info=ext_gw_info,
306 admin_state_up=admin_state_up, **kwargs)
307 router = body['router']
308 cls.routers.append(router)
309 return router
310
311 @classmethod
312 def create_floatingip(cls, external_network_id):
313 """Wrapper utility that returns a test floating IP."""
314 body = cls.client.create_floatingip(
315 floating_network_id=external_network_id)
316 fip = body['floatingip']
317 cls.floating_ips.append(fip)
318 return fip
319
320 @classmethod
321 def create_router_interface(cls, router_id, subnet_id):
322 """Wrapper utility that returns a router interface."""
323 interface = cls.client.add_router_interface_with_subnet_id(
324 router_id, subnet_id)
325 return interface
326
327 @classmethod
328 def create_qos_policy(cls, name, description, shared, tenant_id=None):
329 """Wrapper utility that returns a test QoS policy."""
330 body = cls.admin_client.create_qos_policy(
331 name, description, shared, tenant_id)
332 qos_policy = body['policy']
333 cls.qos_policies.append(qos_policy)
334 return qos_policy
335
336 @classmethod
337 def create_qos_bandwidth_limit_rule(cls, policy_id,
338 max_kbps, max_burst_kbps):
339 """Wrapper utility that returns a test QoS bandwidth limit rule."""
340 body = cls.admin_client.create_bandwidth_limit_rule(
341 policy_id, max_kbps, max_burst_kbps)
342 qos_rule = body['bandwidth_limit_rule']
343 cls.qos_rules.append(qos_rule)
344 return qos_rule
345
346 @classmethod
347 def delete_router(cls, router):
348 body = cls.client.list_router_interfaces(router['id'])
349 interfaces = body['ports']
350 for i in interfaces:
351 try:
352 cls.client.remove_router_interface_with_subnet_id(
353 router['id'], i['fixed_ips'][0]['subnet_id'])
354 except lib_exc.NotFound:
355 pass
356 cls.client.delete_router(router['id'])
357
358 @classmethod
359 def create_address_scope(cls, name, is_admin=False, **kwargs):
360 if is_admin:
361 body = cls.admin_client.create_address_scope(name=name, **kwargs)
362 cls.admin_address_scopes.append(body['address_scope'])
363 else:
364 body = cls.client.create_address_scope(name=name, **kwargs)
365 cls.address_scopes.append(body['address_scope'])
366 return body['address_scope']
367
368 @classmethod
369 def create_subnetpool(cls, name, is_admin=False, **kwargs):
370 if is_admin:
371 body = cls.admin_client.create_subnetpool(name, **kwargs)
372 cls.admin_subnetpools.append(body['subnetpool'])
373 else:
374 body = cls.client.create_subnetpool(name, **kwargs)
375 cls.subnetpools.append(body['subnetpool'])
376 return body['subnetpool']
377
378
379class BaseAdminNetworkTest(BaseNetworkTest):
380
381 credentials = ['primary', 'admin']
382
383 @classmethod
384 def setup_clients(cls):
385 super(BaseAdminNetworkTest, cls).setup_clients()
386 cls.admin_client = cls.os_adm.network_client
387 cls.identity_admin_client = cls.os_adm.tenants_client
388
389 @classmethod
390 def create_metering_label(cls, name, description):
391 """Wrapper utility that returns a test metering label."""
392 body = cls.admin_client.create_metering_label(
393 description=description,
394 name=data_utils.rand_name("metering-label"))
395 metering_label = body['metering_label']
396 cls.metering_labels.append(metering_label)
397 return metering_label
398
399 @classmethod
400 def create_metering_label_rule(cls, remote_ip_prefix, direction,
401 metering_label_id):
402 """Wrapper utility that returns a test metering label rule."""
403 body = cls.admin_client.create_metering_label_rule(
404 remote_ip_prefix=remote_ip_prefix, direction=direction,
405 metering_label_id=metering_label_id)
406 metering_label_rule = body['metering_label_rule']
407 cls.metering_label_rules.append(metering_label_rule)
408 return metering_label_rule
409
410 @classmethod
411 def create_flavor(cls, name, description, service_type):
412 """Wrapper utility that returns a test flavor."""
413 body = cls.admin_client.create_flavor(
414 description=description, service_type=service_type,
415 name=name)
416 flavor = body['flavor']
417 cls.flavors.append(flavor)
418 return flavor
419
420 @classmethod
421 def create_service_profile(cls, description, metainfo, driver):
422 """Wrapper utility that returns a test service profile."""
423 body = cls.admin_client.create_service_profile(
424 driver=driver, metainfo=metainfo, description=description)
425 service_profile = body['service_profile']
426 cls.service_profiles.append(service_profile)
427 return service_profile
428
429 @classmethod
430 def get_unused_ip(cls, net_id, ip_version=None):
431 """Get an unused ip address in a allocaion pool of net"""
432 body = cls.admin_client.list_ports(network_id=net_id)
433 ports = body['ports']
434 used_ips = []
435 for port in ports:
436 used_ips.extend(
437 [fixed_ip['ip_address'] for fixed_ip in port['fixed_ips']])
438 body = cls.admin_client.list_subnets(network_id=net_id)
439 subnets = body['subnets']
440
441 for subnet in subnets:
442 if ip_version and subnet['ip_version'] != ip_version:
443 continue
444 cidr = subnet['cidr']
445 allocation_pools = subnet['allocation_pools']
446 iterators = []
447 if allocation_pools:
448 for allocation_pool in allocation_pools:
449 iterators.append(netaddr.iter_iprange(
450 allocation_pool['start'], allocation_pool['end']))
451 else:
452 net = netaddr.IPNetwork(cidr)
453
454 def _iterip():
455 for ip in net:
456 if ip not in (net.network, net.broadcast):
457 yield ip
458 iterators.append(iter(_iterip()))
459
460 for iterator in iterators:
461 for ip in iterator:
462 if str(ip) not in used_ips:
463 return str(ip)
464
465 message = (
466 "net(%s) has no usable IP address in allocation pools" % net_id)
467 raise exceptions.InvalidConfiguration(message)
Ihar Hrachyshka59382252016-04-05 15:54:33 +0200468
469
470def _require_sorting(f):
471 @functools.wraps(f)
472 def inner(self, *args, **kwargs):
473 if not CONF.neutron_plugin_options.validate_sorting:
474 self.skipTest('Sorting feature is required')
475 return f(self, *args, **kwargs)
476 return inner
477
478
479def _require_pagination(f):
480 @functools.wraps(f)
481 def inner(self, *args, **kwargs):
482 if not CONF.neutron_plugin_options.validate_pagination:
483 self.skipTest('Pagination feature is required')
484 return f(self, *args, **kwargs)
485 return inner
486
487
488class BaseSearchCriteriaTest(BaseNetworkTest):
489
490 # This should be defined by subclasses to reflect resource name to test
491 resource = None
492
Ihar Hrachyshkaa8fe5a12016-05-24 14:50:58 +0200493 # NOTE(ihrachys): some names, like those starting with an underscore (_)
494 # are sorted differently depending on whether the plugin implements native
495 # sorting support, or not. So we avoid any such cases here, sticking to
496 # alphanumeric. Also test a case when there are multiple resources with the
497 # same name
Ihar Hrachyshka59382252016-04-05 15:54:33 +0200498 resource_names = ('test1', 'abc1', 'test10', '123test') + ('test1',)
499
Ihar Hrachyshka59382252016-04-05 15:54:33 +0200500 force_tenant_isolation = True
501
Ihar Hrachyshkaa8fe5a12016-05-24 14:50:58 +0200502 list_kwargs = {}
Ihar Hrachyshka59382252016-04-05 15:54:33 +0200503
Ihar Hrachyshkaaeb03a02016-05-18 20:03:18 +0200504 def assertSameOrder(self, original, actual):
505 # gracefully handle iterators passed
506 original = list(original)
507 actual = list(actual)
508 self.assertEqual(len(original), len(actual))
509 for expected, res in zip(original, actual):
510 self.assertEqual(expected['name'], res['name'])
511
512 @utils.classproperty
513 def plural_name(self):
514 return '%ss' % self.resource
515
Ihar Hrachyshka59382252016-04-05 15:54:33 +0200516 def list_method(self, *args, **kwargs):
Ihar Hrachyshkaaeb03a02016-05-18 20:03:18 +0200517 method = getattr(self.client, 'list_%s' % self.plural_name)
Ihar Hrachyshka59382252016-04-05 15:54:33 +0200518 kwargs.update(self.list_kwargs)
519 return method(*args, **kwargs)
520
Ihar Hrachyshkaaeb03a02016-05-18 20:03:18 +0200521 def get_bare_url(self, url):
522 base_url = self.client.base_url
523 self.assertTrue(url.startswith(base_url))
524 return url[len(base_url):]
525
Ihar Hrachyshka59382252016-04-05 15:54:33 +0200526 @classmethod
527 def _extract_resources(cls, body):
Ihar Hrachyshkaaeb03a02016-05-18 20:03:18 +0200528 return body[cls.plural_name]
Ihar Hrachyshka59382252016-04-05 15:54:33 +0200529
530 def _test_list_sorts(self, direction):
531 sort_args = {
532 'sort_dir': direction,
533 'sort_key': 'name'
534 }
535 body = self.list_method(**sort_args)
536 resources = self._extract_resources(body)
537 self.assertNotEmpty(
538 resources, "%s list returned is empty" % self.resource)
539 retrieved_names = [res['name'] for res in resources]
540 expected = sorted(retrieved_names)
541 if direction == constants.SORT_DIRECTION_DESC:
542 expected = list(reversed(expected))
543 self.assertEqual(expected, retrieved_names)
544
545 @_require_sorting
546 def _test_list_sorts_asc(self):
547 self._test_list_sorts(constants.SORT_DIRECTION_ASC)
548
549 @_require_sorting
550 def _test_list_sorts_desc(self):
551 self._test_list_sorts(constants.SORT_DIRECTION_DESC)
552
553 @_require_pagination
554 def _test_list_pagination(self):
555 for limit in range(1, len(self.resource_names) + 1):
556 pagination_args = {
557 'limit': limit,
558 }
559 body = self.list_method(**pagination_args)
560 resources = self._extract_resources(body)
561 self.assertEqual(limit, len(resources))
562
563 @_require_pagination
564 def _test_list_no_pagination_limit_0(self):
565 pagination_args = {
566 'limit': 0,
567 }
568 body = self.list_method(**pagination_args)
569 resources = self._extract_resources(body)
570 self.assertTrue(len(resources) >= len(self.resource_names))
571
Ihar Hrachyshkaaeb03a02016-05-18 20:03:18 +0200572 def _test_list_pagination_iteratively(self, lister):
Ihar Hrachyshka59382252016-04-05 15:54:33 +0200573 # first, collect all resources for later comparison
574 sort_args = {
575 'sort_dir': constants.SORT_DIRECTION_ASC,
576 'sort_key': 'name'
577 }
578 body = self.list_method(**sort_args)
579 expected_resources = self._extract_resources(body)
580 self.assertNotEmpty(expected_resources)
581
Ihar Hrachyshkaaeb03a02016-05-18 20:03:18 +0200582 resources = lister(
583 len(expected_resources), sort_args
584 )
585
586 # finally, compare that the list retrieved in one go is identical to
587 # the one containing pagination results
588 self.assertSameOrder(expected_resources, resources)
589
590 def _list_all_with_marker(self, niterations, sort_args):
Ihar Hrachyshka59382252016-04-05 15:54:33 +0200591 # paginate resources one by one, using last fetched resource as a
592 # marker
593 resources = []
Ihar Hrachyshkaaeb03a02016-05-18 20:03:18 +0200594 for i in range(niterations):
Ihar Hrachyshka59382252016-04-05 15:54:33 +0200595 pagination_args = sort_args.copy()
596 pagination_args['limit'] = 1
597 if resources:
598 pagination_args['marker'] = resources[-1]['id']
599 body = self.list_method(**pagination_args)
600 resources_ = self._extract_resources(body)
601 self.assertEqual(1, len(resources_))
602 resources.extend(resources_)
Ihar Hrachyshkaaeb03a02016-05-18 20:03:18 +0200603 return resources
Ihar Hrachyshka59382252016-04-05 15:54:33 +0200604
Ihar Hrachyshkaaeb03a02016-05-18 20:03:18 +0200605 @_require_pagination
606 @_require_sorting
607 def _test_list_pagination_with_marker(self):
608 self._test_list_pagination_iteratively(self._list_all_with_marker)
609
610 def _list_all_with_hrefs(self, niterations, sort_args):
611 # paginate resources one by one, using next href links
612 resources = []
613 prev_links = {}
614
615 for i in range(niterations):
616 if prev_links:
617 uri = self.get_bare_url(prev_links['next'])
618 else:
619 uri = self.client.build_uri(
620 self.plural_name, limit=1, **sort_args)
621 prev_links, body = self.client.get_uri_with_links(
622 self.plural_name, uri
623 )
624 resources_ = self._extract_resources(body)
625 self.assertEqual(1, len(resources_))
626 resources.extend(resources_)
627
628 # The last element is empty and does not contain 'next' link
629 uri = self.get_bare_url(prev_links['next'])
630 prev_links, body = self.client.get_uri_with_links(
631 self.plural_name, uri
632 )
633 self.assertNotIn('next', prev_links)
634
635 # Now walk backwards and compare results
636 resources2 = []
637 for i in range(niterations):
638 uri = self.get_bare_url(prev_links['previous'])
639 prev_links, body = self.client.get_uri_with_links(
640 self.plural_name, uri
641 )
642 resources_ = self._extract_resources(body)
643 self.assertEqual(1, len(resources_))
644 resources2.extend(resources_)
645
646 self.assertSameOrder(resources, reversed(resources2))
647
648 return resources
649
650 @_require_pagination
651 @_require_sorting
652 def _test_list_pagination_with_href_links(self):
653 self._test_list_pagination_iteratively(self._list_all_with_hrefs)
654
655 @_require_pagination
656 @_require_sorting
657 def _test_list_pagination_page_reverse_with_href_links(
658 self, direction=constants.SORT_DIRECTION_ASC):
659 pagination_args = {
660 'sort_dir': direction,
661 'sort_key': 'name',
662 }
663 body = self.list_method(**pagination_args)
664 expected_resources = self._extract_resources(body)
665
666 page_size = 2
667 pagination_args['limit'] = page_size
668
669 prev_links = {}
670 resources = []
671 num_resources = len(expected_resources)
672 niterations = int(math.ceil(float(num_resources) / page_size))
673 for i in range(niterations):
674 if prev_links:
675 uri = self.get_bare_url(prev_links['previous'])
676 else:
677 uri = self.client.build_uri(
678 self.plural_name, page_reverse=True, **pagination_args)
679 prev_links, body = self.client.get_uri_with_links(
680 self.plural_name, uri
681 )
682 resources_ = self._extract_resources(body)
683 self.assertTrue(page_size >= len(resources_))
684 resources.extend(reversed(resources_))
685
686 self.assertSameOrder(expected_resources, reversed(resources))
687
688 @_require_pagination
689 @_require_sorting
690 def _test_list_pagination_page_reverse_asc(self):
691 self._test_list_pagination_page_reverse(
692 direction=constants.SORT_DIRECTION_ASC)
693
694 @_require_pagination
695 @_require_sorting
696 def _test_list_pagination_page_reverse_desc(self):
697 self._test_list_pagination_page_reverse(
698 direction=constants.SORT_DIRECTION_DESC)
699
700 def _test_list_pagination_page_reverse(self, direction):
701 pagination_args = {
702 'sort_dir': direction,
703 'sort_key': 'name',
704 'limit': 3,
705 }
706 body = self.list_method(**pagination_args)
707 expected_resources = self._extract_resources(body)
708
709 pagination_args['limit'] -= 1
710 pagination_args['marker'] = expected_resources[-1]['id']
711 pagination_args['page_reverse'] = True
712 body = self.list_method(**pagination_args)
713
714 self.assertSameOrder(
715 # the last entry is not included in 2nd result when used as a
716 # marker
717 expected_resources[:-1],
718 self._extract_resources(body))