blob: 45e12f06dc276c699b2dd453209804fcebcf0360 [file] [log] [blame]
Daniel Mellado3c0aeab2016-01-29 11:30:25 +00001# Copyright 2012 OpenStack Foundation
2# All Rights Reserved.
3#
4# Licensed under the Apache License, Version 2.0 (the "License"); you may
5# not use this file except in compliance with the License. You may obtain
6# a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13# License for the specific language governing permissions and limitations
14# under the License.
15
Ihar Hrachyshka59382252016-04-05 15:54:33 +020016import functools
Ihar Hrachyshkaaeb03a02016-05-18 20:03:18 +020017import math
Ihar Hrachyshka59382252016-04-05 15:54:33 +020018
Daniel Mellado3c0aeab2016-01-29 11:30:25 +000019import netaddr
20from tempest.lib.common.utils import data_utils
21from tempest.lib import exceptions as lib_exc
22from tempest import test
23
Ihar Hrachyshka59382252016-04-05 15:54:33 +020024from neutron.common import constants
Ihar Hrachyshkaaeb03a02016-05-18 20:03:18 +020025from neutron.common import utils
Daniel Mellado3c0aeab2016-01-29 11:30:25 +000026from neutron.tests.tempest.api import clients
27from neutron.tests.tempest import config
28from neutron.tests.tempest import exceptions
29
# Module-level alias for the test configuration object; the classes and
# decorators below read options from it (e.g. CONF.service_available.neutron).
CONF = config.CONF
31
32
class BaseNetworkTest(test.BaseTestCase):

    """
    Base class for the Neutron tests that use the Tempest Neutron REST client

    Per the Neutron API Guide, API v1.x was removed from the source code tree
    (docs.openstack.org/api/openstack-network/2.0/content/Overview-d1e71.html)
    Therefore, v2.x of the Neutron API is assumed. It is also assumed that the
    following options are defined in the [network] section of etc/tempest.conf:

        project_network_cidr with a block of cidr's from which smaller blocks
        can be allocated for tenant networks

        project_network_mask_bits with the mask bits to be used to partition
        the block defined by project_network_cidr

    Finally, it is assumed that the following option is defined in the
    [service_available] section of etc/tempest.conf

        neutron as True

    Resources created through the ``create_*`` helpers are recorded in
    class-level lists and deleted again in ``resource_cleanup``.
    """

    force_tenant_isolation = False
    credentials = ['primary']

    # Default to ipv4.
    _ip_version = 4

    @classmethod
    def get_client_manager(cls, credential_type=None, roles=None,
                           force_new=None):
        """Return a Neutron clients manager for the requested credentials.

        The arguments are forwarded unchanged to
        ``test.BaseTestCase.get_client_manager``; only the credentials of the
        manager it returns are reused.
        """
        manager = test.BaseTestCase.get_client_manager(
            credential_type=credential_type,
            roles=roles,
            force_new=force_new)
        # Neutron uses a different clients manager than the one in the Tempest
        return clients.Manager(manager.credentials)

    @classmethod
    def skip_checks(cls):
        """Skip when Neutron, or IPv6 for IPv6 test classes, is disabled."""
        super(BaseNetworkTest, cls).skip_checks()
        if not CONF.service_available.neutron:
            raise cls.skipException("Neutron support is required")
        if cls._ip_version == 6 and not CONF.network_feature_enabled.ipv6:
            raise cls.skipException("IPv6 Tests are disabled.")

    @classmethod
    def setup_credentials(cls):
        """Set up credentials without pre-created network resources."""
        # Create no network resources for these test.
        cls.set_network_resources()
        super(BaseNetworkTest, cls).setup_credentials()

    @classmethod
    def setup_clients(cls):
        """Expose the primary network client as ``cls.client``."""
        super(BaseNetworkTest, cls).setup_clients()
        cls.client = cls.os.network_client

    @classmethod
    def resource_setup(cls):
        """Initialise the per-class lists that track created resources."""
        super(BaseNetworkTest, cls).resource_setup()

        cls.networks = []
        cls.shared_networks = []
        cls.subnets = []
        cls.ports = []
        cls.routers = []
        cls.floating_ips = []
        cls.metering_labels = []
        cls.service_profiles = []
        cls.flavors = []
        cls.metering_label_rules = []
        cls.qos_rules = []
        cls.qos_policies = []
        cls.ethertype = "IPv" + str(cls._ip_version)
        cls.address_scopes = []
        cls.admin_address_scopes = []
        cls.subnetpools = []
        cls.admin_subnetpools = []

    @classmethod
    def resource_cleanup(cls):
        """Delete every tracked resource, then run the parent cleanup.

        ``cls.admin_client`` is only looked up inside the loop bodies, so it
        is not touched for classes whose admin-tracked lists stay empty.
        """
        if CONF.service_available.neutron:
            # Clean up QoS rules
            for qos_rule in cls.qos_rules:
                cls._try_delete_resource(cls.admin_client.delete_qos_rule,
                                         qos_rule['id'])
            # Clean up QoS policies
            for qos_policy in cls.qos_policies:
                cls._try_delete_resource(cls.admin_client.delete_qos_policy,
                                         qos_policy['id'])
            # Clean up floating IPs
            for floating_ip in cls.floating_ips:
                cls._try_delete_resource(cls.client.delete_floatingip,
                                         floating_ip['id'])
            # Clean up routers
            for router in cls.routers:
                cls._try_delete_resource(cls.delete_router,
                                         router)
            # Clean up metering label rules
            for metering_label_rule in cls.metering_label_rules:
                cls._try_delete_resource(
                    cls.admin_client.delete_metering_label_rule,
                    metering_label_rule['id'])
            # Clean up metering labels
            for metering_label in cls.metering_labels:
                cls._try_delete_resource(
                    cls.admin_client.delete_metering_label,
                    metering_label['id'])
            # Clean up flavors
            for flavor in cls.flavors:
                cls._try_delete_resource(
                    cls.admin_client.delete_flavor,
                    flavor['id'])
            # Clean up service profiles
            for service_profile in cls.service_profiles:
                cls._try_delete_resource(
                    cls.admin_client.delete_service_profile,
                    service_profile['id'])
            # Clean up ports
            for port in cls.ports:
                cls._try_delete_resource(cls.client.delete_port,
                                         port['id'])
            # Clean up subnets
            for subnet in cls.subnets:
                cls._try_delete_resource(cls.client.delete_subnet,
                                         subnet['id'])
            # Clean up networks
            for network in cls.networks:
                cls._try_delete_resource(cls.client.delete_network,
                                         network['id'])

            # Clean up shared networks
            for network in cls.shared_networks:
                cls._try_delete_resource(cls.admin_client.delete_network,
                                         network['id'])

            # Clean up subnet pools (tenant-owned, then admin-owned)
            for subnetpool in cls.subnetpools:
                cls._try_delete_resource(cls.client.delete_subnetpool,
                                         subnetpool['id'])

            for subnetpool in cls.admin_subnetpools:
                cls._try_delete_resource(cls.admin_client.delete_subnetpool,
                                         subnetpool['id'])

            # Clean up address scopes (tenant-owned, then admin-owned)
            for address_scope in cls.address_scopes:
                cls._try_delete_resource(cls.client.delete_address_scope,
                                         address_scope['id'])

            for address_scope in cls.admin_address_scopes:
                cls._try_delete_resource(
                    cls.admin_client.delete_address_scope,
                    address_scope['id'])

        super(BaseNetworkTest, cls).resource_cleanup()

    @classmethod
    def _try_delete_resource(cls, delete_callable, *args, **kwargs):
        """Cleanup resources in case of test-failure

        Some resources are explicitly deleted by the test.
        If the test failed to delete a resource, this method will execute
        the appropriate delete methods. Otherwise, the method ignores NotFound
        exceptions thrown for resources that were correctly deleted by the
        test.

        :param delete_callable: delete method
        :param args: arguments for delete method
        :param kwargs: keyword arguments for delete method
        """
        try:
            delete_callable(*args, **kwargs)
        # if resource is not found, this means it was deleted in the test
        except lib_exc.NotFound:
            pass

    @classmethod
    def create_network(cls, network_name=None, **kwargs):
        """Wrapper utility that returns a test network.

        :param network_name: name to use; a random one is generated if falsy
        :param kwargs: extra attributes passed to the create call
        """
        network_name = network_name or data_utils.rand_name('test-network-')

        body = cls.client.create_network(name=network_name, **kwargs)
        network = body['network']
        cls.networks.append(network)
        return network

    @classmethod
    def create_shared_network(cls, network_name=None, **post_body):
        """Create a shared network with the admin client and track it.

        :param network_name: name to use; a random one is generated if falsy
        :param post_body: extra attributes; 'name' and 'shared' are overridden
        """
        network_name = network_name or data_utils.rand_name('sharednetwork-')
        post_body.update({'name': network_name, 'shared': True})
        body = cls.admin_client.create_network(**post_body)
        network = body['network']
        cls.shared_networks.append(network)
        return network

    @classmethod
    def create_subnet(cls, network, gateway='', cidr=None, mask_bits=None,
                      ip_version=None, client=None, **kwargs):
        """Wrapper utility that returns a test subnet.

        :param network: network dict; its 'id' is used for the new subnet
        :param gateway: gateway IP; the default '' means "use the first
            address after the subnet's network address"
        :param cidr: block to carve subnets from; defaults to the configured
            project network block for the IP version
        :param mask_bits: prefix length of the carved subnets; defaults to
            the configured value for the IP version
        :param ip_version: 4 or 6; defaults to ``cls._ip_version``
        :param client: client to create the subnet with; defaults to
            ``cls.client``
        :param kwargs: extra attributes passed to the create call
        :raises ValueError: if every candidate CIDR overlaps an existing one
        """

        # allow tests to use admin client
        if not client:
            client = cls.client

        # The cidr and mask_bits depend on the ip version.
        ip_version = ip_version if ip_version is not None else cls._ip_version
        gateway_not_set = gateway == ''
        if ip_version == 4:
            cidr = cidr or netaddr.IPNetwork(
                config.safe_get_config_value(
                    'network', 'project_network_cidr'))
            mask_bits = (
                mask_bits or config.safe_get_config_value(
                    'network', 'project_network_mask_bits'))
        elif ip_version == 6:
            cidr = (
                cidr or netaddr.IPNetwork(
                    config.safe_get_config_value(
                        'network', 'project_network_v6_cidr')))
            mask_bits = (
                mask_bits or config.safe_get_config_value(
                    'network', 'project_network_v6_mask_bits'))
        # Find a cidr that is not in use yet and create a subnet with it
        for subnet_cidr in cidr.subnet(mask_bits):
            if gateway_not_set:
                gateway_ip = str(netaddr.IPAddress(subnet_cidr) + 1)
            else:
                gateway_ip = gateway
            try:
                body = client.create_subnet(
                    network_id=network['id'],
                    cidr=str(subnet_cidr),
                    ip_version=ip_version,
                    gateway_ip=gateway_ip,
                    **kwargs)
                break
            except lib_exc.BadRequest as e:
                # Only an overlap means "try the next candidate"; any other
                # bad request is a real failure.
                is_overlapping_cidr = 'overlaps with another subnet' in str(e)
                if not is_overlapping_cidr:
                    raise
        else:
            # The loop finished without a successful create (no break).
            message = 'Available CIDR for subnet creation could not be found'
            raise ValueError(message)
        subnet = body['subnet']
        cls.subnets.append(subnet)
        return subnet

    @classmethod
    def create_port(cls, network, **kwargs):
        """Wrapper utility that returns a test port."""
        body = cls.client.create_port(network_id=network['id'],
                                      **kwargs)
        port = body['port']
        cls.ports.append(port)
        return port

    @classmethod
    def update_port(cls, port, **kwargs):
        """Wrapper utility that updates a test port."""
        body = cls.client.update_port(port['id'],
                                      **kwargs)
        return body['port']

    @classmethod
    def create_router(cls, router_name=None, admin_state_up=False,
                      external_network_id=None, enable_snat=None,
                      **kwargs):
        """Create a router, track it for cleanup and return it.

        :param router_name: name passed to the client's create call
        :param admin_state_up: administrative state of the new router
        :param external_network_id: if truthy, set as the gateway network
        :param enable_snat: if not None, sent as the gateway's enable_snat
            value; None leaves the attribute out of the request
        :param kwargs: extra attributes passed to the create call
        """
        ext_gw_info = {}
        if external_network_id:
            ext_gw_info['network_id'] = external_network_id
        # Compare against None so an explicit enable_snat=False is sent too.
        if enable_snat is not None:
            ext_gw_info['enable_snat'] = enable_snat
        body = cls.client.create_router(
            router_name, external_gateway_info=ext_gw_info,
            admin_state_up=admin_state_up, **kwargs)
        router = body['router']
        cls.routers.append(router)
        return router

    @classmethod
    def create_floatingip(cls, external_network_id):
        """Wrapper utility that returns a test floating IP."""
        body = cls.client.create_floatingip(
            floating_network_id=external_network_id)
        fip = body['floatingip']
        cls.floating_ips.append(fip)
        return fip

    @classmethod
    def create_router_interface(cls, router_id, subnet_id):
        """Wrapper utility that returns a router interface."""
        interface = cls.client.add_router_interface_with_subnet_id(
            router_id, subnet_id)
        return interface

    @classmethod
    def create_qos_policy(cls, name, description, shared, tenant_id=None):
        """Wrapper utility that returns a test QoS policy."""
        body = cls.admin_client.create_qos_policy(
            name, description, shared, tenant_id)
        qos_policy = body['policy']
        cls.qos_policies.append(qos_policy)
        return qos_policy

    @classmethod
    def create_qos_bandwidth_limit_rule(cls, policy_id,
                                        max_kbps, max_burst_kbps):
        """Wrapper utility that returns a test QoS bandwidth limit rule."""
        body = cls.admin_client.create_bandwidth_limit_rule(
            policy_id, max_kbps, max_burst_kbps)
        qos_rule = body['bandwidth_limit_rule']
        cls.qos_rules.append(qos_rule)
        return qos_rule

    @classmethod
    def delete_router(cls, router):
        """Detach the router's listed interfaces, then delete the router.

        Each listed port is detached through the subnet of its first fixed
        IP; interfaces that are already gone (NotFound) are ignored.
        """
        body = cls.client.list_router_interfaces(router['id'])
        interfaces = body['ports']
        for i in interfaces:
            try:
                cls.client.remove_router_interface_with_subnet_id(
                    router['id'], i['fixed_ips'][0]['subnet_id'])
            except lib_exc.NotFound:
                pass
        cls.client.delete_router(router['id'])

    @classmethod
    def create_address_scope(cls, name, is_admin=False, **kwargs):
        """Create an address scope and track it for cleanup.

        :param name: name of the address scope
        :param is_admin: create (and later delete) with the admin client
        :param kwargs: extra attributes passed to the create call
        """
        if is_admin:
            body = cls.admin_client.create_address_scope(name=name, **kwargs)
            cls.admin_address_scopes.append(body['address_scope'])
        else:
            body = cls.client.create_address_scope(name=name, **kwargs)
            cls.address_scopes.append(body['address_scope'])
        return body['address_scope']

    @classmethod
    def create_subnetpool(cls, name, is_admin=False, **kwargs):
        """Create a subnet pool and track it for cleanup.

        :param name: name of the subnet pool
        :param is_admin: create (and later delete) with the admin client
        :param kwargs: extra attributes passed to the create call
        """
        if is_admin:
            body = cls.admin_client.create_subnetpool(name, **kwargs)
            cls.admin_subnetpools.append(body['subnetpool'])
        else:
            body = cls.client.create_subnetpool(name, **kwargs)
            cls.subnetpools.append(body['subnetpool'])
        return body['subnetpool']
377
378
class BaseAdminNetworkTest(BaseNetworkTest):
    """Base class for tests that also need admin credentials.

    Adds ``admin_client`` and ``identity_admin_client`` plus helpers that
    create resources through the admin client and track them for cleanup.
    """

    credentials = ['primary', 'admin']

    @classmethod
    def setup_clients(cls):
        """Expose the admin network and tenants clients on the class."""
        super(BaseAdminNetworkTest, cls).setup_clients()
        cls.admin_client = cls.os_adm.network_client
        cls.identity_admin_client = cls.os_adm.tenants_client

    @classmethod
    def create_metering_label(cls, name, description):
        """Wrapper utility that returns a test metering label.

        :param name: label name; a random one is generated if falsy
        :param description: label description
        """
        # Honour the caller's name instead of always sending a random one.
        body = cls.admin_client.create_metering_label(
            description=description,
            name=name or data_utils.rand_name("metering-label"))
        metering_label = body['metering_label']
        cls.metering_labels.append(metering_label)
        return metering_label

    @classmethod
    def create_metering_label_rule(cls, remote_ip_prefix, direction,
                                   metering_label_id):
        """Wrapper utility that returns a test metering label rule."""
        body = cls.admin_client.create_metering_label_rule(
            remote_ip_prefix=remote_ip_prefix, direction=direction,
            metering_label_id=metering_label_id)
        metering_label_rule = body['metering_label_rule']
        cls.metering_label_rules.append(metering_label_rule)
        return metering_label_rule

    @classmethod
    def create_flavor(cls, name, description, service_type):
        """Wrapper utility that returns a test flavor."""
        body = cls.admin_client.create_flavor(
            description=description, service_type=service_type,
            name=name)
        flavor = body['flavor']
        cls.flavors.append(flavor)
        return flavor

    @classmethod
    def create_service_profile(cls, description, metainfo, driver):
        """Wrapper utility that returns a test service profile."""
        body = cls.admin_client.create_service_profile(
            driver=driver, metainfo=metainfo, description=description)
        service_profile = body['service_profile']
        cls.service_profiles.append(service_profile)
        return service_profile

    @classmethod
    def get_unused_ip(cls, net_id, ip_version=None):
        """Get an unused ip address in an allocation pool of net

        :param net_id: ID of the network whose ports and subnets are listed
        :param ip_version: when truthy, subnets with a different ip_version
            are skipped
        :returns: the first candidate address not used by a port, as a string
        :raises exceptions.InvalidConfiguration: if no free address is found
        """
        body = cls.admin_client.list_ports(network_id=net_id)
        # A set keeps the membership test in the scan below cheap.
        used_ips = set()
        for port in body['ports']:
            used_ips.update(
                fixed_ip['ip_address'] for fixed_ip in port['fixed_ips'])
        body = cls.admin_client.list_subnets(network_id=net_id)

        for subnet in body['subnets']:
            if ip_version and subnet['ip_version'] != ip_version:
                continue
            cidr = subnet['cidr']
            allocation_pools = subnet['allocation_pools']
            if allocation_pools:
                candidates = [
                    netaddr.iter_iprange(pool['start'], pool['end'])
                    for pool in allocation_pools]
            else:
                # No pools: walk the whole CIDR lazily, leaving out the
                # network and broadcast addresses.
                net = netaddr.IPNetwork(cidr)
                candidates = [
                    (ip for ip in net
                     if ip not in (net.network, net.broadcast))]

            for iterator in candidates:
                for ip in iterator:
                    if str(ip) not in used_ips:
                        return str(ip)

        message = (
            "net(%s) has no usable IP address in allocation pools" % net_id)
        raise exceptions.InvalidConfiguration(message)
Ihar Hrachyshka59382252016-04-05 15:54:33 +0200468
469
def _require_sorting(f):
    """Decorate a test method so it is skipped unless sorting is validated.

    The wrapper reads ``CONF.neutron_plugin_options.validate_sorting`` on
    every call and invokes ``self.skipTest`` when the option is falsy.
    """
    @functools.wraps(f)
    def wrapper(self, *args, **kwargs):
        sorting_enabled = CONF.neutron_plugin_options.validate_sorting
        if not sorting_enabled:
            self.skipTest('Sorting feature is required')
        result = f(self, *args, **kwargs)
        return result
    return wrapper
477
478
def _require_pagination(f):
    """Decorate a test method so it is skipped unless pagination is validated.

    The wrapper reads ``CONF.neutron_plugin_options.validate_pagination`` on
    every call and invokes ``self.skipTest`` when the option is falsy.
    """
    @functools.wraps(f)
    def wrapper(self, *args, **kwargs):
        pagination_enabled = CONF.neutron_plugin_options.validate_pagination
        if not pagination_enabled:
            self.skipTest('Pagination feature is required')
        result = f(self, *args, **kwargs)
        return result
    return wrapper
486
487
class BaseSearchCriteriaTest(BaseNetworkTest):
    """Shared sorting and pagination checks for list API calls.

    Subclasses set ``resource`` to the singular resource name; the list
    method and the response key are both derived from its plural form.
    The ``_test_*`` methods are meant to be called from subclass tests.
    """

    # This should be defined by subclasses to reflect resource name to test
    resource = None

    # NOTE(ihrachys): some names, like those starting with an underscore (_)
    # are sorted differently depending on whether the plugin implements native
    # sorting support, or not. So we avoid any such cases here, sticking to
    # alphanumeric. Also test a case when there are multiple resources with the
    # same name
    resource_names = ('test1', 'abc1', 'test10', '123test') + ('test1',)

    force_tenant_isolation = True

    # Extra keyword arguments added to every list request. It is only read
    # here, never mutated, so sharing it at class level is safe.
    list_kwargs = {}

    def assertSameOrder(self, original, actual):
        """Assert both sequences have equal length and pairwise equal names."""
        # gracefully handle iterators passed
        original = list(original)
        actual = list(actual)
        self.assertEqual(len(original), len(actual))
        for expected, res in zip(original, actual):
            self.assertEqual(expected['name'], res['name'])

    @utils.classproperty
    def plural_name(self):
        # Also read through ``cls`` in _extract_resources, so this has to be
        # reachable from the class, not just from instances.
        return '%ss' % self.resource

    def list_method(self, *args, **kwargs):
        """Call the client's ``list_<plural_name>`` with list_kwargs merged in.

        Entries in ``self.list_kwargs`` override same-named keyword arguments.
        """
        method = getattr(self.client, 'list_%s' % self.plural_name)
        kwargs.update(self.list_kwargs)
        return method(*args, **kwargs)

    def get_bare_url(self, url):
        """Return ``url`` with the client's base URL prefix removed."""
        base_url = self.client.base_url
        self.assertTrue(url.startswith(base_url))
        return url[len(base_url):]

    @classmethod
    def _extract_resources(cls, body):
        """Return the resource list stored under the plural name in body."""
        return body[cls.plural_name]

    def _test_list_sorts(self, direction):
        """Check that listing sorted by name honours the given direction."""
        sort_args = {
            'sort_dir': direction,
            'sort_key': 'name'
        }
        body = self.list_method(**sort_args)
        resources = self._extract_resources(body)
        self.assertNotEmpty(
            resources, "%s list returned is empty" % self.resource)
        retrieved_names = [res['name'] for res in resources]
        expected = sorted(retrieved_names)
        if direction == constants.SORT_DIRECTION_DESC:
            expected = list(reversed(expected))
        self.assertEqual(expected, retrieved_names)

    @_require_sorting
    def _test_list_sorts_asc(self):
        self._test_list_sorts(constants.SORT_DIRECTION_ASC)

    @_require_sorting
    def _test_list_sorts_desc(self):
        self._test_list_sorts(constants.SORT_DIRECTION_DESC)

    @_require_pagination
    def _test_list_pagination(self):
        """Check that each limit up to len(resource_names) is honoured."""
        for limit in range(1, len(self.resource_names) + 1):
            pagination_args = {
                'limit': limit,
            }
            body = self.list_method(**pagination_args)
            resources = self._extract_resources(body)
            self.assertEqual(limit, len(resources))

    @_require_pagination
    def _test_list_no_pagination_limit_0(self):
        """Check that limit=0 returns at least len(resource_names) items."""
        pagination_args = {
            'limit': 0,
        }
        body = self.list_method(**pagination_args)
        resources = self._extract_resources(body)
        self.assertGreaterEqual(len(resources), len(self.resource_names))

    def _test_list_pagination_iteratively(self, lister):
        """Compare a one-shot sorted listing with a paginated one.

        :param lister: callable taking (niterations, sort_args) and returning
            the resources collected page by page
        """
        # first, collect all resources for later comparison
        sort_args = {
            'sort_dir': constants.SORT_DIRECTION_ASC,
            'sort_key': 'name'
        }
        body = self.list_method(**sort_args)
        expected_resources = self._extract_resources(body)
        self.assertNotEmpty(expected_resources)

        resources = lister(
            len(expected_resources), sort_args
        )

        # finally, compare that the list retrieved in one go is identical to
        # the one containing pagination results
        self.assertSameOrder(expected_resources, resources)

    def _list_all_with_marker(self, niterations, sort_args):
        """Fetch niterations pages of one item each, using markers."""
        # paginate resources one by one, using last fetched resource as a
        # marker
        resources = []
        for _ in range(niterations):
            pagination_args = sort_args.copy()
            pagination_args['limit'] = 1
            if resources:
                pagination_args['marker'] = resources[-1]['id']
            body = self.list_method(**pagination_args)
            resources_ = self._extract_resources(body)
            self.assertEqual(1, len(resources_))
            resources.extend(resources_)
        return resources

    @_require_pagination
    @_require_sorting
    def _test_list_pagination_with_marker(self):
        self._test_list_pagination_iteratively(self._list_all_with_marker)

    def _list_all_with_hrefs(self, niterations, sort_args):
        """Fetch pages of one item each by following 'next' links.

        After the forward walk the 'previous' links are followed back and the
        two walks are asserted to agree. The forward result is returned.
        """
        # paginate resources one by one, using next href links
        resources = []
        prev_links = {}

        for _ in range(niterations):
            if prev_links:
                uri = self.get_bare_url(prev_links['next'])
            else:
                # Merge into a new dict so the caller's sort_args is left
                # untouched; list_kwargs entries take precedence.
                first_page_args = dict(sort_args, **self.list_kwargs)
                uri = self.client.build_uri(
                    self.plural_name, limit=1, **first_page_args)
            prev_links, body = self.client.get_uri_with_links(
                self.plural_name, uri
            )
            resources_ = self._extract_resources(body)
            self.assertEqual(1, len(resources_))
            resources.extend(resources_)

        # The last element is empty and does not contain 'next' link
        uri = self.get_bare_url(prev_links['next'])
        prev_links, body = self.client.get_uri_with_links(
            self.plural_name, uri
        )
        self.assertNotIn('next', prev_links)

        # Now walk backwards and compare results
        resources2 = []
        for _ in range(niterations):
            uri = self.get_bare_url(prev_links['previous'])
            prev_links, body = self.client.get_uri_with_links(
                self.plural_name, uri
            )
            resources_ = self._extract_resources(body)
            self.assertEqual(1, len(resources_))
            resources2.extend(resources_)

        self.assertSameOrder(resources, reversed(resources2))

        return resources

    @_require_pagination
    @_require_sorting
    def _test_list_pagination_with_href_links(self):
        self._test_list_pagination_iteratively(self._list_all_with_hrefs)

    @_require_pagination
    @_require_sorting
    def _test_list_pagination_page_reverse_with_href_links(
            self, direction=constants.SORT_DIRECTION_ASC):
        """Page backwards through 'previous' links and compare the result.

        The first request uses page_reverse=True with a page size of 2; the
        collected items, reversed, must match the one-shot sorted listing.
        """
        pagination_args = {
            'sort_dir': direction,
            'sort_key': 'name',
        }
        body = self.list_method(**pagination_args)
        expected_resources = self._extract_resources(body)

        page_size = 2
        pagination_args['limit'] = page_size

        prev_links = {}
        resources = []
        num_resources = len(expected_resources)
        # Number of pages needed to cover all resources, rounding up.
        niterations = int(math.ceil(float(num_resources) / page_size))
        for _ in range(niterations):
            if prev_links:
                uri = self.get_bare_url(prev_links['previous'])
            else:
                pagination_args.update(self.list_kwargs)
                uri = self.client.build_uri(
                    self.plural_name, page_reverse=True, **pagination_args)
            prev_links, body = self.client.get_uri_with_links(
                self.plural_name, uri
            )
            resources_ = self._extract_resources(body)
            self.assertGreaterEqual(page_size, len(resources_))
            resources.extend(reversed(resources_))

        self.assertSameOrder(expected_resources, reversed(resources))

    @_require_pagination
    @_require_sorting
    def _test_list_pagination_page_reverse_asc(self):
        self._test_list_pagination_page_reverse(
            direction=constants.SORT_DIRECTION_ASC)

    @_require_pagination
    @_require_sorting
    def _test_list_pagination_page_reverse_desc(self):
        self._test_list_pagination_page_reverse(
            direction=constants.SORT_DIRECTION_DESC)

    def _test_list_pagination_page_reverse(self, direction):
        """Check a page_reverse request made with a marker.

        The first three sorted items are fetched, then the last of them is
        used as the marker for a reversed request with limit 2; that request
        must return the first two items in the same order.
        """
        pagination_args = {
            'sort_dir': direction,
            'sort_key': 'name',
            'limit': 3,
        }
        body = self.list_method(**pagination_args)
        expected_resources = self._extract_resources(body)

        pagination_args['limit'] -= 1
        pagination_args['marker'] = expected_resources[-1]['id']
        pagination_args['page_reverse'] = True
        body = self.list_method(**pagination_args)

        self.assertSameOrder(
            # the last entry is not included in 2nd result when used as a
            # marker
            expected_resources[:-1],
            self._extract_resources(body))