blob: 84d91dfd03873f53c7ce1791cdb724a79b2752d2 [file] [log] [blame]
Daniel Mellado3c0aeab2016-01-29 11:30:25 +00001# Copyright 2012 OpenStack Foundation
2# All Rights Reserved.
3#
4# Licensed under the Apache License, Version 2.0 (the "License"); you may
5# not use this file except in compliance with the License. You may obtain
6# a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13# License for the specific language governing permissions and limitations
14# under the License.
15
Ihar Hrachyshka59382252016-04-05 15:54:33 +020016import functools
Ihar Hrachyshkaaeb03a02016-05-18 20:03:18 +020017import math
Ihar Hrachyshka59382252016-04-05 15:54:33 +020018
Daniel Mellado3c0aeab2016-01-29 11:30:25 +000019import netaddr
20from tempest.lib.common.utils import data_utils
21from tempest.lib import exceptions as lib_exc
22from tempest import test
23
Ihar Hrachyshka59382252016-04-05 15:54:33 +020024from neutron.common import constants
Ihar Hrachyshkaaeb03a02016-05-18 20:03:18 +020025from neutron.common import utils
Daniel Mellado3c0aeab2016-01-29 11:30:25 +000026from neutron.tests.tempest.api import clients
27from neutron.tests.tempest import config
28from neutron.tests.tempest import exceptions
29
30CONF = config.CONF
31
32
33class BaseNetworkTest(test.BaseTestCase):
34
35 """
36 Base class for the Neutron tests that use the Tempest Neutron REST client
37
38 Per the Neutron API Guide, API v1.x was removed from the source code tree
39 (docs.openstack.org/api/openstack-network/2.0/content/Overview-d1e71.html)
40 Therefore, v2.x of the Neutron API is assumed. It is also assumed that the
41 following options are defined in the [network] section of etc/tempest.conf:
42
43 project_network_cidr with a block of cidr's from which smaller blocks
44 can be allocated for tenant networks
45
46 project_network_mask_bits with the mask bits to be used to partition
47 the block defined by tenant-network_cidr
48
49 Finally, it is assumed that the following option is defined in the
50 [service_available] section of etc/tempest.conf
51
52 neutron as True
53 """
54
55 force_tenant_isolation = False
56 credentials = ['primary']
57
58 # Default to ipv4.
59 _ip_version = 4
60
61 @classmethod
62 def get_client_manager(cls, credential_type=None, roles=None,
63 force_new=None):
64 manager = test.BaseTestCase.get_client_manager(
65 credential_type=credential_type,
66 roles=roles,
67 force_new=force_new)
68 # Neutron uses a different clients manager than the one in the Tempest
69 return clients.Manager(manager.credentials)
70
71 @classmethod
72 def skip_checks(cls):
73 super(BaseNetworkTest, cls).skip_checks()
74 if not CONF.service_available.neutron:
75 raise cls.skipException("Neutron support is required")
76 if cls._ip_version == 6 and not CONF.network_feature_enabled.ipv6:
77 raise cls.skipException("IPv6 Tests are disabled.")
78
79 @classmethod
80 def setup_credentials(cls):
81 # Create no network resources for these test.
82 cls.set_network_resources()
83 super(BaseNetworkTest, cls).setup_credentials()
84
85 @classmethod
86 def setup_clients(cls):
87 super(BaseNetworkTest, cls).setup_clients()
88 cls.client = cls.os.network_client
89
90 @classmethod
91 def resource_setup(cls):
92 super(BaseNetworkTest, cls).resource_setup()
93
94 cls.networks = []
95 cls.shared_networks = []
96 cls.subnets = []
97 cls.ports = []
98 cls.routers = []
99 cls.floating_ips = []
100 cls.metering_labels = []
101 cls.service_profiles = []
102 cls.flavors = []
103 cls.metering_label_rules = []
104 cls.qos_rules = []
105 cls.qos_policies = []
106 cls.ethertype = "IPv" + str(cls._ip_version)
107 cls.address_scopes = []
108 cls.admin_address_scopes = []
109 cls.subnetpools = []
110 cls.admin_subnetpools = []
111
112 @classmethod
113 def resource_cleanup(cls):
114 if CONF.service_available.neutron:
115 # Clean up QoS rules
116 for qos_rule in cls.qos_rules:
117 cls._try_delete_resource(cls.admin_client.delete_qos_rule,
118 qos_rule['id'])
119 # Clean up QoS policies
120 for qos_policy in cls.qos_policies:
121 cls._try_delete_resource(cls.admin_client.delete_qos_policy,
122 qos_policy['id'])
123 # Clean up floating IPs
124 for floating_ip in cls.floating_ips:
125 cls._try_delete_resource(cls.client.delete_floatingip,
126 floating_ip['id'])
127 # Clean up routers
128 for router in cls.routers:
129 cls._try_delete_resource(cls.delete_router,
130 router)
131 # Clean up metering label rules
132 for metering_label_rule in cls.metering_label_rules:
133 cls._try_delete_resource(
134 cls.admin_client.delete_metering_label_rule,
135 metering_label_rule['id'])
136 # Clean up metering labels
137 for metering_label in cls.metering_labels:
138 cls._try_delete_resource(
139 cls.admin_client.delete_metering_label,
140 metering_label['id'])
141 # Clean up flavors
142 for flavor in cls.flavors:
143 cls._try_delete_resource(
144 cls.admin_client.delete_flavor,
145 flavor['id'])
146 # Clean up service profiles
147 for service_profile in cls.service_profiles:
148 cls._try_delete_resource(
149 cls.admin_client.delete_service_profile,
150 service_profile['id'])
151 # Clean up ports
152 for port in cls.ports:
153 cls._try_delete_resource(cls.client.delete_port,
154 port['id'])
155 # Clean up subnets
156 for subnet in cls.subnets:
157 cls._try_delete_resource(cls.client.delete_subnet,
158 subnet['id'])
159 # Clean up networks
160 for network in cls.networks:
161 cls._try_delete_resource(cls.client.delete_network,
162 network['id'])
163
164 # Clean up shared networks
165 for network in cls.shared_networks:
166 cls._try_delete_resource(cls.admin_client.delete_network,
167 network['id'])
168
169 for subnetpool in cls.subnetpools:
170 cls._try_delete_resource(cls.client.delete_subnetpool,
171 subnetpool['id'])
172
173 for subnetpool in cls.admin_subnetpools:
174 cls._try_delete_resource(cls.admin_client.delete_subnetpool,
175 subnetpool['id'])
176
177 for address_scope in cls.address_scopes:
178 cls._try_delete_resource(cls.client.delete_address_scope,
179 address_scope['id'])
180
181 for address_scope in cls.admin_address_scopes:
182 cls._try_delete_resource(
183 cls.admin_client.delete_address_scope,
184 address_scope['id'])
185
186 super(BaseNetworkTest, cls).resource_cleanup()
187
188 @classmethod
189 def _try_delete_resource(cls, delete_callable, *args, **kwargs):
190 """Cleanup resources in case of test-failure
191
192 Some resources are explicitly deleted by the test.
193 If the test failed to delete a resource, this method will execute
194 the appropriate delete methods. Otherwise, the method ignores NotFound
195 exceptions thrown for resources that were correctly deleted by the
196 test.
197
198 :param delete_callable: delete method
199 :param args: arguments for delete method
200 :param kwargs: keyword arguments for delete method
201 """
202 try:
203 delete_callable(*args, **kwargs)
204 # if resource is not found, this means it was deleted in the test
205 except lib_exc.NotFound:
206 pass
207
208 @classmethod
209 def create_network(cls, network_name=None, **kwargs):
210 """Wrapper utility that returns a test network."""
211 network_name = network_name or data_utils.rand_name('test-network-')
212
213 body = cls.client.create_network(name=network_name, **kwargs)
214 network = body['network']
215 cls.networks.append(network)
216 return network
217
218 @classmethod
219 def create_shared_network(cls, network_name=None, **post_body):
220 network_name = network_name or data_utils.rand_name('sharednetwork-')
221 post_body.update({'name': network_name, 'shared': True})
222 body = cls.admin_client.create_network(**post_body)
223 network = body['network']
224 cls.shared_networks.append(network)
225 return network
226
227 @classmethod
228 def create_subnet(cls, network, gateway='', cidr=None, mask_bits=None,
229 ip_version=None, client=None, **kwargs):
230 """Wrapper utility that returns a test subnet."""
231
232 # allow tests to use admin client
233 if not client:
234 client = cls.client
235
236 # The cidr and mask_bits depend on the ip version.
237 ip_version = ip_version if ip_version is not None else cls._ip_version
238 gateway_not_set = gateway == ''
239 if ip_version == 4:
240 cidr = cidr or netaddr.IPNetwork(
241 config.safe_get_config_value(
242 'network', 'project_network_cidr'))
243 mask_bits = (
244 mask_bits or config.safe_get_config_value(
245 'network', 'project_network_mask_bits'))
246 elif ip_version == 6:
247 cidr = (
248 cidr or netaddr.IPNetwork(
249 config.safe_get_config_value(
250 'network', 'project_network_v6_cidr')))
251 mask_bits = (
252 mask_bits or config.safe_get_config_value(
253 'network', 'project_network_v6_mask_bits'))
254 # Find a cidr that is not in use yet and create a subnet with it
255 for subnet_cidr in cidr.subnet(mask_bits):
256 if gateway_not_set:
257 gateway_ip = str(netaddr.IPAddress(subnet_cidr) + 1)
258 else:
259 gateway_ip = gateway
260 try:
261 body = client.create_subnet(
262 network_id=network['id'],
263 cidr=str(subnet_cidr),
264 ip_version=ip_version,
265 gateway_ip=gateway_ip,
266 **kwargs)
267 break
268 except lib_exc.BadRequest as e:
269 is_overlapping_cidr = 'overlaps with another subnet' in str(e)
270 if not is_overlapping_cidr:
271 raise
272 else:
273 message = 'Available CIDR for subnet creation could not be found'
274 raise ValueError(message)
275 subnet = body['subnet']
276 cls.subnets.append(subnet)
277 return subnet
278
279 @classmethod
280 def create_port(cls, network, **kwargs):
281 """Wrapper utility that returns a test port."""
282 body = cls.client.create_port(network_id=network['id'],
283 **kwargs)
284 port = body['port']
285 cls.ports.append(port)
286 return port
287
288 @classmethod
289 def update_port(cls, port, **kwargs):
290 """Wrapper utility that updates a test port."""
291 body = cls.client.update_port(port['id'],
292 **kwargs)
293 return body['port']
294
295 @classmethod
296 def create_router(cls, router_name=None, admin_state_up=False,
297 external_network_id=None, enable_snat=None,
298 **kwargs):
299 ext_gw_info = {}
300 if external_network_id:
301 ext_gw_info['network_id'] = external_network_id
302 if enable_snat:
303 ext_gw_info['enable_snat'] = enable_snat
304 body = cls.client.create_router(
305 router_name, external_gateway_info=ext_gw_info,
306 admin_state_up=admin_state_up, **kwargs)
307 router = body['router']
308 cls.routers.append(router)
309 return router
310
311 @classmethod
312 def create_floatingip(cls, external_network_id):
313 """Wrapper utility that returns a test floating IP."""
314 body = cls.client.create_floatingip(
315 floating_network_id=external_network_id)
316 fip = body['floatingip']
317 cls.floating_ips.append(fip)
318 return fip
319
320 @classmethod
321 def create_router_interface(cls, router_id, subnet_id):
322 """Wrapper utility that returns a router interface."""
323 interface = cls.client.add_router_interface_with_subnet_id(
324 router_id, subnet_id)
325 return interface
326
327 @classmethod
Ihar Hrachyshkab7940d92016-06-10 13:44:22 +0200328 def create_qos_policy(cls, name, description=None, shared=False,
329 tenant_id=None):
Daniel Mellado3c0aeab2016-01-29 11:30:25 +0000330 """Wrapper utility that returns a test QoS policy."""
331 body = cls.admin_client.create_qos_policy(
332 name, description, shared, tenant_id)
333 qos_policy = body['policy']
334 cls.qos_policies.append(qos_policy)
335 return qos_policy
336
337 @classmethod
338 def create_qos_bandwidth_limit_rule(cls, policy_id,
339 max_kbps, max_burst_kbps):
340 """Wrapper utility that returns a test QoS bandwidth limit rule."""
341 body = cls.admin_client.create_bandwidth_limit_rule(
342 policy_id, max_kbps, max_burst_kbps)
343 qos_rule = body['bandwidth_limit_rule']
344 cls.qos_rules.append(qos_rule)
345 return qos_rule
346
347 @classmethod
348 def delete_router(cls, router):
349 body = cls.client.list_router_interfaces(router['id'])
350 interfaces = body['ports']
351 for i in interfaces:
352 try:
353 cls.client.remove_router_interface_with_subnet_id(
354 router['id'], i['fixed_ips'][0]['subnet_id'])
355 except lib_exc.NotFound:
356 pass
357 cls.client.delete_router(router['id'])
358
359 @classmethod
360 def create_address_scope(cls, name, is_admin=False, **kwargs):
361 if is_admin:
362 body = cls.admin_client.create_address_scope(name=name, **kwargs)
363 cls.admin_address_scopes.append(body['address_scope'])
364 else:
365 body = cls.client.create_address_scope(name=name, **kwargs)
366 cls.address_scopes.append(body['address_scope'])
367 return body['address_scope']
368
369 @classmethod
370 def create_subnetpool(cls, name, is_admin=False, **kwargs):
371 if is_admin:
372 body = cls.admin_client.create_subnetpool(name, **kwargs)
373 cls.admin_subnetpools.append(body['subnetpool'])
374 else:
375 body = cls.client.create_subnetpool(name, **kwargs)
376 cls.subnetpools.append(body['subnetpool'])
377 return body['subnetpool']
378
379
380class BaseAdminNetworkTest(BaseNetworkTest):
381
382 credentials = ['primary', 'admin']
383
384 @classmethod
385 def setup_clients(cls):
386 super(BaseAdminNetworkTest, cls).setup_clients()
387 cls.admin_client = cls.os_adm.network_client
388 cls.identity_admin_client = cls.os_adm.tenants_client
389
390 @classmethod
391 def create_metering_label(cls, name, description):
392 """Wrapper utility that returns a test metering label."""
393 body = cls.admin_client.create_metering_label(
394 description=description,
395 name=data_utils.rand_name("metering-label"))
396 metering_label = body['metering_label']
397 cls.metering_labels.append(metering_label)
398 return metering_label
399
400 @classmethod
401 def create_metering_label_rule(cls, remote_ip_prefix, direction,
402 metering_label_id):
403 """Wrapper utility that returns a test metering label rule."""
404 body = cls.admin_client.create_metering_label_rule(
405 remote_ip_prefix=remote_ip_prefix, direction=direction,
406 metering_label_id=metering_label_id)
407 metering_label_rule = body['metering_label_rule']
408 cls.metering_label_rules.append(metering_label_rule)
409 return metering_label_rule
410
411 @classmethod
412 def create_flavor(cls, name, description, service_type):
413 """Wrapper utility that returns a test flavor."""
414 body = cls.admin_client.create_flavor(
415 description=description, service_type=service_type,
416 name=name)
417 flavor = body['flavor']
418 cls.flavors.append(flavor)
419 return flavor
420
421 @classmethod
422 def create_service_profile(cls, description, metainfo, driver):
423 """Wrapper utility that returns a test service profile."""
424 body = cls.admin_client.create_service_profile(
425 driver=driver, metainfo=metainfo, description=description)
426 service_profile = body['service_profile']
427 cls.service_profiles.append(service_profile)
428 return service_profile
429
430 @classmethod
431 def get_unused_ip(cls, net_id, ip_version=None):
Gary Kotton011345f2016-06-15 08:04:31 -0700432 """Get an unused ip address in a allocation pool of net"""
Daniel Mellado3c0aeab2016-01-29 11:30:25 +0000433 body = cls.admin_client.list_ports(network_id=net_id)
434 ports = body['ports']
435 used_ips = []
436 for port in ports:
437 used_ips.extend(
438 [fixed_ip['ip_address'] for fixed_ip in port['fixed_ips']])
439 body = cls.admin_client.list_subnets(network_id=net_id)
440 subnets = body['subnets']
441
442 for subnet in subnets:
443 if ip_version and subnet['ip_version'] != ip_version:
444 continue
445 cidr = subnet['cidr']
446 allocation_pools = subnet['allocation_pools']
447 iterators = []
448 if allocation_pools:
449 for allocation_pool in allocation_pools:
450 iterators.append(netaddr.iter_iprange(
451 allocation_pool['start'], allocation_pool['end']))
452 else:
453 net = netaddr.IPNetwork(cidr)
454
455 def _iterip():
456 for ip in net:
457 if ip not in (net.network, net.broadcast):
458 yield ip
459 iterators.append(iter(_iterip()))
460
461 for iterator in iterators:
462 for ip in iterator:
463 if str(ip) not in used_ips:
464 return str(ip)
465
466 message = (
467 "net(%s) has no usable IP address in allocation pools" % net_id)
468 raise exceptions.InvalidConfiguration(message)
Ihar Hrachyshka59382252016-04-05 15:54:33 +0200469
470
471def _require_sorting(f):
472 @functools.wraps(f)
473 def inner(self, *args, **kwargs):
474 if not CONF.neutron_plugin_options.validate_sorting:
475 self.skipTest('Sorting feature is required')
476 return f(self, *args, **kwargs)
477 return inner
478
479
480def _require_pagination(f):
481 @functools.wraps(f)
482 def inner(self, *args, **kwargs):
483 if not CONF.neutron_plugin_options.validate_pagination:
484 self.skipTest('Pagination feature is required')
485 return f(self, *args, **kwargs)
486 return inner
487
488
489class BaseSearchCriteriaTest(BaseNetworkTest):
490
491 # This should be defined by subclasses to reflect resource name to test
492 resource = None
493
Armando Migliaccio57581c62016-07-01 10:13:19 -0700494 field = 'name'
495
Ihar Hrachyshkaa8fe5a12016-05-24 14:50:58 +0200496 # NOTE(ihrachys): some names, like those starting with an underscore (_)
497 # are sorted differently depending on whether the plugin implements native
498 # sorting support, or not. So we avoid any such cases here, sticking to
499 # alphanumeric. Also test a case when there are multiple resources with the
500 # same name
Ihar Hrachyshka59382252016-04-05 15:54:33 +0200501 resource_names = ('test1', 'abc1', 'test10', '123test') + ('test1',)
502
Ihar Hrachyshka59382252016-04-05 15:54:33 +0200503 force_tenant_isolation = True
504
Ihar Hrachyshkaa8fe5a12016-05-24 14:50:58 +0200505 list_kwargs = {}
Ihar Hrachyshka59382252016-04-05 15:54:33 +0200506
Ihar Hrachyshkab7940d92016-06-10 13:44:22 +0200507 list_as_admin = False
508
Ihar Hrachyshkaaeb03a02016-05-18 20:03:18 +0200509 def assertSameOrder(self, original, actual):
510 # gracefully handle iterators passed
511 original = list(original)
512 actual = list(actual)
513 self.assertEqual(len(original), len(actual))
514 for expected, res in zip(original, actual):
Armando Migliaccio57581c62016-07-01 10:13:19 -0700515 self.assertEqual(expected[self.field], res[self.field])
Ihar Hrachyshkaaeb03a02016-05-18 20:03:18 +0200516
517 @utils.classproperty
518 def plural_name(self):
519 return '%ss' % self.resource
520
Ihar Hrachyshkab7940d92016-06-10 13:44:22 +0200521 @property
522 def list_client(self):
523 return self.admin_client if self.list_as_admin else self.client
524
Ihar Hrachyshka59382252016-04-05 15:54:33 +0200525 def list_method(self, *args, **kwargs):
Ihar Hrachyshkab7940d92016-06-10 13:44:22 +0200526 method = getattr(self.list_client, 'list_%s' % self.plural_name)
Ihar Hrachyshka59382252016-04-05 15:54:33 +0200527 kwargs.update(self.list_kwargs)
528 return method(*args, **kwargs)
529
Ihar Hrachyshkaaeb03a02016-05-18 20:03:18 +0200530 def get_bare_url(self, url):
531 base_url = self.client.base_url
532 self.assertTrue(url.startswith(base_url))
533 return url[len(base_url):]
534
Ihar Hrachyshka59382252016-04-05 15:54:33 +0200535 @classmethod
536 def _extract_resources(cls, body):
Ihar Hrachyshkaaeb03a02016-05-18 20:03:18 +0200537 return body[cls.plural_name]
Ihar Hrachyshka59382252016-04-05 15:54:33 +0200538
539 def _test_list_sorts(self, direction):
540 sort_args = {
541 'sort_dir': direction,
Armando Migliaccio57581c62016-07-01 10:13:19 -0700542 'sort_key': self.field
Ihar Hrachyshka59382252016-04-05 15:54:33 +0200543 }
544 body = self.list_method(**sort_args)
545 resources = self._extract_resources(body)
546 self.assertNotEmpty(
547 resources, "%s list returned is empty" % self.resource)
Armando Migliaccio57581c62016-07-01 10:13:19 -0700548 retrieved_names = [res[self.field] for res in resources]
Ihar Hrachyshka59382252016-04-05 15:54:33 +0200549 expected = sorted(retrieved_names)
550 if direction == constants.SORT_DIRECTION_DESC:
551 expected = list(reversed(expected))
552 self.assertEqual(expected, retrieved_names)
553
554 @_require_sorting
555 def _test_list_sorts_asc(self):
556 self._test_list_sorts(constants.SORT_DIRECTION_ASC)
557
558 @_require_sorting
559 def _test_list_sorts_desc(self):
560 self._test_list_sorts(constants.SORT_DIRECTION_DESC)
561
562 @_require_pagination
563 def _test_list_pagination(self):
564 for limit in range(1, len(self.resource_names) + 1):
565 pagination_args = {
566 'limit': limit,
567 }
568 body = self.list_method(**pagination_args)
569 resources = self._extract_resources(body)
570 self.assertEqual(limit, len(resources))
571
572 @_require_pagination
573 def _test_list_no_pagination_limit_0(self):
574 pagination_args = {
575 'limit': 0,
576 }
577 body = self.list_method(**pagination_args)
578 resources = self._extract_resources(body)
579 self.assertTrue(len(resources) >= len(self.resource_names))
580
Ihar Hrachyshkaaeb03a02016-05-18 20:03:18 +0200581 def _test_list_pagination_iteratively(self, lister):
Ihar Hrachyshka59382252016-04-05 15:54:33 +0200582 # first, collect all resources for later comparison
583 sort_args = {
584 'sort_dir': constants.SORT_DIRECTION_ASC,
Armando Migliaccio57581c62016-07-01 10:13:19 -0700585 'sort_key': self.field
Ihar Hrachyshka59382252016-04-05 15:54:33 +0200586 }
587 body = self.list_method(**sort_args)
588 expected_resources = self._extract_resources(body)
589 self.assertNotEmpty(expected_resources)
590
Ihar Hrachyshkaaeb03a02016-05-18 20:03:18 +0200591 resources = lister(
592 len(expected_resources), sort_args
593 )
594
595 # finally, compare that the list retrieved in one go is identical to
596 # the one containing pagination results
597 self.assertSameOrder(expected_resources, resources)
598
599 def _list_all_with_marker(self, niterations, sort_args):
Ihar Hrachyshka59382252016-04-05 15:54:33 +0200600 # paginate resources one by one, using last fetched resource as a
601 # marker
602 resources = []
Ihar Hrachyshkaaeb03a02016-05-18 20:03:18 +0200603 for i in range(niterations):
Ihar Hrachyshka59382252016-04-05 15:54:33 +0200604 pagination_args = sort_args.copy()
605 pagination_args['limit'] = 1
606 if resources:
607 pagination_args['marker'] = resources[-1]['id']
608 body = self.list_method(**pagination_args)
609 resources_ = self._extract_resources(body)
610 self.assertEqual(1, len(resources_))
611 resources.extend(resources_)
Ihar Hrachyshkaaeb03a02016-05-18 20:03:18 +0200612 return resources
Ihar Hrachyshka59382252016-04-05 15:54:33 +0200613
Ihar Hrachyshkaaeb03a02016-05-18 20:03:18 +0200614 @_require_pagination
615 @_require_sorting
616 def _test_list_pagination_with_marker(self):
617 self._test_list_pagination_iteratively(self._list_all_with_marker)
618
619 def _list_all_with_hrefs(self, niterations, sort_args):
620 # paginate resources one by one, using next href links
621 resources = []
622 prev_links = {}
623
624 for i in range(niterations):
625 if prev_links:
626 uri = self.get_bare_url(prev_links['next'])
627 else:
Ihar Hrachyshka7f79fe62016-06-07 21:23:44 +0200628 sort_args.update(self.list_kwargs)
Ihar Hrachyshkab7940d92016-06-10 13:44:22 +0200629 uri = self.list_client.build_uri(
Ihar Hrachyshkaaeb03a02016-05-18 20:03:18 +0200630 self.plural_name, limit=1, **sort_args)
Ihar Hrachyshkab7940d92016-06-10 13:44:22 +0200631 prev_links, body = self.list_client.get_uri_with_links(
Ihar Hrachyshkaaeb03a02016-05-18 20:03:18 +0200632 self.plural_name, uri
633 )
634 resources_ = self._extract_resources(body)
635 self.assertEqual(1, len(resources_))
636 resources.extend(resources_)
637
638 # The last element is empty and does not contain 'next' link
639 uri = self.get_bare_url(prev_links['next'])
640 prev_links, body = self.client.get_uri_with_links(
641 self.plural_name, uri
642 )
643 self.assertNotIn('next', prev_links)
644
645 # Now walk backwards and compare results
646 resources2 = []
647 for i in range(niterations):
648 uri = self.get_bare_url(prev_links['previous'])
Ihar Hrachyshkab7940d92016-06-10 13:44:22 +0200649 prev_links, body = self.list_client.get_uri_with_links(
Ihar Hrachyshkaaeb03a02016-05-18 20:03:18 +0200650 self.plural_name, uri
651 )
652 resources_ = self._extract_resources(body)
653 self.assertEqual(1, len(resources_))
654 resources2.extend(resources_)
655
656 self.assertSameOrder(resources, reversed(resources2))
657
658 return resources
659
660 @_require_pagination
661 @_require_sorting
662 def _test_list_pagination_with_href_links(self):
663 self._test_list_pagination_iteratively(self._list_all_with_hrefs)
664
665 @_require_pagination
666 @_require_sorting
667 def _test_list_pagination_page_reverse_with_href_links(
668 self, direction=constants.SORT_DIRECTION_ASC):
669 pagination_args = {
670 'sort_dir': direction,
Armando Migliaccio57581c62016-07-01 10:13:19 -0700671 'sort_key': self.field,
Ihar Hrachyshkaaeb03a02016-05-18 20:03:18 +0200672 }
673 body = self.list_method(**pagination_args)
674 expected_resources = self._extract_resources(body)
675
676 page_size = 2
677 pagination_args['limit'] = page_size
678
679 prev_links = {}
680 resources = []
681 num_resources = len(expected_resources)
682 niterations = int(math.ceil(float(num_resources) / page_size))
683 for i in range(niterations):
684 if prev_links:
685 uri = self.get_bare_url(prev_links['previous'])
686 else:
Ihar Hrachyshka7f79fe62016-06-07 21:23:44 +0200687 pagination_args.update(self.list_kwargs)
Ihar Hrachyshkab7940d92016-06-10 13:44:22 +0200688 uri = self.list_client.build_uri(
Ihar Hrachyshkaaeb03a02016-05-18 20:03:18 +0200689 self.plural_name, page_reverse=True, **pagination_args)
Ihar Hrachyshkab7940d92016-06-10 13:44:22 +0200690 prev_links, body = self.list_client.get_uri_with_links(
Ihar Hrachyshkaaeb03a02016-05-18 20:03:18 +0200691 self.plural_name, uri
692 )
693 resources_ = self._extract_resources(body)
694 self.assertTrue(page_size >= len(resources_))
695 resources.extend(reversed(resources_))
696
697 self.assertSameOrder(expected_resources, reversed(resources))
698
699 @_require_pagination
700 @_require_sorting
701 def _test_list_pagination_page_reverse_asc(self):
702 self._test_list_pagination_page_reverse(
703 direction=constants.SORT_DIRECTION_ASC)
704
705 @_require_pagination
706 @_require_sorting
707 def _test_list_pagination_page_reverse_desc(self):
708 self._test_list_pagination_page_reverse(
709 direction=constants.SORT_DIRECTION_DESC)
710
711 def _test_list_pagination_page_reverse(self, direction):
712 pagination_args = {
713 'sort_dir': direction,
Armando Migliaccio57581c62016-07-01 10:13:19 -0700714 'sort_key': self.field,
Ihar Hrachyshkaaeb03a02016-05-18 20:03:18 +0200715 'limit': 3,
716 }
717 body = self.list_method(**pagination_args)
718 expected_resources = self._extract_resources(body)
719
720 pagination_args['limit'] -= 1
721 pagination_args['marker'] = expected_resources[-1]['id']
722 pagination_args['page_reverse'] = True
723 body = self.list_method(**pagination_args)
724
725 self.assertSameOrder(
726 # the last entry is not included in 2nd result when used as a
727 # marker
728 expected_resources[:-1],
729 self._extract_resources(body))