blob: b308a313e0ba1d152fc46344560ee251a0875a89 [file] [log] [blame]
Daniel Mellado3c0aeab2016-01-29 11:30:25 +00001# Copyright 2012 OpenStack Foundation
2# All Rights Reserved.
3#
4# Licensed under the Apache License, Version 2.0 (the "License"); you may
5# not use this file except in compliance with the License. You may obtain
6# a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13# License for the specific language governing permissions and limitations
14# under the License.
15
Ihar Hrachyshka59382252016-04-05 15:54:33 +020016import functools
Ihar Hrachyshkaaeb03a02016-05-18 20:03:18 +020017import math
Ihar Hrachyshka59382252016-04-05 15:54:33 +020018
Daniel Mellado3c0aeab2016-01-29 11:30:25 +000019import netaddr
20from tempest.lib.common.utils import data_utils
21from tempest.lib import exceptions as lib_exc
22from tempest import test
23
Ihar Hrachyshka59382252016-04-05 15:54:33 +020024from neutron.common import constants
Ihar Hrachyshkaaeb03a02016-05-18 20:03:18 +020025from neutron.common import utils
Daniel Mellado3c0aeab2016-01-29 11:30:25 +000026from neutron.tests.tempest.api import clients
27from neutron.tests.tempest import config
28from neutron.tests.tempest import exceptions
29
# Module-level handle on the tempest configuration; read by the test classes
# below (e.g. CONF.service_available.neutron).
CONF = config.CONF
31
32
class BaseNetworkTest(test.BaseTestCase):

    """
    Base class for the Neutron tests that use the Tempest Neutron REST client

    Per the Neutron API Guide, API v1.x was removed from the source code tree
    (docs.openstack.org/api/openstack-network/2.0/content/Overview-d1e71.html)
    Therefore, v2.x of the Neutron API is assumed. It is also assumed that the
    following options are defined in the [network] section of etc/tempest.conf:

        project_network_cidr with a block of cidr's from which smaller blocks
        can be allocated for tenant networks

        project_network_mask_bits with the mask bits to be used to partition
        the block defined by project_network_cidr

    Finally, it is assumed that the following option is defined in the
    [service_available] section of etc/tempest.conf

        neutron as True
    """

    force_tenant_isolation = False
    credentials = ['primary']

    # Default to ipv4.
    _ip_version = 4

    @classmethod
    def get_client_manager(cls, credential_type=None, roles=None,
                           force_new=None):
        """Return a Neutron clients manager for the requested credentials.

        The arguments are forwarded unchanged to the parent implementation;
        the credentials of the manager it returns are wrapped in
        ``clients.Manager``.
        """
        manager = super(BaseNetworkTest, cls).get_client_manager(
            credential_type=credential_type,
            roles=roles,
            force_new=force_new
        )
        # Neutron uses a different clients manager than the one in the Tempest
        return clients.Manager(manager.credentials)

    @classmethod
    def skip_checks(cls):
        """Skip the class when Neutron or (for IPv6 classes) IPv6 is off."""
        super(BaseNetworkTest, cls).skip_checks()
        if not CONF.service_available.neutron:
            raise cls.skipException("Neutron support is required")
        if cls._ip_version == 6 and not CONF.network_feature_enabled.ipv6:
            raise cls.skipException("IPv6 Tests are disabled.")

    @classmethod
    def setup_credentials(cls):
        """Set up credentials without pre-created network resources."""
        # Create no network resources for these test.
        cls.set_network_resources()
        super(BaseNetworkTest, cls).setup_credentials()

    @classmethod
    def setup_clients(cls):
        """Expose the primary network client as ``cls.client``."""
        super(BaseNetworkTest, cls).setup_clients()
        cls.client = cls.os.network_client

    @classmethod
    def resource_setup(cls):
        """Initialise the per-class registries of created resources.

        The create_* helpers append to these lists and resource_cleanup()
        walks them to delete whatever is left.
        """
        super(BaseNetworkTest, cls).resource_setup()

        cls.networks = []
        cls.admin_networks = []
        cls.subnets = []
        cls.ports = []
        cls.routers = []
        cls.floating_ips = []
        cls.metering_labels = []
        cls.service_profiles = []
        cls.flavors = []
        cls.metering_label_rules = []
        cls.qos_rules = []
        cls.qos_policies = []
        cls.ethertype = "IPv" + str(cls._ip_version)
        cls.address_scopes = []
        cls.admin_address_scopes = []
        cls.subnetpools = []
        cls.admin_subnetpools = []

    @classmethod
    def resource_cleanup(cls):
        """Delete every resource recorded by the create_* helpers.

        Resources already removed by the test itself are ignored (see
        _try_delete_resource).

        ``cls.admin_client`` is only looked up inside loops over the
        admin-owned lists, so classes that never create such resources do
        not need to define it (this class does not; BaseAdminNetworkTest
        does).
        """
        if CONF.service_available.neutron:
            # Clean up QoS rules
            for qos_rule in cls.qos_rules:
                cls._try_delete_resource(cls.admin_client.delete_qos_rule,
                                         qos_rule['id'])
            # Clean up QoS policies
            for qos_policy in cls.qos_policies:
                cls._try_delete_resource(cls.admin_client.delete_qos_policy,
                                         qos_policy['id'])
            # Clean up floating IPs
            for floating_ip in cls.floating_ips:
                cls._try_delete_resource(cls.client.delete_floatingip,
                                         floating_ip['id'])
            # Clean up routers
            for router in cls.routers:
                cls._try_delete_resource(cls.delete_router,
                                         router)
            # Clean up metering label rules
            for metering_label_rule in cls.metering_label_rules:
                cls._try_delete_resource(
                    cls.admin_client.delete_metering_label_rule,
                    metering_label_rule['id'])
            # Clean up metering labels
            for metering_label in cls.metering_labels:
                cls._try_delete_resource(
                    cls.admin_client.delete_metering_label,
                    metering_label['id'])
            # Clean up flavors
            for flavor in cls.flavors:
                cls._try_delete_resource(
                    cls.admin_client.delete_flavor,
                    flavor['id'])
            # Clean up service profiles
            for service_profile in cls.service_profiles:
                cls._try_delete_resource(
                    cls.admin_client.delete_service_profile,
                    service_profile['id'])
            # Clean up ports
            for port in cls.ports:
                cls._try_delete_resource(cls.client.delete_port,
                                         port['id'])
            # Clean up subnets
            for subnet in cls.subnets:
                cls._try_delete_resource(cls.client.delete_subnet,
                                         subnet['id'])
            # Clean up networks
            for network in cls.networks:
                cls._try_delete_resource(cls.client.delete_network,
                                         network['id'])

            # Clean up admin networks
            for network in cls.admin_networks:
                cls._try_delete_resource(cls.admin_client.delete_network,
                                         network['id'])

            # Clean up subnet pools
            for subnetpool in cls.subnetpools:
                cls._try_delete_resource(cls.client.delete_subnetpool,
                                         subnetpool['id'])

            for subnetpool in cls.admin_subnetpools:
                cls._try_delete_resource(cls.admin_client.delete_subnetpool,
                                         subnetpool['id'])

            # Clean up address scopes
            for address_scope in cls.address_scopes:
                cls._try_delete_resource(cls.client.delete_address_scope,
                                         address_scope['id'])

            for address_scope in cls.admin_address_scopes:
                cls._try_delete_resource(
                    cls.admin_client.delete_address_scope,
                    address_scope['id'])

        super(BaseNetworkTest, cls).resource_cleanup()

    @classmethod
    def _try_delete_resource(cls, delete_callable, *args, **kwargs):
        """Cleanup resources in case of test-failure

        Some resources are explicitly deleted by the test.
        If the test failed to delete a resource, this method will execute
        the appropriate delete methods. Otherwise, the method ignores NotFound
        exceptions thrown for resources that were correctly deleted by the
        test.

        :param delete_callable: delete method
        :param args: arguments for delete method
        :param kwargs: keyword arguments for delete method
        """
        try:
            delete_callable(*args, **kwargs)
        # if resource is not found, this means it was deleted in the test
        except lib_exc.NotFound:
            pass

    @classmethod
    def create_network(cls, network_name=None, **kwargs):
        """Wrapper utility that returns a test network.

        :param network_name: name to use; a random one is generated when
            omitted or empty
        :param kwargs: extra attributes passed to create_network
        :returns: the created network dict, also appended to cls.networks
        """
        network_name = network_name or data_utils.rand_name('test-network-')

        body = cls.client.create_network(name=network_name, **kwargs)
        network = body['network']
        cls.networks.append(network)
        return network

    @classmethod
    def create_shared_network(cls, network_name=None, **post_body):
        """Create a shared network with the admin client.

        :param network_name: name to use; a random one is generated when
            omitted or empty
        :param post_body: extra attributes for the request; 'name' and
            'shared' are always overridden
        :returns: the created network dict, also appended to
            cls.admin_networks
        """
        network_name = network_name or data_utils.rand_name('sharednetwork-')
        post_body.update({'name': network_name, 'shared': True})
        body = cls.admin_client.create_network(**post_body)
        network = body['network']
        cls.admin_networks.append(network)
        return network

    @classmethod
    def create_network_keystone_v3(cls, network_name=None, project_id=None,
                                   tenant_id=None, client=None):
        """Wrapper utility that creates a test network with project_id.

        :param network_name: name to use; a random one is generated when
            omitted or empty
        :param project_id: project to create the network for; defaults to
            cls.client.tenant_id when omitted or empty
        :param tenant_id: passed through to the client unchanged
        :param client: client used for creation; defaults to cls.client
        :returns: the created network dict. It is appended to cls.networks
            when created with cls.client, otherwise to cls.admin_networks.
        """
        client = client or cls.client
        network_name = network_name or data_utils.rand_name(
            'test-network-with-project_id')
        # Only fall back to the primary client's tenant when the caller did
        # not pick a project; previously the argument was always overwritten.
        project_id = project_id or cls.client.tenant_id
        body = client.create_network_keystone_v3(network_name, project_id,
                                                 tenant_id)
        network = body['network']
        if client is cls.client:
            cls.networks.append(network)
        else:
            cls.admin_networks.append(network)
        return network

    @classmethod
    def create_subnet(cls, network, gateway='', cidr=None, mask_bits=None,
                      ip_version=None, client=None, **kwargs):
        """Wrapper utility that returns a test subnet.

        Candidate subnets of ``mask_bits`` length are taken from ``cidr`` in
        order; the first one the API accepts is used.

        :param network: network dict; its 'id' is used for the subnet
        :param gateway: gateway IP sent to the API. The default '' means
            "not set": the address following each candidate subnet's base
            address is used instead. Any other value (including None) is
            passed through unchanged.
        :param cidr: block to carve subnets from; must provide
            ``subnet(mask_bits)`` (e.g. a netaddr.IPNetwork). For ip_version
            4 or 6 it defaults to the configured project network CIDR.
        :param mask_bits: prefix length of candidate subnets; for ip_version
            4 or 6 it defaults to the configured mask bits
        :param ip_version: IP version; defaults to cls._ip_version
        :param client: client used for creation; defaults to cls.client
        :param kwargs: extra attributes passed to create_subnet
        :returns: the created subnet dict, also appended to cls.subnets
        :raises ValueError: if every candidate CIDR overlaps another subnet
        :raises lib_exc.BadRequest: for any other rejected request
        """

        # allow tests to use admin client
        if not client:
            client = cls.client

        # The cidr and mask_bits depend on the ip version.
        ip_version = ip_version if ip_version is not None else cls._ip_version
        gateway_not_set = gateway == ''
        if ip_version == 4:
            cidr = cidr or netaddr.IPNetwork(
                config.safe_get_config_value(
                    'network', 'project_network_cidr'))
            mask_bits = (
                mask_bits or config.safe_get_config_value(
                    'network', 'project_network_mask_bits'))
        elif ip_version == 6:
            cidr = (
                cidr or netaddr.IPNetwork(
                    config.safe_get_config_value(
                        'network', 'project_network_v6_cidr')))
            mask_bits = (
                mask_bits or config.safe_get_config_value(
                    'network', 'project_network_v6_mask_bits'))
        # Find a cidr that is not in use yet and create a subnet with it
        for subnet_cidr in cidr.subnet(mask_bits):
            if gateway_not_set:
                gateway_ip = str(netaddr.IPAddress(subnet_cidr) + 1)
            else:
                gateway_ip = gateway
            try:
                body = client.create_subnet(
                    network_id=network['id'],
                    cidr=str(subnet_cidr),
                    ip_version=ip_version,
                    gateway_ip=gateway_ip,
                    **kwargs)
                break
            except lib_exc.BadRequest as e:
                # An overlap just means this candidate is taken: try the
                # next one. Anything else is a genuine failure.
                is_overlapping_cidr = 'overlaps with another subnet' in str(e)
                if not is_overlapping_cidr:
                    raise
        else:
            # Loop finished without a successful create (no break).
            message = 'Available CIDR for subnet creation could not be found'
            raise ValueError(message)
        subnet = body['subnet']
        cls.subnets.append(subnet)
        return subnet

    @classmethod
    def create_port(cls, network, **kwargs):
        """Wrapper utility that returns a test port.

        :param network: network dict; its 'id' is used for the port
        :param kwargs: extra attributes passed to create_port
        :returns: the created port dict, also appended to cls.ports
        """
        body = cls.client.create_port(network_id=network['id'],
                                      **kwargs)
        port = body['port']
        cls.ports.append(port)
        return port

    @classmethod
    def update_port(cls, port, **kwargs):
        """Wrapper utility that updates a test port.

        :param port: port dict; its 'id' selects the port to update
        :param kwargs: attributes passed to update_port
        :returns: the updated port dict from the response
        """
        body = cls.client.update_port(port['id'],
                                      **kwargs)
        return body['port']

    @classmethod
    def create_router(cls, router_name=None, admin_state_up=False,
                      external_network_id=None, enable_snat=None,
                      **kwargs):
        """Wrapper utility that returns a test router.

        :param router_name: name passed to the client
        :param admin_state_up: administrative state of the router
        :param external_network_id: when set, used as the gateway network
        :param enable_snat: when not None (True or False), included in the
            external gateway info; None leaves it out of the request
        :param kwargs: extra attributes passed to create_router
        :returns: the created router dict, also appended to cls.routers
        """
        ext_gw_info = {}
        if external_network_id:
            ext_gw_info['network_id'] = external_network_id
        # Compare against None so an explicit enable_snat=False is sent
        # rather than silently dropped.
        if enable_snat is not None:
            ext_gw_info['enable_snat'] = enable_snat
        body = cls.client.create_router(
            router_name, external_gateway_info=ext_gw_info,
            admin_state_up=admin_state_up, **kwargs)
        router = body['router']
        cls.routers.append(router)
        return router

    @classmethod
    def create_floatingip(cls, external_network_id):
        """Wrapper utility that returns a test floating IP.

        :param external_network_id: network to allocate the floating IP on
        :returns: the created floating IP dict, also appended to
            cls.floating_ips
        """
        body = cls.client.create_floatingip(
            floating_network_id=external_network_id)
        fip = body['floatingip']
        cls.floating_ips.append(fip)
        return fip

    @classmethod
    def create_router_interface(cls, router_id, subnet_id):
        """Wrapper utility that returns a router interface.

        The interface is not recorded here; delete_router() detaches the
        interfaces it finds on the router before deleting it.
        """
        interface = cls.client.add_router_interface_with_subnet_id(
            router_id, subnet_id)
        return interface

    @classmethod
    def create_qos_policy(cls, name, description=None, shared=False,
                          tenant_id=None):
        """Wrapper utility that returns a test QoS policy.

        Created with cls.admin_client; the policy dict is appended to
        cls.qos_policies.
        """
        body = cls.admin_client.create_qos_policy(
            name, description, shared, tenant_id)
        qos_policy = body['policy']
        cls.qos_policies.append(qos_policy)
        return qos_policy

    @classmethod
    def create_qos_bandwidth_limit_rule(cls, policy_id,
                                        max_kbps, max_burst_kbps):
        """Wrapper utility that returns a test QoS bandwidth limit rule.

        Created with cls.admin_client; the rule dict is appended to
        cls.qos_rules.
        """
        body = cls.admin_client.create_bandwidth_limit_rule(
            policy_id, max_kbps, max_burst_kbps)
        qos_rule = body['bandwidth_limit_rule']
        cls.qos_rules.append(qos_rule)
        return qos_rule

    @classmethod
    def delete_router(cls, router):
        """Detach every interface of ``router`` and then delete it.

        Interfaces are removed by the subnet of each port's first fixed IP;
        interfaces that are already gone (NotFound) are ignored.
        """
        body = cls.client.list_router_interfaces(router['id'])
        interfaces = body['ports']
        for i in interfaces:
            try:
                cls.client.remove_router_interface_with_subnet_id(
                    router['id'], i['fixed_ips'][0]['subnet_id'])
            except lib_exc.NotFound:
                pass
        cls.client.delete_router(router['id'])

    @classmethod
    def create_address_scope(cls, name, is_admin=False, **kwargs):
        """Create an address scope with the primary or the admin client.

        :param name: address scope name
        :param is_admin: use cls.admin_client (and the admin cleanup list)
            instead of cls.client
        :param kwargs: extra attributes passed to create_address_scope
        :returns: the created address scope dict
        """
        if is_admin:
            body = cls.admin_client.create_address_scope(name=name, **kwargs)
            cls.admin_address_scopes.append(body['address_scope'])
        else:
            body = cls.client.create_address_scope(name=name, **kwargs)
            cls.address_scopes.append(body['address_scope'])
        return body['address_scope']

    @classmethod
    def create_subnetpool(cls, name, is_admin=False, **kwargs):
        """Create a subnet pool with the primary or the admin client.

        :param name: subnet pool name
        :param is_admin: use cls.admin_client (and the admin cleanup list)
            instead of cls.client
        :param kwargs: extra attributes passed to create_subnetpool
        :returns: the created subnet pool dict
        """
        if is_admin:
            body = cls.admin_client.create_subnetpool(name, **kwargs)
            cls.admin_subnetpools.append(body['subnetpool'])
        else:
            body = cls.client.create_subnetpool(name, **kwargs)
            cls.subnetpools.append(body['subnetpool'])
        return body['subnetpool']
396
397
class BaseAdminNetworkTest(BaseNetworkTest):
    """Base class for network tests that also need admin credentials.

    Adds ``cls.admin_client`` and ``cls.identity_admin_client`` plus
    wrappers for resources created through the admin client.
    """

    credentials = ['primary', 'admin']

    @classmethod
    def setup_clients(cls):
        """Expose the admin network and tenants clients on the class."""
        super(BaseAdminNetworkTest, cls).setup_clients()
        cls.admin_client = cls.os_adm.network_client
        cls.identity_admin_client = cls.os_adm.tenants_client

    @classmethod
    def create_metering_label(cls, name, description):
        """Wrapper utility that returns a test metering label.

        :param name: label name; a random one is generated when empty or
            None
        :param description: label description
        :returns: the created metering label dict, also appended to
            cls.metering_labels
        """
        # Honour the caller's name; it used to be ignored in favour of a
        # random one.
        body = cls.admin_client.create_metering_label(
            description=description,
            name=name or data_utils.rand_name("metering-label"))
        metering_label = body['metering_label']
        cls.metering_labels.append(metering_label)
        return metering_label

    @classmethod
    def create_metering_label_rule(cls, remote_ip_prefix, direction,
                                   metering_label_id):
        """Wrapper utility that returns a test metering label rule.

        The rule dict is appended to cls.metering_label_rules.
        """
        body = cls.admin_client.create_metering_label_rule(
            remote_ip_prefix=remote_ip_prefix, direction=direction,
            metering_label_id=metering_label_id)
        metering_label_rule = body['metering_label_rule']
        cls.metering_label_rules.append(metering_label_rule)
        return metering_label_rule

    @classmethod
    def create_flavor(cls, name, description, service_type):
        """Wrapper utility that returns a test flavor.

        The flavor dict is appended to cls.flavors.
        """
        body = cls.admin_client.create_flavor(
            description=description, service_type=service_type,
            name=name)
        flavor = body['flavor']
        cls.flavors.append(flavor)
        return flavor

    @classmethod
    def create_service_profile(cls, description, metainfo, driver):
        """Wrapper utility that returns a test service profile.

        The profile dict is appended to cls.service_profiles.
        """
        body = cls.admin_client.create_service_profile(
            driver=driver, metainfo=metainfo, description=description)
        service_profile = body['service_profile']
        cls.service_profiles.append(service_profile)
        return service_profile

    @classmethod
    def get_unused_ip(cls, net_id, ip_version=None):
        """Get an unused ip address in an allocation pool of net

        :param net_id: network whose ports and subnets are inspected
        :param ip_version: when set, only subnets of this IP version are
            considered
        :returns: the first candidate address, as a string, that is not a
            fixed IP of any port on the network
        :raises exceptions.InvalidConfiguration: if no such address exists
        """
        body = cls.admin_client.list_ports(network_id=net_id)
        ports = body['ports']
        # A set keeps the membership test below cheap.
        used_ips = set()
        for port in ports:
            used_ips.update(
                fixed_ip['ip_address'] for fixed_ip in port['fixed_ips'])
        body = cls.admin_client.list_subnets(network_id=net_id)
        subnets = body['subnets']

        for subnet in subnets:
            if ip_version and subnet['ip_version'] != ip_version:
                continue
            cidr = subnet['cidr']
            allocation_pools = subnet['allocation_pools']
            iterators = []
            if allocation_pools:
                for allocation_pool in allocation_pools:
                    iterators.append(netaddr.iter_iprange(
                        allocation_pool['start'], allocation_pool['end']))
            else:
                # No pools: fall back to every address of the CIDR except
                # the network and broadcast addresses.
                net = netaddr.IPNetwork(cidr)

                def _iterip():
                    for ip in net:
                        if ip not in (net.network, net.broadcast):
                            yield ip
                iterators.append(_iterip())

            for iterator in iterators:
                for ip in iterator:
                    if str(ip) not in used_ips:
                        return str(ip)

        message = (
            "net(%s) has no usable IP address in allocation pools" % net_id)
        raise exceptions.InvalidConfiguration(message)
Ihar Hrachyshka59382252016-04-05 15:54:33 +0200487
488
def _require_sorting(f):
    """Decorate a test method so it is skipped without the sorting extension.

    The "sorting" network extension is checked each time the wrapped method
    is called, not when it is decorated.
    """
    @functools.wraps(f)
    def wrapper(self, *args, **kwargs):
        sorting_enabled = test.is_extension_enabled("sorting", "network")
        if not sorting_enabled:
            self.skipTest('Sorting feature is required')
        return f(self, *args, **kwargs)

    return wrapper
496
497
def _require_pagination(f):
    """Decorate a test method so it is skipped without pagination support.

    The "pagination" network extension is checked each time the wrapped
    method is called, not when it is decorated.
    """
    @functools.wraps(f)
    def wrapper(self, *args, **kwargs):
        pagination_enabled = test.is_extension_enabled(
            "pagination", "network")
        if not pagination_enabled:
            self.skipTest('Pagination feature is required')
        return f(self, *args, **kwargs)

    return wrapper
505
506
class BaseSearchCriteriaTest(BaseNetworkTest):
    """Shared checks for sorting, pagination and filter validation of lists.

    The ``_test_*`` helpers call ``list_<resource>s`` on the selected client
    and verify ordering and paging of the results.
    """

    # This should be defined by subclasses to reflect resource name to test
    resource = None

    # Attribute used as the sort key and for comparing results.
    field = 'name'

    # NOTE(ihrachys): some names, like those starting with an underscore (_)
    # are sorted differently depending on whether the plugin implements native
    # sorting support, or not. So we avoid any such cases here, sticking to
    # alphanumeric. Also test a case when there are multiple resources with the
    # same name
    resource_names = ('test1', 'abc1', 'test10', '123test') + ('test1',)

    force_tenant_isolation = True

    # Extra keyword arguments merged into every list request.
    list_kwargs = {}

    # When True, list requests go through admin_client instead of client.
    list_as_admin = False

    def assertSameOrder(self, original, actual):
        """Assert both iterables have equal length and matching ``field``s.

        Items are compared pairwise, in order, on ``self.field`` only.
        """
        # gracefully handle iterators passed
        original = list(original)
        actual = list(actual)
        self.assertEqual(len(original), len(actual))
        for expected, res in zip(original, actual):
            self.assertEqual(expected[self.field], res[self.field])

    @utils.classproperty
    def plural_name(cls):
        """Plural resource name: ``resource`` with an 's' appended."""
        return '%ss' % cls.resource

    @property
    def list_client(self):
        """Client used for list requests (admin if ``list_as_admin``)."""
        return self.admin_client if self.list_as_admin else self.client

    def list_method(self, *args, **kwargs):
        """Call ``list_<plural_name>`` on list_client.

        ``self.list_kwargs`` is merged into the keyword arguments and wins
        over caller-supplied values with the same key.
        """
        method = getattr(self.list_client, 'list_%s' % self.plural_name)
        kwargs.update(self.list_kwargs)
        return method(*args, **kwargs)

    def get_bare_url(self, url):
        """Strip the client's base URL from ``url``.

        Fails the test if ``url`` does not start with the base URL.
        """
        # NOTE(review): this checks against self.client.base_url even when
        # list_as_admin is set - presumably both clients share the same
        # endpoint; confirm.
        base_url = self.client.base_url
        self.assertTrue(url.startswith(base_url))
        return url[len(base_url):]

    @classmethod
    def _extract_resources(cls, body):
        """Return the list of resources stored under ``plural_name``."""
        return body[cls.plural_name]

    def _test_list_sorts(self, direction):
        """Check a list sorted by ``field`` in ``direction`` is ordered.

        The expected order is computed locally with sorted(), reversed for
        the descending direction.
        """
        sort_args = {
            'sort_dir': direction,
            'sort_key': self.field
        }
        body = self.list_method(**sort_args)
        resources = self._extract_resources(body)
        self.assertNotEmpty(
            resources, "%s list returned is empty" % self.resource)
        retrieved_names = [res[self.field] for res in resources]
        expected = sorted(retrieved_names)
        if direction == constants.SORT_DIRECTION_DESC:
            expected = list(reversed(expected))
        self.assertEqual(expected, retrieved_names)

    @_require_sorting
    def _test_list_sorts_asc(self):
        self._test_list_sorts(constants.SORT_DIRECTION_ASC)

    @_require_sorting
    def _test_list_sorts_desc(self):
        self._test_list_sorts(constants.SORT_DIRECTION_DESC)

    @_require_pagination
    def _test_list_pagination(self):
        """Check that each limit from 1 to len(resource_names) is honoured."""
        for limit in range(1, len(self.resource_names) + 1):
            pagination_args = {
                'limit': limit,
            }
            body = self.list_method(**pagination_args)
            resources = self._extract_resources(body)
            self.assertEqual(limit, len(resources))

    @_require_pagination
    def _test_list_no_pagination_limit_0(self):
        """Check limit=0 returns at least len(resource_names) resources."""
        pagination_args = {
            'limit': 0,
        }
        body = self.list_method(**pagination_args)
        resources = self._extract_resources(body)
        self.assertGreaterEqual(len(resources), len(self.resource_names))

    def _test_list_pagination_iteratively(self, lister):
        """Compare a one-shot sorted list with a page-by-page retrieval.

        :param lister: callable ``(niterations, sort_args)`` returning the
            resources collected page by page
        """
        # first, collect all resources for later comparison
        sort_args = {
            'sort_dir': constants.SORT_DIRECTION_ASC,
            'sort_key': self.field
        }
        body = self.list_method(**sort_args)
        expected_resources = self._extract_resources(body)
        self.assertNotEmpty(expected_resources)

        resources = lister(
            len(expected_resources), sort_args
        )

        # finally, compare that the list retrieved in one go is identical to
        # the one containing pagination results
        self.assertSameOrder(expected_resources, resources)

    def _list_all_with_marker(self, niterations, sort_args):
        """Fetch ``niterations`` single-item pages using markers."""
        # paginate resources one by one, using last fetched resource as a
        # marker
        resources = []
        for _ in range(niterations):
            pagination_args = sort_args.copy()
            pagination_args['limit'] = 1
            if resources:
                pagination_args['marker'] = resources[-1]['id']
            body = self.list_method(**pagination_args)
            resources_ = self._extract_resources(body)
            self.assertEqual(1, len(resources_))
            resources.extend(resources_)
        return resources

    @_require_pagination
    @_require_sorting
    def _test_list_pagination_with_marker(self):
        self._test_list_pagination_iteratively(self._list_all_with_marker)

    def _list_all_with_hrefs(self, niterations, sort_args):
        """Fetch single-item pages by following 'next' links, then go back.

        After walking forward ``niterations`` pages, the trailing page is
        checked to have no 'next' link. The 'previous' links are then
        followed the same number of times and both walks are compared.

        :returns: the resources collected during the forward walk
        """
        # paginate resources one by one, using next href links
        resources = []
        prev_links = {}

        for _ in range(niterations):
            if prev_links:
                uri = self.get_bare_url(prev_links['next'])
            else:
                # Merge into a copy so the caller's dict is left untouched.
                uri_args = dict(sort_args)
                uri_args.update(self.list_kwargs)
                uri = self.list_client.build_uri(
                    self.plural_name, limit=1, **uri_args)
            prev_links, body = self.list_client.get_uri_with_links(
                self.plural_name, uri
            )
            resources_ = self._extract_resources(body)
            self.assertEqual(1, len(resources_))
            resources.extend(resources_)

        # The last element is empty and does not contain 'next' link
        uri = self.get_bare_url(prev_links['next'])
        # Use list_client here as well so the whole walk goes through the
        # same client when list_as_admin is set.
        prev_links, body = self.list_client.get_uri_with_links(
            self.plural_name, uri
        )
        self.assertNotIn('next', prev_links)

        # Now walk backwards and compare results
        resources2 = []
        for _ in range(niterations):
            uri = self.get_bare_url(prev_links['previous'])
            prev_links, body = self.list_client.get_uri_with_links(
                self.plural_name, uri
            )
            resources_ = self._extract_resources(body)
            self.assertEqual(1, len(resources_))
            resources2.extend(resources_)

        self.assertSameOrder(resources, reversed(resources2))

        return resources

    @_require_pagination
    @_require_sorting
    def _test_list_pagination_with_href_links(self):
        self._test_list_pagination_iteratively(self._list_all_with_hrefs)

    @_require_pagination
    @_require_sorting
    def _test_list_pagination_page_reverse_with_href_links(
            self, direction=constants.SORT_DIRECTION_ASC):
        """Page backwards through 'previous' links and compare the result.

        Starts from a page_reverse request with a page size of 2 and
        follows 'previous' links for ceil(total / page_size) pages.
        """
        pagination_args = {
            'sort_dir': direction,
            'sort_key': self.field,
        }
        body = self.list_method(**pagination_args)
        expected_resources = self._extract_resources(body)

        page_size = 2
        pagination_args['limit'] = page_size

        prev_links = {}
        resources = []
        num_resources = len(expected_resources)
        niterations = int(math.ceil(float(num_resources) / page_size))
        for _ in range(niterations):
            if prev_links:
                uri = self.get_bare_url(prev_links['previous'])
            else:
                pagination_args.update(self.list_kwargs)
                uri = self.list_client.build_uri(
                    self.plural_name, page_reverse=True, **pagination_args)
            prev_links, body = self.list_client.get_uri_with_links(
                self.plural_name, uri
            )
            resources_ = self._extract_resources(body)
            # The last page fetched may hold fewer than page_size items.
            self.assertGreaterEqual(page_size, len(resources_))
            resources.extend(reversed(resources_))

        self.assertSameOrder(expected_resources, reversed(resources))

    @_require_pagination
    @_require_sorting
    def _test_list_pagination_page_reverse_asc(self):
        self._test_list_pagination_page_reverse(
            direction=constants.SORT_DIRECTION_ASC)

    @_require_pagination
    @_require_sorting
    def _test_list_pagination_page_reverse_desc(self):
        self._test_list_pagination_page_reverse(
            direction=constants.SORT_DIRECTION_DESC)

    def _test_list_pagination_page_reverse(self, direction):
        """Check page_reverse with a marker returns the preceding items.

        The first three items are listed; the last of them is then used as
        the marker for a reversed request with limit 2, which must return
        the first two.
        """
        pagination_args = {
            'sort_dir': direction,
            'sort_key': self.field,
            'limit': 3,
        }
        body = self.list_method(**pagination_args)
        expected_resources = self._extract_resources(body)

        pagination_args['limit'] -= 1
        pagination_args['marker'] = expected_resources[-1]['id']
        pagination_args['page_reverse'] = True
        body = self.list_method(**pagination_args)

        self.assertSameOrder(
            # the last entry is not included in 2nd result when used as a
            # marker
            expected_resources[:-1],
            self._extract_resources(body))

    def _test_list_validation_filters(self):
        """List with an unknown filter and check only known names return."""
        validation_args = {
            'unknown_filter': 'value',
        }
        body = self.list_method(**validation_args)
        resources = self._extract_resources(body)
        for resource in resources:
            self.assertIn(resource['name'], self.resource_names)