blob: c4d3b4801abf0748d670e508fe49ee7914cb18f5 [file] [log] [blame]
Daniel Mellado3c0aeab2016-01-29 11:30:25 +00001# Copyright 2012 OpenStack Foundation
2# All Rights Reserved.
3#
4# Licensed under the Apache License, Version 2.0 (the "License"); you may
5# not use this file except in compliance with the License. You may obtain
6# a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13# License for the specific language governing permissions and limitations
14# under the License.
15
Ihar Hrachyshka59382252016-04-05 15:54:33 +020016import functools
Ihar Hrachyshkaaeb03a02016-05-18 20:03:18 +020017import math
Ihar Hrachyshka59382252016-04-05 15:54:33 +020018
Daniel Mellado3c0aeab2016-01-29 11:30:25 +000019import netaddr
20from tempest.lib.common.utils import data_utils
21from tempest.lib import exceptions as lib_exc
22from tempest import test
23
Ihar Hrachyshka59382252016-04-05 15:54:33 +020024from neutron.common import constants
Ihar Hrachyshkaaeb03a02016-05-18 20:03:18 +020025from neutron.common import utils
Daniel Mellado3c0aeab2016-01-29 11:30:25 +000026from neutron.tests.tempest.api import clients
27from neutron.tests.tempest import config
28from neutron.tests.tempest import exceptions
29
# Module-wide handle on the configuration object exposed by
# neutron.tests.tempest.config; the test classes in this module read their
# [service_available] and [network_feature_enabled] options through it.
CONF = config.CONF
31
32
class BaseNetworkTest(test.BaseTestCase):

    """
    Base class for the Neutron tests that use the Tempest Neutron REST client

    Per the Neutron API Guide, API v1.x was removed from the source code tree
    (docs.openstack.org/api/openstack-network/2.0/content/Overview-d1e71.html)
    Therefore, v2.x of the Neutron API is assumed. It is also assumed that the
    following options are defined in the [network] section of etc/tempest.conf:

        project_network_cidr with a block of cidr's from which smaller blocks
        can be allocated for tenant networks

        project_network_mask_bits with the mask bits to be used to partition
        the block defined by project_network_cidr

    Finally, it is assumed that the following option is defined in the
    [service_available] section of etc/tempest.conf

        neutron as True
    """

    force_tenant_isolation = False
    credentials = ['primary']

    # Default to ipv4.
    _ip_version = 4

    @classmethod
    def get_client_manager(cls, credential_type=None, roles=None,
                           force_new=None):
        """Return a Neutron clients manager for the requested credentials.

        The parent implementation is only used to resolve the credentials;
        they are then wrapped in ``clients.Manager``.
        """
        manager = super(BaseNetworkTest, cls).get_client_manager(
            credential_type=credential_type,
            roles=roles,
            force_new=force_new
        )
        # Neutron uses a different clients manager than the one in the Tempest
        return clients.Manager(manager.credentials)

    @classmethod
    def skip_checks(cls):
        """Skip the class when Neutron, or IPv6 for v6 classes, is disabled."""
        super(BaseNetworkTest, cls).skip_checks()
        if not CONF.service_available.neutron:
            raise cls.skipException("Neutron support is required")
        if cls._ip_version == 6 and not CONF.network_feature_enabled.ipv6:
            raise cls.skipException("IPv6 Tests are disabled.")

    @classmethod
    def setup_credentials(cls):
        """Set up credentials without pre-provisioned network resources."""
        # Create no network resources for these test.
        cls.set_network_resources()
        super(BaseNetworkTest, cls).setup_credentials()

    @classmethod
    def setup_clients(cls):
        """Expose the primary network client as ``cls.client``."""
        super(BaseNetworkTest, cls).setup_clients()
        cls.client = cls.os.network_client

    @classmethod
    def resource_setup(cls):
        """Initialise the per-class registries of created resources.

        Each ``create_*`` helper appends what it creates to one of these
        lists; ``resource_cleanup`` walks them to delete leftovers.
        """
        super(BaseNetworkTest, cls).resource_setup()

        cls.networks = []
        cls.shared_networks = []
        cls.subnets = []
        cls.ports = []
        cls.routers = []
        cls.floating_ips = []
        cls.metering_labels = []
        cls.service_profiles = []
        cls.flavors = []
        cls.metering_label_rules = []
        cls.qos_rules = []
        cls.qos_policies = []
        cls.ethertype = "IPv" + str(cls._ip_version)
        cls.address_scopes = []
        cls.admin_address_scopes = []
        cls.subnetpools = []
        cls.admin_subnetpools = []

    @classmethod
    def resource_cleanup(cls):
        """Delete every resource recorded by the ``create_*`` helpers.

        Deletion goes through ``_try_delete_resource`` so that resources
        already removed by a test are tolerated. ``cls.admin_client`` is only
        touched inside loops over admin-created resources, so it is not
        looked up when those registries are empty.
        """
        if CONF.service_available.neutron:
            # Clean up QoS rules
            for qos_rule in cls.qos_rules:
                cls._try_delete_resource(cls.admin_client.delete_qos_rule,
                                         qos_rule['id'])
            # Clean up QoS policies
            for qos_policy in cls.qos_policies:
                cls._try_delete_resource(cls.admin_client.delete_qos_policy,
                                         qos_policy['id'])
            # Clean up floating IPs
            for floating_ip in cls.floating_ips:
                cls._try_delete_resource(cls.client.delete_floatingip,
                                         floating_ip['id'])
            # Clean up routers
            for router in cls.routers:
                cls._try_delete_resource(cls.delete_router,
                                         router)
            # Clean up metering label rules
            for metering_label_rule in cls.metering_label_rules:
                cls._try_delete_resource(
                    cls.admin_client.delete_metering_label_rule,
                    metering_label_rule['id'])
            # Clean up metering labels
            for metering_label in cls.metering_labels:
                cls._try_delete_resource(
                    cls.admin_client.delete_metering_label,
                    metering_label['id'])
            # Clean up flavors
            for flavor in cls.flavors:
                cls._try_delete_resource(
                    cls.admin_client.delete_flavor,
                    flavor['id'])
            # Clean up service profiles
            for service_profile in cls.service_profiles:
                cls._try_delete_resource(
                    cls.admin_client.delete_service_profile,
                    service_profile['id'])
            # Clean up ports
            for port in cls.ports:
                cls._try_delete_resource(cls.client.delete_port,
                                         port['id'])
            # Clean up subnets
            for subnet in cls.subnets:
                cls._try_delete_resource(cls.client.delete_subnet,
                                         subnet['id'])
            # Clean up networks
            for network in cls.networks:
                cls._try_delete_resource(cls.client.delete_network,
                                         network['id'])

            # Clean up shared networks
            for network in cls.shared_networks:
                cls._try_delete_resource(cls.admin_client.delete_network,
                                         network['id'])

            # Clean up subnet pools
            for subnetpool in cls.subnetpools:
                cls._try_delete_resource(cls.client.delete_subnetpool,
                                         subnetpool['id'])

            for subnetpool in cls.admin_subnetpools:
                cls._try_delete_resource(cls.admin_client.delete_subnetpool,
                                         subnetpool['id'])

            # Clean up address scopes
            for address_scope in cls.address_scopes:
                cls._try_delete_resource(cls.client.delete_address_scope,
                                         address_scope['id'])

            for address_scope in cls.admin_address_scopes:
                cls._try_delete_resource(
                    cls.admin_client.delete_address_scope,
                    address_scope['id'])

        super(BaseNetworkTest, cls).resource_cleanup()

    @classmethod
    def _try_delete_resource(cls, delete_callable, *args, **kwargs):
        """Cleanup resources in case of test-failure

        Some resources are explicitly deleted by the test.
        If the test failed to delete a resource, this method will execute
        the appropriate delete methods. Otherwise, the method ignores NotFound
        exceptions thrown for resources that were correctly deleted by the
        test.

        :param delete_callable: delete method
        :param args: arguments for delete method
        :param kwargs: keyword arguments for delete method
        """
        try:
            delete_callable(*args, **kwargs)
        # if resource is not found, this means it was deleted in the test
        except lib_exc.NotFound:
            pass

    @classmethod
    def create_network(cls, network_name=None, **kwargs):
        """Wrapper utility that returns a test network.

        :param network_name: name to use; a random 'test-network-' name is
            generated when it is falsy
        :param kwargs: extra attributes passed to the create call
        :returns: the 'network' dict, also recorded for cleanup
        """
        network_name = network_name or data_utils.rand_name('test-network-')

        body = cls.client.create_network(name=network_name, **kwargs)
        network = body['network']
        cls.networks.append(network)
        return network

    @classmethod
    def create_shared_network(cls, network_name=None, **post_body):
        """Wrapper utility that returns a shared network made as admin.

        :param network_name: name to use; a random 'sharednetwork-' name is
            generated when it is falsy
        :param post_body: extra attributes; 'name' and 'shared' are always
            overridden
        :returns: the 'network' dict, also recorded for cleanup
        """
        network_name = network_name or data_utils.rand_name('sharednetwork-')
        post_body.update({'name': network_name, 'shared': True})
        body = cls.admin_client.create_network(**post_body)
        network = body['network']
        cls.shared_networks.append(network)
        return network

    @classmethod
    def create_subnet(cls, network, gateway='', cidr=None, mask_bits=None,
                      ip_version=None, client=None, **kwargs):
        """Wrapper utility that returns a test subnet.

        Candidate blocks are produced by splitting ``cidr`` at ``mask_bits``;
        the first block the API accepts is used.

        :param network: network dict; only its 'id' is read
        :param gateway: gateway IP. The default '' means "derive it from the
            candidate block" (block address + 1); any other value, including
            None, is passed to the API unchanged
        :param cidr: object providing ``subnet(mask_bits)``; when falsy the
            configured project network block for the IP version is used
        :param mask_bits: prefix length of the blocks to try; when falsy the
            configured value for the IP version is used
        :param ip_version: 4 or 6; defaults to the class ``_ip_version``
        :param client: client used for the call; defaults to ``cls.client``
        :param kwargs: extra attributes passed to the create call
        :returns: the 'subnet' dict, also recorded for cleanup
        :raises ValueError: if every candidate block overlapped an existing
            subnet
        """

        # allow tests to use admin client
        if not client:
            client = cls.client

        # The cidr and mask_bits depend on the ip version.
        ip_version = ip_version if ip_version is not None else cls._ip_version
        gateway_not_set = gateway == ''
        if ip_version == 4:
            cidr = cidr or netaddr.IPNetwork(
                config.safe_get_config_value(
                    'network', 'project_network_cidr'))
            mask_bits = (
                mask_bits or config.safe_get_config_value(
                    'network', 'project_network_mask_bits'))
        elif ip_version == 6:
            cidr = (
                cidr or netaddr.IPNetwork(
                    config.safe_get_config_value(
                        'network', 'project_network_v6_cidr')))
            mask_bits = (
                mask_bits or config.safe_get_config_value(
                    'network', 'project_network_v6_mask_bits'))
        # Find a cidr that is not in use yet and create a subnet with it
        for subnet_cidr in cidr.subnet(mask_bits):
            if gateway_not_set:
                gateway_ip = str(netaddr.IPAddress(subnet_cidr) + 1)
            else:
                gateway_ip = gateway
            try:
                body = client.create_subnet(
                    network_id=network['id'],
                    cidr=str(subnet_cidr),
                    ip_version=ip_version,
                    gateway_ip=gateway_ip,
                    **kwargs)
                break
            except lib_exc.BadRequest as e:
                # Only an overlap is a reason to try the next block; any
                # other rejection is a genuine failure.
                is_overlapping_cidr = 'overlaps with another subnet' in str(e)
                if not is_overlapping_cidr:
                    raise
        else:
            # Loop ended without a break: no candidate block was accepted.
            message = 'Available CIDR for subnet creation could not be found'
            raise ValueError(message)
        subnet = body['subnet']
        cls.subnets.append(subnet)
        return subnet

    @classmethod
    def create_port(cls, network, **kwargs):
        """Wrapper utility that returns a test port.

        :param network: network dict; only its 'id' is read
        :param kwargs: extra attributes passed to the create call
        :returns: the 'port' dict, also recorded for cleanup
        """
        body = cls.client.create_port(network_id=network['id'],
                                      **kwargs)
        port = body['port']
        cls.ports.append(port)
        return port

    @classmethod
    def update_port(cls, port, **kwargs):
        """Wrapper utility that updates a test port.

        :param port: port dict; only its 'id' is read
        :param kwargs: attributes passed to the update call
        :returns: the 'port' dict from the response
        """
        body = cls.client.update_port(port['id'],
                                      **kwargs)
        return body['port']

    @classmethod
    def create_router(cls, router_name=None, admin_state_up=False,
                      external_network_id=None, enable_snat=None,
                      **kwargs):
        """Wrapper utility that returns a test router.

        :param router_name: name passed positionally to the client
        :param admin_state_up: administrative state of the router
        :param external_network_id: when truthy, set as the gateway network
        :param enable_snat: when not None, sent as the gateway's
            'enable_snat' value; None leaves the key out of the request
        :param kwargs: extra attributes passed to the create call
        :returns: the 'router' dict, also recorded for cleanup
        """
        ext_gw_info = {}
        if external_network_id:
            ext_gw_info['network_id'] = external_network_id
        # Compare against None rather than truthiness so that an explicit
        # enable_snat=False is actually sent instead of being dropped.
        if enable_snat is not None:
            ext_gw_info['enable_snat'] = enable_snat
        body = cls.client.create_router(
            router_name, external_gateway_info=ext_gw_info,
            admin_state_up=admin_state_up, **kwargs)
        router = body['router']
        cls.routers.append(router)
        return router

    @classmethod
    def create_floatingip(cls, external_network_id):
        """Wrapper utility that returns a test floating IP.

        :param external_network_id: sent as 'floating_network_id'
        :returns: the 'floatingip' dict, also recorded for cleanup
        """
        body = cls.client.create_floatingip(
            floating_network_id=external_network_id)
        fip = body['floatingip']
        cls.floating_ips.append(fip)
        return fip

    @classmethod
    def create_router_interface(cls, router_id, subnet_id):
        """Wrapper utility that returns a router interface.

        Nothing is recorded here; ``delete_router`` removes the interfaces
        it finds on the router.
        """
        interface = cls.client.add_router_interface_with_subnet_id(
            router_id, subnet_id)
        return interface

    @classmethod
    def create_qos_policy(cls, name, description=None, shared=False,
                          tenant_id=None):
        """Wrapper utility that returns a test QoS policy.

        The policy is created with the admin client.

        :returns: the 'policy' dict, also recorded for cleanup
        """
        body = cls.admin_client.create_qos_policy(
            name, description, shared, tenant_id)
        qos_policy = body['policy']
        cls.qos_policies.append(qos_policy)
        return qos_policy

    @classmethod
    def create_qos_bandwidth_limit_rule(cls, policy_id,
                                        max_kbps, max_burst_kbps):
        """Wrapper utility that returns a test QoS bandwidth limit rule.

        The rule is created with the admin client.

        :returns: the 'bandwidth_limit_rule' dict, also recorded for cleanup
        """
        body = cls.admin_client.create_bandwidth_limit_rule(
            policy_id, max_kbps, max_burst_kbps)
        qos_rule = body['bandwidth_limit_rule']
        cls.qos_rules.append(qos_rule)
        return qos_rule

    @classmethod
    def delete_router(cls, router):
        """Detach every listed interface from ``router`` and delete it.

        :param router: router dict; only its 'id' is read
        """
        body = cls.client.list_router_interfaces(router['id'])
        interfaces = body['ports']
        for i in interfaces:
            try:
                # Only the subnet of the first fixed IP is used to detach.
                cls.client.remove_router_interface_with_subnet_id(
                    router['id'], i['fixed_ips'][0]['subnet_id'])
            except lib_exc.NotFound:
                # Interface already gone; nothing left to detach.
                pass
        cls.client.delete_router(router['id'])

    @classmethod
    def create_address_scope(cls, name, is_admin=False, **kwargs):
        """Wrapper utility that returns a test address scope.

        :param name: address scope name
        :param is_admin: create with the admin client instead of the
            primary one (and record it in the admin registry)
        :param kwargs: extra attributes passed to the create call
        :returns: the 'address_scope' dict, also recorded for cleanup
        """
        if is_admin:
            body = cls.admin_client.create_address_scope(name=name, **kwargs)
            cls.admin_address_scopes.append(body['address_scope'])
        else:
            body = cls.client.create_address_scope(name=name, **kwargs)
            cls.address_scopes.append(body['address_scope'])
        return body['address_scope']

    @classmethod
    def create_subnetpool(cls, name, is_admin=False, **kwargs):
        """Wrapper utility that returns a test subnet pool.

        :param name: subnet pool name, passed positionally to the client
        :param is_admin: create with the admin client instead of the
            primary one (and record it in the admin registry)
        :param kwargs: extra attributes passed to the create call
        :returns: the 'subnetpool' dict, also recorded for cleanup
        """
        if is_admin:
            body = cls.admin_client.create_subnetpool(name, **kwargs)
            cls.admin_subnetpools.append(body['subnetpool'])
        else:
            body = cls.client.create_subnetpool(name, **kwargs)
            cls.subnetpools.append(body['subnetpool'])
        return body['subnetpool']
379
380
class BaseAdminNetworkTest(BaseNetworkTest):
    """Base class for tests that also need admin credentials.

    On top of the primary client it exposes ``admin_client`` and
    ``identity_admin_client``, plus helpers for admin-created resources.
    """

    credentials = ['primary', 'admin']

    @classmethod
    def setup_clients(cls):
        """Expose the admin network and tenants clients on the class."""
        super(BaseAdminNetworkTest, cls).setup_clients()
        cls.admin_client = cls.os_adm.network_client
        cls.identity_admin_client = cls.os_adm.tenants_client

    @classmethod
    def create_metering_label(cls, name, description):
        """Wrapper utility that returns a test metering label.

        :param name: label name; a random 'metering-label' name is generated
            when it is falsy
        :param description: label description
        :returns: the 'metering_label' dict, also recorded for cleanup
        """
        # Honour the caller's name; previously the argument was accepted but
        # silently replaced by a random one.
        body = cls.admin_client.create_metering_label(
            description=description,
            name=name or data_utils.rand_name("metering-label"))
        metering_label = body['metering_label']
        cls.metering_labels.append(metering_label)
        return metering_label

    @classmethod
    def create_metering_label_rule(cls, remote_ip_prefix, direction,
                                   metering_label_id):
        """Wrapper utility that returns a test metering label rule.

        :returns: the 'metering_label_rule' dict, also recorded for cleanup
        """
        body = cls.admin_client.create_metering_label_rule(
            remote_ip_prefix=remote_ip_prefix, direction=direction,
            metering_label_id=metering_label_id)
        metering_label_rule = body['metering_label_rule']
        cls.metering_label_rules.append(metering_label_rule)
        return metering_label_rule

    @classmethod
    def create_flavor(cls, name, description, service_type):
        """Wrapper utility that returns a test flavor.

        :returns: the 'flavor' dict, also recorded for cleanup
        """
        body = cls.admin_client.create_flavor(
            description=description, service_type=service_type,
            name=name)
        flavor = body['flavor']
        cls.flavors.append(flavor)
        return flavor

    @classmethod
    def create_service_profile(cls, description, metainfo, driver):
        """Wrapper utility that returns a test service profile.

        :returns: the 'service_profile' dict, also recorded for cleanup
        """
        body = cls.admin_client.create_service_profile(
            driver=driver, metainfo=metainfo, description=description)
        service_profile = body['service_profile']
        cls.service_profiles.append(service_profile)
        return service_profile

    @classmethod
    def get_unused_ip(cls, net_id, ip_version=None):
        """Get an unused ip address in an allocation pool of net

        :param net_id: id of the network whose ports and subnets are listed
        :param ip_version: when truthy, only subnets of this IP version are
            considered
        :returns: the first candidate address, as a string, that is not a
            fixed IP of any port on the network
        :raises exceptions.InvalidConfiguration: if no candidate is free
        """
        body = cls.admin_client.list_ports(network_id=net_id)
        ports = body['ports']
        # A set keeps the membership test in the scan below constant-time.
        used_ips = {fixed_ip['ip_address']
                    for port in ports
                    for fixed_ip in port['fixed_ips']}
        body = cls.admin_client.list_subnets(network_id=net_id)
        subnets = body['subnets']

        for subnet in subnets:
            if ip_version and subnet['ip_version'] != ip_version:
                continue
            cidr = subnet['cidr']
            allocation_pools = subnet['allocation_pools']
            iterators = []
            if allocation_pools:
                for allocation_pool in allocation_pools:
                    iterators.append(netaddr.iter_iprange(
                        allocation_pool['start'], allocation_pool['end']))
            else:
                # No pools on the subnet: fall back to every address of the
                # cidr except the network and broadcast addresses.
                net = netaddr.IPNetwork(cidr)

                def _iterip():
                    for ip in net:
                        if ip not in (net.network, net.broadcast):
                            yield ip
                iterators.append(_iterip())

            for iterator in iterators:
                for ip in iterator:
                    if str(ip) not in used_ips:
                        return str(ip)

        message = (
            "net(%s) has no usable IP address in allocation pools" % net_id)
        raise exceptions.InvalidConfiguration(message)
Ihar Hrachyshka59382252016-04-05 15:54:33 +0200470
471
472def _require_sorting(f):
473 @functools.wraps(f)
474 def inner(self, *args, **kwargs):
Ihar Hrachyshka34feb5b2016-06-14 16:16:06 +0200475 if not test.is_extension_enabled("sorting", "network"):
Ihar Hrachyshka59382252016-04-05 15:54:33 +0200476 self.skipTest('Sorting feature is required')
477 return f(self, *args, **kwargs)
478 return inner
479
480
481def _require_pagination(f):
482 @functools.wraps(f)
483 def inner(self, *args, **kwargs):
Ihar Hrachyshka34feb5b2016-06-14 16:16:06 +0200484 if not test.is_extension_enabled("pagination", "network"):
Ihar Hrachyshka59382252016-04-05 15:54:33 +0200485 self.skipTest('Pagination feature is required')
486 return f(self, *args, **kwargs)
487 return inner
488
489
class BaseSearchCriteriaTest(BaseNetworkTest):
    """Reusable sorting and pagination checks for one resource type.

    Subclasses set ``resource`` (and optionally ``field``, ``list_kwargs``
    and ``list_as_admin``) and call the ``_test_*`` helpers from their own
    test methods.
    """

    # This should be defined by subclasses to reflect resource name to test
    resource = None

    # Attribute used as the sort key and compared between result lists.
    field = 'name'

    # NOTE(ihrachys): some names, like those starting with an underscore (_)
    # are sorted differently depending on whether the plugin implements native
    # sorting support, or not. So we avoid any such cases here, sticking to
    # alphanumeric. Also test a case when there are multiple resources with the
    # same name
    resource_names = ('test1', 'abc1', 'test10', '123test') + ('test1',)

    force_tenant_isolation = True

    # Extra query arguments added to every list request. Only ever read
    # here, never modified in place.
    list_kwargs = {}

    # When True, list requests go through the admin client.
    list_as_admin = False

    def assertSameOrder(self, original, actual):
        """Assert both sequences have equal ``field`` values pairwise.

        :param original: iterable of resource dicts with the expected order
        :param actual: iterable of resource dicts to check
        """
        # gracefully handle iterators passed
        original = list(original)
        actual = list(actual)
        self.assertEqual(len(original), len(actual))
        for expected, res in zip(original, actual):
            self.assertEqual(expected[self.field], res[self.field])

    @utils.classproperty
    def plural_name(self):
        """Collection name: ``resource`` with an 's' appended."""
        return '%ss' % self.resource

    @property
    def list_client(self):
        """Client used for list requests (admin when ``list_as_admin``)."""
        return self.admin_client if self.list_as_admin else self.client

    def list_method(self, *args, **kwargs):
        """Call ``list_<plural_name>`` on the list client.

        ``list_kwargs`` is merged into the keyword arguments and wins on
        conflicting keys.
        """
        method = getattr(self.list_client, 'list_%s' % self.plural_name)
        kwargs.update(self.list_kwargs)
        return method(*args, **kwargs)

    def get_bare_url(self, url):
        """Return ``url`` with the client's base URL prefix stripped.

        Fails the test if ``url`` does not start with that prefix.
        """
        base_url = self.client.base_url
        self.assertTrue(url.startswith(base_url))
        return url[len(base_url):]

    @classmethod
    def _extract_resources(cls, body):
        """Return the resource list stored under ``plural_name`` in body."""
        return body[cls.plural_name]

    def _test_list_sorts(self, direction):
        """Check a sorted listing matches a local sort of the same values.

        :param direction: sort direction sent as 'sort_dir'
        """
        sort_args = {
            'sort_dir': direction,
            'sort_key': self.field
        }
        body = self.list_method(**sort_args)
        resources = self._extract_resources(body)
        self.assertNotEmpty(
            resources, "%s list returned is empty" % self.resource)
        retrieved_names = [res[self.field] for res in resources]
        expected = sorted(retrieved_names)
        if direction == constants.SORT_DIRECTION_DESC:
            expected = list(reversed(expected))
        self.assertEqual(expected, retrieved_names)

    @_require_sorting
    def _test_list_sorts_asc(self):
        """Check ascending sort."""
        self._test_list_sorts(constants.SORT_DIRECTION_ASC)

    @_require_sorting
    def _test_list_sorts_desc(self):
        """Check descending sort."""
        self._test_list_sorts(constants.SORT_DIRECTION_DESC)

    @_require_pagination
    def _test_list_pagination(self):
        """Check every limit from 1 to len(resource_names) is honoured."""
        for limit in range(1, len(self.resource_names) + 1):
            pagination_args = {
                'limit': limit,
            }
            body = self.list_method(**pagination_args)
            resources = self._extract_resources(body)
            self.assertEqual(limit, len(resources))

    @_require_pagination
    def _test_list_no_pagination_limit_0(self):
        """Check limit=0 returns at least len(resource_names) entries."""
        pagination_args = {
            'limit': 0,
        }
        body = self.list_method(**pagination_args)
        resources = self._extract_resources(body)
        self.assertGreaterEqual(len(resources), len(self.resource_names))

    def _test_list_pagination_iteratively(self, lister):
        """Compare a one-shot sorted listing with a paginated walk.

        :param lister: callable ``(niterations, sort_args)`` returning the
            resources collected page by page
        """
        # first, collect all resources for later comparison
        sort_args = {
            'sort_dir': constants.SORT_DIRECTION_ASC,
            'sort_key': self.field
        }
        body = self.list_method(**sort_args)
        expected_resources = self._extract_resources(body)
        self.assertNotEmpty(expected_resources)

        resources = lister(
            len(expected_resources), sort_args
        )

        # finally, compare that the list retrieved in one go is identical to
        # the one containing pagination results
        self.assertSameOrder(expected_resources, resources)

    def _list_all_with_marker(self, niterations, sort_args):
        """Collect resources one per request, using 'marker' pagination.

        :param niterations: number of single-item pages to fetch
        :param sort_args: sort arguments sent with every request
        :returns: list of the fetched resources, in fetch order
        """
        # paginate resources one by one, using last fetched resource as a
        # marker
        resources = []
        for _ in range(niterations):
            pagination_args = sort_args.copy()
            pagination_args['limit'] = 1
            if resources:
                pagination_args['marker'] = resources[-1]['id']
            body = self.list_method(**pagination_args)
            resources_ = self._extract_resources(body)
            self.assertEqual(1, len(resources_))
            resources.extend(resources_)
        return resources

    @_require_pagination
    @_require_sorting
    def _test_list_pagination_with_marker(self):
        """Check marker-based pagination against a one-shot listing."""
        self._test_list_pagination_iteratively(self._list_all_with_marker)

    def _list_all_with_hrefs(self, niterations, sort_args):
        """Collect resources one per request by following 'next' links.

        After the forward walk the trailing page is fetched to check it has
        no 'next' link, then the 'previous' links are followed back and the
        two walks are compared.

        :param niterations: number of single-item pages to fetch
        :param sort_args: sort arguments for the first request; not modified
        :returns: list of the resources from the forward walk
        """
        # paginate resources one by one, using next href links
        resources = []
        prev_links = {}
        # Build the query from a copy so the caller's dict stays untouched.
        uri_args = dict(sort_args)
        uri_args.update(self.list_kwargs)

        for _ in range(niterations):
            if prev_links:
                uri = self.get_bare_url(prev_links['next'])
            else:
                uri = self.list_client.build_uri(
                    self.plural_name, limit=1, **uri_args)
            prev_links, body = self.list_client.get_uri_with_links(
                self.plural_name, uri
            )
            resources_ = self._extract_resources(body)
            self.assertEqual(1, len(resources_))
            resources.extend(resources_)

        # The last element is empty and does not contain 'next' link.
        # Use the same client as the rest of the walk so the request is made
        # with the credentials that produced the link.
        uri = self.get_bare_url(prev_links['next'])
        prev_links, body = self.list_client.get_uri_with_links(
            self.plural_name, uri
        )
        self.assertNotIn('next', prev_links)

        # Now walk backwards and compare results
        resources2 = []
        for _ in range(niterations):
            uri = self.get_bare_url(prev_links['previous'])
            prev_links, body = self.list_client.get_uri_with_links(
                self.plural_name, uri
            )
            resources_ = self._extract_resources(body)
            self.assertEqual(1, len(resources_))
            resources2.extend(resources_)

        self.assertSameOrder(resources, reversed(resources2))

        return resources

    @_require_pagination
    @_require_sorting
    def _test_list_pagination_with_href_links(self):
        """Check link-based pagination against a one-shot listing."""
        self._test_list_pagination_iteratively(self._list_all_with_hrefs)

    @_require_pagination
    @_require_sorting
    def _test_list_pagination_page_reverse_with_href_links(
            self, direction=constants.SORT_DIRECTION_ASC):
        """Check a page_reverse walk over 'previous' links.

        Pages of up to two resources are fetched starting with
        ``page_reverse=True`` and following 'previous' links; the collected
        resources are compared with a one-shot sorted listing.

        :param direction: sort direction sent as 'sort_dir'
        """
        pagination_args = {
            'sort_dir': direction,
            'sort_key': self.field,
        }
        body = self.list_method(**pagination_args)
        expected_resources = self._extract_resources(body)

        page_size = 2
        pagination_args['limit'] = page_size

        prev_links = {}
        resources = []
        num_resources = len(expected_resources)
        # Enough pages to cover every resource, the last one possibly short.
        niterations = int(math.ceil(float(num_resources) / page_size))
        for _ in range(niterations):
            if prev_links:
                uri = self.get_bare_url(prev_links['previous'])
            else:
                pagination_args.update(self.list_kwargs)
                uri = self.list_client.build_uri(
                    self.plural_name, page_reverse=True, **pagination_args)
            prev_links, body = self.list_client.get_uri_with_links(
                self.plural_name, uri
            )
            resources_ = self._extract_resources(body)
            # A page never holds more than page_size entries.
            self.assertGreaterEqual(page_size, len(resources_))
            resources.extend(reversed(resources_))

        self.assertSameOrder(expected_resources, reversed(resources))

    @_require_pagination
    @_require_sorting
    def _test_list_pagination_page_reverse_asc(self):
        """Check page_reverse with ascending sort."""
        self._test_list_pagination_page_reverse(
            direction=constants.SORT_DIRECTION_ASC)

    @_require_pagination
    @_require_sorting
    def _test_list_pagination_page_reverse_desc(self):
        """Check page_reverse with descending sort."""
        self._test_list_pagination_page_reverse(
            direction=constants.SORT_DIRECTION_DESC)

    def _test_list_pagination_page_reverse(self, direction):
        """Check page_reverse from a marker returns the preceding entries.

        The first three sorted resources are listed, then the last of them
        is used as marker for a reversed page of two.

        :param direction: sort direction sent as 'sort_dir'
        """
        pagination_args = {
            'sort_dir': direction,
            'sort_key': self.field,
            'limit': 3,
        }
        body = self.list_method(**pagination_args)
        expected_resources = self._extract_resources(body)

        pagination_args['limit'] -= 1
        pagination_args['marker'] = expected_resources[-1]['id']
        pagination_args['page_reverse'] = True
        body = self.list_method(**pagination_args)

        self.assertSameOrder(
            # the last entry is not included in 2nd result when used as a
            # marker
            expected_resources[:-1],
            self._extract_resources(body))