blob: 0d77064e5b7bac8506e6435cb943cedf5284efea [file] [log] [blame]
Daniel Mellado3c0aeab2016-01-29 11:30:25 +00001# Copyright 2012 OpenStack Foundation
2# All Rights Reserved.
3#
4# Licensed under the Apache License, Version 2.0 (the "License"); you may
5# not use this file except in compliance with the License. You may obtain
6# a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13# License for the specific language governing permissions and limitations
14# under the License.
15
Ihar Hrachyshka59382252016-04-05 15:54:33 +020016import functools
Ihar Hrachyshkaaeb03a02016-05-18 20:03:18 +020017import math
Ihar Hrachyshka59382252016-04-05 15:54:33 +020018
Daniel Mellado3c0aeab2016-01-29 11:30:25 +000019import netaddr
20from tempest.lib.common.utils import data_utils
21from tempest.lib import exceptions as lib_exc
22from tempest import test
23
Ihar Hrachyshka59382252016-04-05 15:54:33 +020024from neutron.common import constants
Ihar Hrachyshkaaeb03a02016-05-18 20:03:18 +020025from neutron.common import utils
Daniel Mellado3c0aeab2016-01-29 11:30:25 +000026from neutron.tests.tempest.api import clients
27from neutron.tests.tempest import config
28from neutron.tests.tempest import exceptions
29
30CONF = config.CONF
31
32
33class BaseNetworkTest(test.BaseTestCase):
34
35 """
36 Base class for the Neutron tests that use the Tempest Neutron REST client
37
38 Per the Neutron API Guide, API v1.x was removed from the source code tree
39 (docs.openstack.org/api/openstack-network/2.0/content/Overview-d1e71.html)
40 Therefore, v2.x of the Neutron API is assumed. It is also assumed that the
41 following options are defined in the [network] section of etc/tempest.conf:
42
43 project_network_cidr with a block of cidr's from which smaller blocks
44 can be allocated for tenant networks
45
46 project_network_mask_bits with the mask bits to be used to partition
47 the block defined by tenant-network_cidr
48
49 Finally, it is assumed that the following option is defined in the
50 [service_available] section of etc/tempest.conf
51
52 neutron as True
53 """
54
55 force_tenant_isolation = False
56 credentials = ['primary']
57
58 # Default to ipv4.
59 _ip_version = 4
60
61 @classmethod
62 def get_client_manager(cls, credential_type=None, roles=None,
63 force_new=None):
Genadi Chereshnyacc395c02016-07-25 12:17:37 +030064 manager = super(BaseNetworkTest, cls).get_client_manager(
65 credential_type=credential_type,
66 roles=roles,
67 force_new=force_new
68 )
Daniel Mellado3c0aeab2016-01-29 11:30:25 +000069 # Neutron uses a different clients manager than the one in the Tempest
70 return clients.Manager(manager.credentials)
71
72 @classmethod
73 def skip_checks(cls):
74 super(BaseNetworkTest, cls).skip_checks()
75 if not CONF.service_available.neutron:
76 raise cls.skipException("Neutron support is required")
77 if cls._ip_version == 6 and not CONF.network_feature_enabled.ipv6:
78 raise cls.skipException("IPv6 Tests are disabled.")
79
80 @classmethod
81 def setup_credentials(cls):
82 # Create no network resources for these test.
83 cls.set_network_resources()
84 super(BaseNetworkTest, cls).setup_credentials()
85
86 @classmethod
87 def setup_clients(cls):
88 super(BaseNetworkTest, cls).setup_clients()
89 cls.client = cls.os.network_client
90
91 @classmethod
92 def resource_setup(cls):
93 super(BaseNetworkTest, cls).resource_setup()
94
95 cls.networks = []
Miguel Lavalle124378b2016-09-21 16:41:47 -050096 cls.admin_networks = []
Daniel Mellado3c0aeab2016-01-29 11:30:25 +000097 cls.subnets = []
98 cls.ports = []
99 cls.routers = []
100 cls.floating_ips = []
101 cls.metering_labels = []
102 cls.service_profiles = []
103 cls.flavors = []
104 cls.metering_label_rules = []
105 cls.qos_rules = []
106 cls.qos_policies = []
107 cls.ethertype = "IPv" + str(cls._ip_version)
108 cls.address_scopes = []
109 cls.admin_address_scopes = []
110 cls.subnetpools = []
111 cls.admin_subnetpools = []
Itzik Brownbac51dc2016-10-31 12:25:04 +0000112 cls.security_groups = []
Daniel Mellado3c0aeab2016-01-29 11:30:25 +0000113
114 @classmethod
115 def resource_cleanup(cls):
116 if CONF.service_available.neutron:
117 # Clean up QoS rules
118 for qos_rule in cls.qos_rules:
119 cls._try_delete_resource(cls.admin_client.delete_qos_rule,
120 qos_rule['id'])
121 # Clean up QoS policies
122 for qos_policy in cls.qos_policies:
123 cls._try_delete_resource(cls.admin_client.delete_qos_policy,
124 qos_policy['id'])
125 # Clean up floating IPs
126 for floating_ip in cls.floating_ips:
127 cls._try_delete_resource(cls.client.delete_floatingip,
128 floating_ip['id'])
129 # Clean up routers
130 for router in cls.routers:
131 cls._try_delete_resource(cls.delete_router,
132 router)
133 # Clean up metering label rules
134 for metering_label_rule in cls.metering_label_rules:
135 cls._try_delete_resource(
136 cls.admin_client.delete_metering_label_rule,
137 metering_label_rule['id'])
138 # Clean up metering labels
139 for metering_label in cls.metering_labels:
140 cls._try_delete_resource(
141 cls.admin_client.delete_metering_label,
142 metering_label['id'])
143 # Clean up flavors
144 for flavor in cls.flavors:
145 cls._try_delete_resource(
146 cls.admin_client.delete_flavor,
147 flavor['id'])
148 # Clean up service profiles
149 for service_profile in cls.service_profiles:
150 cls._try_delete_resource(
151 cls.admin_client.delete_service_profile,
152 service_profile['id'])
153 # Clean up ports
154 for port in cls.ports:
155 cls._try_delete_resource(cls.client.delete_port,
156 port['id'])
157 # Clean up subnets
158 for subnet in cls.subnets:
159 cls._try_delete_resource(cls.client.delete_subnet,
160 subnet['id'])
161 # Clean up networks
162 for network in cls.networks:
163 cls._try_delete_resource(cls.client.delete_network,
164 network['id'])
165
Miguel Lavalle124378b2016-09-21 16:41:47 -0500166 # Clean up admin networks
167 for network in cls.admin_networks:
Daniel Mellado3c0aeab2016-01-29 11:30:25 +0000168 cls._try_delete_resource(cls.admin_client.delete_network,
169 network['id'])
170
Itzik Brownbac51dc2016-10-31 12:25:04 +0000171 # Clean up security groups
172 for secgroup in cls.security_groups:
173 cls._try_delete_resource(cls.client.delete_security_group,
174 secgroup['id'])
175
Daniel Mellado3c0aeab2016-01-29 11:30:25 +0000176 for subnetpool in cls.subnetpools:
177 cls._try_delete_resource(cls.client.delete_subnetpool,
178 subnetpool['id'])
179
180 for subnetpool in cls.admin_subnetpools:
181 cls._try_delete_resource(cls.admin_client.delete_subnetpool,
182 subnetpool['id'])
183
184 for address_scope in cls.address_scopes:
185 cls._try_delete_resource(cls.client.delete_address_scope,
186 address_scope['id'])
187
188 for address_scope in cls.admin_address_scopes:
189 cls._try_delete_resource(
190 cls.admin_client.delete_address_scope,
191 address_scope['id'])
192
193 super(BaseNetworkTest, cls).resource_cleanup()
194
195 @classmethod
196 def _try_delete_resource(cls, delete_callable, *args, **kwargs):
197 """Cleanup resources in case of test-failure
198
199 Some resources are explicitly deleted by the test.
200 If the test failed to delete a resource, this method will execute
201 the appropriate delete methods. Otherwise, the method ignores NotFound
202 exceptions thrown for resources that were correctly deleted by the
203 test.
204
205 :param delete_callable: delete method
206 :param args: arguments for delete method
207 :param kwargs: keyword arguments for delete method
208 """
209 try:
210 delete_callable(*args, **kwargs)
211 # if resource is not found, this means it was deleted in the test
212 except lib_exc.NotFound:
213 pass
214
215 @classmethod
216 def create_network(cls, network_name=None, **kwargs):
217 """Wrapper utility that returns a test network."""
218 network_name = network_name or data_utils.rand_name('test-network-')
219
220 body = cls.client.create_network(name=network_name, **kwargs)
221 network = body['network']
222 cls.networks.append(network)
223 return network
224
225 @classmethod
226 def create_shared_network(cls, network_name=None, **post_body):
227 network_name = network_name or data_utils.rand_name('sharednetwork-')
228 post_body.update({'name': network_name, 'shared': True})
229 body = cls.admin_client.create_network(**post_body)
230 network = body['network']
Miguel Lavalle124378b2016-09-21 16:41:47 -0500231 cls.admin_networks.append(network)
232 return network
233
234 @classmethod
235 def create_network_keystone_v3(cls, network_name=None, project_id=None,
236 tenant_id=None, client=None):
237 """Wrapper utility that creates a test network with project_id."""
238 client = client or cls.client
239 network_name = network_name or data_utils.rand_name(
240 'test-network-with-project_id')
241 project_id = cls.client.tenant_id
242 body = client.create_network_keystone_v3(network_name, project_id,
243 tenant_id)
244 network = body['network']
245 if client is cls.client:
246 cls.networks.append(network)
247 else:
248 cls.admin_networks.append(network)
Daniel Mellado3c0aeab2016-01-29 11:30:25 +0000249 return network
250
251 @classmethod
252 def create_subnet(cls, network, gateway='', cidr=None, mask_bits=None,
253 ip_version=None, client=None, **kwargs):
254 """Wrapper utility that returns a test subnet."""
255
256 # allow tests to use admin client
257 if not client:
258 client = cls.client
259
260 # The cidr and mask_bits depend on the ip version.
261 ip_version = ip_version if ip_version is not None else cls._ip_version
262 gateway_not_set = gateway == ''
263 if ip_version == 4:
264 cidr = cidr or netaddr.IPNetwork(
265 config.safe_get_config_value(
266 'network', 'project_network_cidr'))
267 mask_bits = (
268 mask_bits or config.safe_get_config_value(
269 'network', 'project_network_mask_bits'))
270 elif ip_version == 6:
271 cidr = (
272 cidr or netaddr.IPNetwork(
273 config.safe_get_config_value(
274 'network', 'project_network_v6_cidr')))
275 mask_bits = (
276 mask_bits or config.safe_get_config_value(
277 'network', 'project_network_v6_mask_bits'))
278 # Find a cidr that is not in use yet and create a subnet with it
279 for subnet_cidr in cidr.subnet(mask_bits):
280 if gateway_not_set:
281 gateway_ip = str(netaddr.IPAddress(subnet_cidr) + 1)
282 else:
283 gateway_ip = gateway
284 try:
285 body = client.create_subnet(
286 network_id=network['id'],
287 cidr=str(subnet_cidr),
288 ip_version=ip_version,
289 gateway_ip=gateway_ip,
290 **kwargs)
291 break
292 except lib_exc.BadRequest as e:
293 is_overlapping_cidr = 'overlaps with another subnet' in str(e)
294 if not is_overlapping_cidr:
295 raise
296 else:
297 message = 'Available CIDR for subnet creation could not be found'
298 raise ValueError(message)
299 subnet = body['subnet']
300 cls.subnets.append(subnet)
301 return subnet
302
303 @classmethod
304 def create_port(cls, network, **kwargs):
305 """Wrapper utility that returns a test port."""
306 body = cls.client.create_port(network_id=network['id'],
307 **kwargs)
308 port = body['port']
309 cls.ports.append(port)
310 return port
311
312 @classmethod
313 def update_port(cls, port, **kwargs):
314 """Wrapper utility that updates a test port."""
315 body = cls.client.update_port(port['id'],
316 **kwargs)
317 return body['port']
318
319 @classmethod
320 def create_router(cls, router_name=None, admin_state_up=False,
321 external_network_id=None, enable_snat=None,
322 **kwargs):
323 ext_gw_info = {}
324 if external_network_id:
325 ext_gw_info['network_id'] = external_network_id
326 if enable_snat:
327 ext_gw_info['enable_snat'] = enable_snat
328 body = cls.client.create_router(
329 router_name, external_gateway_info=ext_gw_info,
330 admin_state_up=admin_state_up, **kwargs)
331 router = body['router']
332 cls.routers.append(router)
333 return router
334
335 @classmethod
336 def create_floatingip(cls, external_network_id):
337 """Wrapper utility that returns a test floating IP."""
338 body = cls.client.create_floatingip(
339 floating_network_id=external_network_id)
340 fip = body['floatingip']
341 cls.floating_ips.append(fip)
342 return fip
343
344 @classmethod
345 def create_router_interface(cls, router_id, subnet_id):
346 """Wrapper utility that returns a router interface."""
347 interface = cls.client.add_router_interface_with_subnet_id(
348 router_id, subnet_id)
349 return interface
350
351 @classmethod
Sławek Kapłońskiff294062016-12-04 15:00:54 +0000352 def get_supported_qos_rule_types(cls):
353 body = cls.client.list_qos_rule_types()
354 return [rule_type['type'] for rule_type in body['rule_types']]
355
356 @classmethod
Ihar Hrachyshkab7940d92016-06-10 13:44:22 +0200357 def create_qos_policy(cls, name, description=None, shared=False,
358 tenant_id=None):
Daniel Mellado3c0aeab2016-01-29 11:30:25 +0000359 """Wrapper utility that returns a test QoS policy."""
360 body = cls.admin_client.create_qos_policy(
361 name, description, shared, tenant_id)
362 qos_policy = body['policy']
363 cls.qos_policies.append(qos_policy)
364 return qos_policy
365
366 @classmethod
367 def create_qos_bandwidth_limit_rule(cls, policy_id,
368 max_kbps, max_burst_kbps):
369 """Wrapper utility that returns a test QoS bandwidth limit rule."""
370 body = cls.admin_client.create_bandwidth_limit_rule(
371 policy_id, max_kbps, max_burst_kbps)
372 qos_rule = body['bandwidth_limit_rule']
373 cls.qos_rules.append(qos_rule)
374 return qos_rule
375
376 @classmethod
377 def delete_router(cls, router):
378 body = cls.client.list_router_interfaces(router['id'])
379 interfaces = body['ports']
380 for i in interfaces:
381 try:
382 cls.client.remove_router_interface_with_subnet_id(
383 router['id'], i['fixed_ips'][0]['subnet_id'])
384 except lib_exc.NotFound:
385 pass
386 cls.client.delete_router(router['id'])
387
388 @classmethod
389 def create_address_scope(cls, name, is_admin=False, **kwargs):
390 if is_admin:
391 body = cls.admin_client.create_address_scope(name=name, **kwargs)
392 cls.admin_address_scopes.append(body['address_scope'])
393 else:
394 body = cls.client.create_address_scope(name=name, **kwargs)
395 cls.address_scopes.append(body['address_scope'])
396 return body['address_scope']
397
398 @classmethod
399 def create_subnetpool(cls, name, is_admin=False, **kwargs):
400 if is_admin:
401 body = cls.admin_client.create_subnetpool(name, **kwargs)
402 cls.admin_subnetpools.append(body['subnetpool'])
403 else:
404 body = cls.client.create_subnetpool(name, **kwargs)
405 cls.subnetpools.append(body['subnetpool'])
406 return body['subnetpool']
407
408
409class BaseAdminNetworkTest(BaseNetworkTest):
410
411 credentials = ['primary', 'admin']
412
413 @classmethod
414 def setup_clients(cls):
415 super(BaseAdminNetworkTest, cls).setup_clients()
416 cls.admin_client = cls.os_adm.network_client
417 cls.identity_admin_client = cls.os_adm.tenants_client
418
419 @classmethod
420 def create_metering_label(cls, name, description):
421 """Wrapper utility that returns a test metering label."""
422 body = cls.admin_client.create_metering_label(
423 description=description,
424 name=data_utils.rand_name("metering-label"))
425 metering_label = body['metering_label']
426 cls.metering_labels.append(metering_label)
427 return metering_label
428
429 @classmethod
430 def create_metering_label_rule(cls, remote_ip_prefix, direction,
431 metering_label_id):
432 """Wrapper utility that returns a test metering label rule."""
433 body = cls.admin_client.create_metering_label_rule(
434 remote_ip_prefix=remote_ip_prefix, direction=direction,
435 metering_label_id=metering_label_id)
436 metering_label_rule = body['metering_label_rule']
437 cls.metering_label_rules.append(metering_label_rule)
438 return metering_label_rule
439
440 @classmethod
441 def create_flavor(cls, name, description, service_type):
442 """Wrapper utility that returns a test flavor."""
443 body = cls.admin_client.create_flavor(
444 description=description, service_type=service_type,
445 name=name)
446 flavor = body['flavor']
447 cls.flavors.append(flavor)
448 return flavor
449
450 @classmethod
451 def create_service_profile(cls, description, metainfo, driver):
452 """Wrapper utility that returns a test service profile."""
453 body = cls.admin_client.create_service_profile(
454 driver=driver, metainfo=metainfo, description=description)
455 service_profile = body['service_profile']
456 cls.service_profiles.append(service_profile)
457 return service_profile
458
459 @classmethod
460 def get_unused_ip(cls, net_id, ip_version=None):
Gary Kotton011345f2016-06-15 08:04:31 -0700461 """Get an unused ip address in a allocation pool of net"""
Daniel Mellado3c0aeab2016-01-29 11:30:25 +0000462 body = cls.admin_client.list_ports(network_id=net_id)
463 ports = body['ports']
464 used_ips = []
465 for port in ports:
466 used_ips.extend(
467 [fixed_ip['ip_address'] for fixed_ip in port['fixed_ips']])
468 body = cls.admin_client.list_subnets(network_id=net_id)
469 subnets = body['subnets']
470
471 for subnet in subnets:
472 if ip_version and subnet['ip_version'] != ip_version:
473 continue
474 cidr = subnet['cidr']
475 allocation_pools = subnet['allocation_pools']
476 iterators = []
477 if allocation_pools:
478 for allocation_pool in allocation_pools:
479 iterators.append(netaddr.iter_iprange(
480 allocation_pool['start'], allocation_pool['end']))
481 else:
482 net = netaddr.IPNetwork(cidr)
483
484 def _iterip():
485 for ip in net:
486 if ip not in (net.network, net.broadcast):
487 yield ip
488 iterators.append(iter(_iterip()))
489
490 for iterator in iterators:
491 for ip in iterator:
492 if str(ip) not in used_ips:
493 return str(ip)
494
495 message = (
496 "net(%s) has no usable IP address in allocation pools" % net_id)
497 raise exceptions.InvalidConfiguration(message)
Ihar Hrachyshka59382252016-04-05 15:54:33 +0200498
499
Sławek Kapłońskiff294062016-12-04 15:00:54 +0000500def require_qos_rule_type(rule_type):
501 def decorator(f):
502 @functools.wraps(f)
503 def wrapper(self, *func_args, **func_kwargs):
504 if rule_type not in self.get_supported_qos_rule_types():
505 raise self.skipException(
506 "%s rule type is required." % rule_type)
507 return f(self, *func_args, **func_kwargs)
508 return wrapper
509 return decorator
510
511
Ihar Hrachyshka59382252016-04-05 15:54:33 +0200512def _require_sorting(f):
513 @functools.wraps(f)
514 def inner(self, *args, **kwargs):
Ihar Hrachyshka34feb5b2016-06-14 16:16:06 +0200515 if not test.is_extension_enabled("sorting", "network"):
Ihar Hrachyshka59382252016-04-05 15:54:33 +0200516 self.skipTest('Sorting feature is required')
517 return f(self, *args, **kwargs)
518 return inner
519
520
521def _require_pagination(f):
522 @functools.wraps(f)
523 def inner(self, *args, **kwargs):
Ihar Hrachyshka34feb5b2016-06-14 16:16:06 +0200524 if not test.is_extension_enabled("pagination", "network"):
Ihar Hrachyshka59382252016-04-05 15:54:33 +0200525 self.skipTest('Pagination feature is required')
526 return f(self, *args, **kwargs)
527 return inner
528
529
530class BaseSearchCriteriaTest(BaseNetworkTest):
531
532 # This should be defined by subclasses to reflect resource name to test
533 resource = None
534
Armando Migliaccio57581c62016-07-01 10:13:19 -0700535 field = 'name'
536
Ihar Hrachyshkaa8fe5a12016-05-24 14:50:58 +0200537 # NOTE(ihrachys): some names, like those starting with an underscore (_)
538 # are sorted differently depending on whether the plugin implements native
539 # sorting support, or not. So we avoid any such cases here, sticking to
540 # alphanumeric. Also test a case when there are multiple resources with the
541 # same name
Ihar Hrachyshka59382252016-04-05 15:54:33 +0200542 resource_names = ('test1', 'abc1', 'test10', '123test') + ('test1',)
543
Ihar Hrachyshka59382252016-04-05 15:54:33 +0200544 force_tenant_isolation = True
545
Ihar Hrachyshkaa8fe5a12016-05-24 14:50:58 +0200546 list_kwargs = {}
Ihar Hrachyshka59382252016-04-05 15:54:33 +0200547
Ihar Hrachyshkab7940d92016-06-10 13:44:22 +0200548 list_as_admin = False
549
Ihar Hrachyshkaaeb03a02016-05-18 20:03:18 +0200550 def assertSameOrder(self, original, actual):
551 # gracefully handle iterators passed
552 original = list(original)
553 actual = list(actual)
554 self.assertEqual(len(original), len(actual))
555 for expected, res in zip(original, actual):
Armando Migliaccio57581c62016-07-01 10:13:19 -0700556 self.assertEqual(expected[self.field], res[self.field])
Ihar Hrachyshkaaeb03a02016-05-18 20:03:18 +0200557
558 @utils.classproperty
559 def plural_name(self):
560 return '%ss' % self.resource
561
Ihar Hrachyshkab7940d92016-06-10 13:44:22 +0200562 @property
563 def list_client(self):
564 return self.admin_client if self.list_as_admin else self.client
565
Ihar Hrachyshka59382252016-04-05 15:54:33 +0200566 def list_method(self, *args, **kwargs):
Ihar Hrachyshkab7940d92016-06-10 13:44:22 +0200567 method = getattr(self.list_client, 'list_%s' % self.plural_name)
Ihar Hrachyshka59382252016-04-05 15:54:33 +0200568 kwargs.update(self.list_kwargs)
569 return method(*args, **kwargs)
570
Ihar Hrachyshkaaeb03a02016-05-18 20:03:18 +0200571 def get_bare_url(self, url):
572 base_url = self.client.base_url
573 self.assertTrue(url.startswith(base_url))
574 return url[len(base_url):]
575
Ihar Hrachyshka59382252016-04-05 15:54:33 +0200576 @classmethod
577 def _extract_resources(cls, body):
Ihar Hrachyshkaaeb03a02016-05-18 20:03:18 +0200578 return body[cls.plural_name]
Ihar Hrachyshka59382252016-04-05 15:54:33 +0200579
580 def _test_list_sorts(self, direction):
581 sort_args = {
582 'sort_dir': direction,
Armando Migliaccio57581c62016-07-01 10:13:19 -0700583 'sort_key': self.field
Ihar Hrachyshka59382252016-04-05 15:54:33 +0200584 }
585 body = self.list_method(**sort_args)
586 resources = self._extract_resources(body)
587 self.assertNotEmpty(
588 resources, "%s list returned is empty" % self.resource)
Armando Migliaccio57581c62016-07-01 10:13:19 -0700589 retrieved_names = [res[self.field] for res in resources]
Ihar Hrachyshka59382252016-04-05 15:54:33 +0200590 expected = sorted(retrieved_names)
591 if direction == constants.SORT_DIRECTION_DESC:
592 expected = list(reversed(expected))
593 self.assertEqual(expected, retrieved_names)
594
595 @_require_sorting
596 def _test_list_sorts_asc(self):
597 self._test_list_sorts(constants.SORT_DIRECTION_ASC)
598
599 @_require_sorting
600 def _test_list_sorts_desc(self):
601 self._test_list_sorts(constants.SORT_DIRECTION_DESC)
602
603 @_require_pagination
604 def _test_list_pagination(self):
605 for limit in range(1, len(self.resource_names) + 1):
606 pagination_args = {
607 'limit': limit,
608 }
609 body = self.list_method(**pagination_args)
610 resources = self._extract_resources(body)
611 self.assertEqual(limit, len(resources))
612
613 @_require_pagination
614 def _test_list_no_pagination_limit_0(self):
615 pagination_args = {
616 'limit': 0,
617 }
618 body = self.list_method(**pagination_args)
619 resources = self._extract_resources(body)
Béla Vancsicsf1806182016-08-23 07:36:18 +0200620 self.assertGreaterEqual(len(resources), len(self.resource_names))
Ihar Hrachyshka59382252016-04-05 15:54:33 +0200621
Ihar Hrachyshkaaeb03a02016-05-18 20:03:18 +0200622 def _test_list_pagination_iteratively(self, lister):
Ihar Hrachyshka59382252016-04-05 15:54:33 +0200623 # first, collect all resources for later comparison
624 sort_args = {
625 'sort_dir': constants.SORT_DIRECTION_ASC,
Armando Migliaccio57581c62016-07-01 10:13:19 -0700626 'sort_key': self.field
Ihar Hrachyshka59382252016-04-05 15:54:33 +0200627 }
628 body = self.list_method(**sort_args)
629 expected_resources = self._extract_resources(body)
630 self.assertNotEmpty(expected_resources)
631
Ihar Hrachyshkaaeb03a02016-05-18 20:03:18 +0200632 resources = lister(
633 len(expected_resources), sort_args
634 )
635
636 # finally, compare that the list retrieved in one go is identical to
637 # the one containing pagination results
638 self.assertSameOrder(expected_resources, resources)
639
640 def _list_all_with_marker(self, niterations, sort_args):
Ihar Hrachyshka59382252016-04-05 15:54:33 +0200641 # paginate resources one by one, using last fetched resource as a
642 # marker
643 resources = []
Ihar Hrachyshkaaeb03a02016-05-18 20:03:18 +0200644 for i in range(niterations):
Ihar Hrachyshka59382252016-04-05 15:54:33 +0200645 pagination_args = sort_args.copy()
646 pagination_args['limit'] = 1
647 if resources:
648 pagination_args['marker'] = resources[-1]['id']
649 body = self.list_method(**pagination_args)
650 resources_ = self._extract_resources(body)
651 self.assertEqual(1, len(resources_))
652 resources.extend(resources_)
Ihar Hrachyshkaaeb03a02016-05-18 20:03:18 +0200653 return resources
Ihar Hrachyshka59382252016-04-05 15:54:33 +0200654
Ihar Hrachyshkaaeb03a02016-05-18 20:03:18 +0200655 @_require_pagination
656 @_require_sorting
657 def _test_list_pagination_with_marker(self):
658 self._test_list_pagination_iteratively(self._list_all_with_marker)
659
660 def _list_all_with_hrefs(self, niterations, sort_args):
661 # paginate resources one by one, using next href links
662 resources = []
663 prev_links = {}
664
665 for i in range(niterations):
666 if prev_links:
667 uri = self.get_bare_url(prev_links['next'])
668 else:
Ihar Hrachyshka7f79fe62016-06-07 21:23:44 +0200669 sort_args.update(self.list_kwargs)
Ihar Hrachyshkab7940d92016-06-10 13:44:22 +0200670 uri = self.list_client.build_uri(
Ihar Hrachyshkaaeb03a02016-05-18 20:03:18 +0200671 self.plural_name, limit=1, **sort_args)
Ihar Hrachyshkab7940d92016-06-10 13:44:22 +0200672 prev_links, body = self.list_client.get_uri_with_links(
Ihar Hrachyshkaaeb03a02016-05-18 20:03:18 +0200673 self.plural_name, uri
674 )
675 resources_ = self._extract_resources(body)
676 self.assertEqual(1, len(resources_))
677 resources.extend(resources_)
678
679 # The last element is empty and does not contain 'next' link
680 uri = self.get_bare_url(prev_links['next'])
681 prev_links, body = self.client.get_uri_with_links(
682 self.plural_name, uri
683 )
684 self.assertNotIn('next', prev_links)
685
686 # Now walk backwards and compare results
687 resources2 = []
688 for i in range(niterations):
689 uri = self.get_bare_url(prev_links['previous'])
Ihar Hrachyshkab7940d92016-06-10 13:44:22 +0200690 prev_links, body = self.list_client.get_uri_with_links(
Ihar Hrachyshkaaeb03a02016-05-18 20:03:18 +0200691 self.plural_name, uri
692 )
693 resources_ = self._extract_resources(body)
694 self.assertEqual(1, len(resources_))
695 resources2.extend(resources_)
696
697 self.assertSameOrder(resources, reversed(resources2))
698
699 return resources
700
701 @_require_pagination
702 @_require_sorting
703 def _test_list_pagination_with_href_links(self):
704 self._test_list_pagination_iteratively(self._list_all_with_hrefs)
705
706 @_require_pagination
707 @_require_sorting
708 def _test_list_pagination_page_reverse_with_href_links(
709 self, direction=constants.SORT_DIRECTION_ASC):
710 pagination_args = {
711 'sort_dir': direction,
Armando Migliaccio57581c62016-07-01 10:13:19 -0700712 'sort_key': self.field,
Ihar Hrachyshkaaeb03a02016-05-18 20:03:18 +0200713 }
714 body = self.list_method(**pagination_args)
715 expected_resources = self._extract_resources(body)
716
717 page_size = 2
718 pagination_args['limit'] = page_size
719
720 prev_links = {}
721 resources = []
722 num_resources = len(expected_resources)
723 niterations = int(math.ceil(float(num_resources) / page_size))
724 for i in range(niterations):
725 if prev_links:
726 uri = self.get_bare_url(prev_links['previous'])
727 else:
Ihar Hrachyshka7f79fe62016-06-07 21:23:44 +0200728 pagination_args.update(self.list_kwargs)
Ihar Hrachyshkab7940d92016-06-10 13:44:22 +0200729 uri = self.list_client.build_uri(
Ihar Hrachyshkaaeb03a02016-05-18 20:03:18 +0200730 self.plural_name, page_reverse=True, **pagination_args)
Ihar Hrachyshkab7940d92016-06-10 13:44:22 +0200731 prev_links, body = self.list_client.get_uri_with_links(
Ihar Hrachyshkaaeb03a02016-05-18 20:03:18 +0200732 self.plural_name, uri
733 )
734 resources_ = self._extract_resources(body)
Béla Vancsicsf1806182016-08-23 07:36:18 +0200735 self.assertGreaterEqual(page_size, len(resources_))
Ihar Hrachyshkaaeb03a02016-05-18 20:03:18 +0200736 resources.extend(reversed(resources_))
737
738 self.assertSameOrder(expected_resources, reversed(resources))
739
740 @_require_pagination
741 @_require_sorting
742 def _test_list_pagination_page_reverse_asc(self):
743 self._test_list_pagination_page_reverse(
744 direction=constants.SORT_DIRECTION_ASC)
745
746 @_require_pagination
747 @_require_sorting
748 def _test_list_pagination_page_reverse_desc(self):
749 self._test_list_pagination_page_reverse(
750 direction=constants.SORT_DIRECTION_DESC)
751
752 def _test_list_pagination_page_reverse(self, direction):
753 pagination_args = {
754 'sort_dir': direction,
Armando Migliaccio57581c62016-07-01 10:13:19 -0700755 'sort_key': self.field,
Ihar Hrachyshkaaeb03a02016-05-18 20:03:18 +0200756 'limit': 3,
757 }
758 body = self.list_method(**pagination_args)
759 expected_resources = self._extract_resources(body)
760
761 pagination_args['limit'] -= 1
762 pagination_args['marker'] = expected_resources[-1]['id']
763 pagination_args['page_reverse'] = True
764 body = self.list_method(**pagination_args)
765
766 self.assertSameOrder(
767 # the last entry is not included in 2nd result when used as a
768 # marker
769 expected_resources[:-1],
770 self._extract_resources(body))
Victor Morales1be97b42016-09-05 08:50:06 -0500771
772 def _test_list_validation_filters(self):
773 validation_args = {
774 'unknown_filter': 'value',
775 }
776 body = self.list_method(**validation_args)
777 resources = self._extract_resources(body)
778 for resource in resources:
779 self.assertIn(resource['name'], self.resource_names)