blob: 84746fbbaddf62a4076fd6e510d866ee9d578cb0 [file] [log] [blame]
Daniel Mellado3c0aeab2016-01-29 11:30:25 +00001# Copyright 2012 OpenStack Foundation
2# All Rights Reserved.
3#
4# Licensed under the Apache License, Version 2.0 (the "License"); you may
5# not use this file except in compliance with the License. You may obtain
6# a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13# License for the specific language governing permissions and limitations
14# under the License.
15
Ihar Hrachyshka59382252016-04-05 15:54:33 +020016import functools
Ihar Hrachyshkaaeb03a02016-05-18 20:03:18 +020017import math
Ihar Hrachyshka59382252016-04-05 15:54:33 +020018
Daniel Mellado3c0aeab2016-01-29 11:30:25 +000019import netaddr
20from tempest.lib.common.utils import data_utils
21from tempest.lib import exceptions as lib_exc
22from tempest import test
23
Ihar Hrachyshka59382252016-04-05 15:54:33 +020024from neutron.common import constants
Ihar Hrachyshkaaeb03a02016-05-18 20:03:18 +020025from neutron.common import utils
Daniel Mellado3c0aeab2016-01-29 11:30:25 +000026from neutron.tests.tempest.api import clients
27from neutron.tests.tempest import config
28from neutron.tests.tempest import exceptions
29
30CONF = config.CONF
31
32
33class BaseNetworkTest(test.BaseTestCase):
34
35 """
36 Base class for the Neutron tests that use the Tempest Neutron REST client
37
38 Per the Neutron API Guide, API v1.x was removed from the source code tree
39 (docs.openstack.org/api/openstack-network/2.0/content/Overview-d1e71.html)
40 Therefore, v2.x of the Neutron API is assumed. It is also assumed that the
41 following options are defined in the [network] section of etc/tempest.conf:
42
43 project_network_cidr with a block of cidr's from which smaller blocks
44 can be allocated for tenant networks
45
46 project_network_mask_bits with the mask bits to be used to partition
47 the block defined by tenant-network_cidr
48
49 Finally, it is assumed that the following option is defined in the
50 [service_available] section of etc/tempest.conf
51
52 neutron as True
53 """
54
55 force_tenant_isolation = False
56 credentials = ['primary']
57
58 # Default to ipv4.
59 _ip_version = 4
60
61 @classmethod
62 def get_client_manager(cls, credential_type=None, roles=None,
63 force_new=None):
Genadi Chereshnyacc395c02016-07-25 12:17:37 +030064 manager = super(BaseNetworkTest, cls).get_client_manager(
65 credential_type=credential_type,
66 roles=roles,
67 force_new=force_new
68 )
Daniel Mellado3c0aeab2016-01-29 11:30:25 +000069 # Neutron uses a different clients manager than the one in the Tempest
70 return clients.Manager(manager.credentials)
71
72 @classmethod
73 def skip_checks(cls):
74 super(BaseNetworkTest, cls).skip_checks()
75 if not CONF.service_available.neutron:
76 raise cls.skipException("Neutron support is required")
77 if cls._ip_version == 6 and not CONF.network_feature_enabled.ipv6:
78 raise cls.skipException("IPv6 Tests are disabled.")
Jakub Libosvar1982aa12017-05-30 11:15:33 +000079 for req_ext in getattr(cls, 'required_extensions', []):
80 if not test.is_extension_enabled(req_ext, 'network'):
81 msg = "%s extension not enabled." % req_ext
82 raise cls.skipException(msg)
Daniel Mellado3c0aeab2016-01-29 11:30:25 +000083
84 @classmethod
85 def setup_credentials(cls):
86 # Create no network resources for these test.
87 cls.set_network_resources()
88 super(BaseNetworkTest, cls).setup_credentials()
89
90 @classmethod
91 def setup_clients(cls):
92 super(BaseNetworkTest, cls).setup_clients()
93 cls.client = cls.os.network_client
94
95 @classmethod
96 def resource_setup(cls):
97 super(BaseNetworkTest, cls).resource_setup()
98
99 cls.networks = []
Miguel Lavalle124378b2016-09-21 16:41:47 -0500100 cls.admin_networks = []
Daniel Mellado3c0aeab2016-01-29 11:30:25 +0000101 cls.subnets = []
102 cls.ports = []
103 cls.routers = []
104 cls.floating_ips = []
105 cls.metering_labels = []
106 cls.service_profiles = []
107 cls.flavors = []
108 cls.metering_label_rules = []
109 cls.qos_rules = []
110 cls.qos_policies = []
111 cls.ethertype = "IPv" + str(cls._ip_version)
112 cls.address_scopes = []
113 cls.admin_address_scopes = []
114 cls.subnetpools = []
115 cls.admin_subnetpools = []
Itzik Brownbac51dc2016-10-31 12:25:04 +0000116 cls.security_groups = []
Daniel Mellado3c0aeab2016-01-29 11:30:25 +0000117
118 @classmethod
119 def resource_cleanup(cls):
120 if CONF.service_available.neutron:
121 # Clean up QoS rules
122 for qos_rule in cls.qos_rules:
123 cls._try_delete_resource(cls.admin_client.delete_qos_rule,
124 qos_rule['id'])
125 # Clean up QoS policies
126 for qos_policy in cls.qos_policies:
127 cls._try_delete_resource(cls.admin_client.delete_qos_policy,
128 qos_policy['id'])
129 # Clean up floating IPs
130 for floating_ip in cls.floating_ips:
131 cls._try_delete_resource(cls.client.delete_floatingip,
132 floating_ip['id'])
133 # Clean up routers
134 for router in cls.routers:
135 cls._try_delete_resource(cls.delete_router,
136 router)
137 # Clean up metering label rules
138 for metering_label_rule in cls.metering_label_rules:
139 cls._try_delete_resource(
140 cls.admin_client.delete_metering_label_rule,
141 metering_label_rule['id'])
142 # Clean up metering labels
143 for metering_label in cls.metering_labels:
144 cls._try_delete_resource(
145 cls.admin_client.delete_metering_label,
146 metering_label['id'])
147 # Clean up flavors
148 for flavor in cls.flavors:
149 cls._try_delete_resource(
150 cls.admin_client.delete_flavor,
151 flavor['id'])
152 # Clean up service profiles
153 for service_profile in cls.service_profiles:
154 cls._try_delete_resource(
155 cls.admin_client.delete_service_profile,
156 service_profile['id'])
157 # Clean up ports
158 for port in cls.ports:
159 cls._try_delete_resource(cls.client.delete_port,
160 port['id'])
161 # Clean up subnets
162 for subnet in cls.subnets:
163 cls._try_delete_resource(cls.client.delete_subnet,
164 subnet['id'])
165 # Clean up networks
166 for network in cls.networks:
167 cls._try_delete_resource(cls.client.delete_network,
168 network['id'])
169
Miguel Lavalle124378b2016-09-21 16:41:47 -0500170 # Clean up admin networks
171 for network in cls.admin_networks:
Daniel Mellado3c0aeab2016-01-29 11:30:25 +0000172 cls._try_delete_resource(cls.admin_client.delete_network,
173 network['id'])
174
Itzik Brownbac51dc2016-10-31 12:25:04 +0000175 # Clean up security groups
176 for secgroup in cls.security_groups:
177 cls._try_delete_resource(cls.client.delete_security_group,
178 secgroup['id'])
179
Daniel Mellado3c0aeab2016-01-29 11:30:25 +0000180 for subnetpool in cls.subnetpools:
181 cls._try_delete_resource(cls.client.delete_subnetpool,
182 subnetpool['id'])
183
184 for subnetpool in cls.admin_subnetpools:
185 cls._try_delete_resource(cls.admin_client.delete_subnetpool,
186 subnetpool['id'])
187
188 for address_scope in cls.address_scopes:
189 cls._try_delete_resource(cls.client.delete_address_scope,
190 address_scope['id'])
191
192 for address_scope in cls.admin_address_scopes:
193 cls._try_delete_resource(
194 cls.admin_client.delete_address_scope,
195 address_scope['id'])
196
197 super(BaseNetworkTest, cls).resource_cleanup()
198
199 @classmethod
200 def _try_delete_resource(cls, delete_callable, *args, **kwargs):
201 """Cleanup resources in case of test-failure
202
203 Some resources are explicitly deleted by the test.
204 If the test failed to delete a resource, this method will execute
205 the appropriate delete methods. Otherwise, the method ignores NotFound
206 exceptions thrown for resources that were correctly deleted by the
207 test.
208
209 :param delete_callable: delete method
210 :param args: arguments for delete method
211 :param kwargs: keyword arguments for delete method
212 """
213 try:
214 delete_callable(*args, **kwargs)
215 # if resource is not found, this means it was deleted in the test
216 except lib_exc.NotFound:
217 pass
218
219 @classmethod
220 def create_network(cls, network_name=None, **kwargs):
221 """Wrapper utility that returns a test network."""
222 network_name = network_name or data_utils.rand_name('test-network-')
223
224 body = cls.client.create_network(name=network_name, **kwargs)
225 network = body['network']
226 cls.networks.append(network)
227 return network
228
229 @classmethod
230 def create_shared_network(cls, network_name=None, **post_body):
231 network_name = network_name or data_utils.rand_name('sharednetwork-')
232 post_body.update({'name': network_name, 'shared': True})
233 body = cls.admin_client.create_network(**post_body)
234 network = body['network']
Miguel Lavalle124378b2016-09-21 16:41:47 -0500235 cls.admin_networks.append(network)
236 return network
237
238 @classmethod
239 def create_network_keystone_v3(cls, network_name=None, project_id=None,
240 tenant_id=None, client=None):
241 """Wrapper utility that creates a test network with project_id."""
242 client = client or cls.client
243 network_name = network_name or data_utils.rand_name(
244 'test-network-with-project_id')
245 project_id = cls.client.tenant_id
246 body = client.create_network_keystone_v3(network_name, project_id,
247 tenant_id)
248 network = body['network']
249 if client is cls.client:
250 cls.networks.append(network)
251 else:
252 cls.admin_networks.append(network)
Daniel Mellado3c0aeab2016-01-29 11:30:25 +0000253 return network
254
255 @classmethod
256 def create_subnet(cls, network, gateway='', cidr=None, mask_bits=None,
257 ip_version=None, client=None, **kwargs):
258 """Wrapper utility that returns a test subnet."""
259
260 # allow tests to use admin client
261 if not client:
262 client = cls.client
263
264 # The cidr and mask_bits depend on the ip version.
265 ip_version = ip_version if ip_version is not None else cls._ip_version
266 gateway_not_set = gateway == ''
267 if ip_version == 4:
268 cidr = cidr or netaddr.IPNetwork(
269 config.safe_get_config_value(
270 'network', 'project_network_cidr'))
271 mask_bits = (
272 mask_bits or config.safe_get_config_value(
273 'network', 'project_network_mask_bits'))
274 elif ip_version == 6:
275 cidr = (
276 cidr or netaddr.IPNetwork(
277 config.safe_get_config_value(
278 'network', 'project_network_v6_cidr')))
279 mask_bits = (
280 mask_bits or config.safe_get_config_value(
281 'network', 'project_network_v6_mask_bits'))
282 # Find a cidr that is not in use yet and create a subnet with it
283 for subnet_cidr in cidr.subnet(mask_bits):
284 if gateway_not_set:
285 gateway_ip = str(netaddr.IPAddress(subnet_cidr) + 1)
286 else:
287 gateway_ip = gateway
288 try:
289 body = client.create_subnet(
290 network_id=network['id'],
291 cidr=str(subnet_cidr),
292 ip_version=ip_version,
293 gateway_ip=gateway_ip,
294 **kwargs)
295 break
296 except lib_exc.BadRequest as e:
297 is_overlapping_cidr = 'overlaps with another subnet' in str(e)
298 if not is_overlapping_cidr:
299 raise
300 else:
301 message = 'Available CIDR for subnet creation could not be found'
302 raise ValueError(message)
303 subnet = body['subnet']
304 cls.subnets.append(subnet)
305 return subnet
306
307 @classmethod
308 def create_port(cls, network, **kwargs):
309 """Wrapper utility that returns a test port."""
310 body = cls.client.create_port(network_id=network['id'],
311 **kwargs)
312 port = body['port']
313 cls.ports.append(port)
314 return port
315
316 @classmethod
317 def update_port(cls, port, **kwargs):
318 """Wrapper utility that updates a test port."""
319 body = cls.client.update_port(port['id'],
320 **kwargs)
321 return body['port']
322
323 @classmethod
Genadi Chereshnyac0411e92016-07-11 16:59:42 +0300324 def _create_router_with_client(
325 cls, client, router_name=None, admin_state_up=False,
326 external_network_id=None, enable_snat=None, **kwargs
327 ):
Daniel Mellado3c0aeab2016-01-29 11:30:25 +0000328 ext_gw_info = {}
329 if external_network_id:
330 ext_gw_info['network_id'] = external_network_id
YAMAMOTO Takashi9bd4f972017-06-20 12:49:30 +0900331 if enable_snat is not None:
Daniel Mellado3c0aeab2016-01-29 11:30:25 +0000332 ext_gw_info['enable_snat'] = enable_snat
Genadi Chereshnyac0411e92016-07-11 16:59:42 +0300333 body = client.create_router(
Daniel Mellado3c0aeab2016-01-29 11:30:25 +0000334 router_name, external_gateway_info=ext_gw_info,
335 admin_state_up=admin_state_up, **kwargs)
336 router = body['router']
337 cls.routers.append(router)
338 return router
339
340 @classmethod
Genadi Chereshnyac0411e92016-07-11 16:59:42 +0300341 def create_router(cls, *args, **kwargs):
342 return cls._create_router_with_client(cls.client, *args, **kwargs)
343
344 @classmethod
345 def create_admin_router(cls, *args, **kwargs):
rajat294495c042017-06-28 15:37:16 +0530346 return cls._create_router_with_client(cls.os_admin.network_client,
Genadi Chereshnyac0411e92016-07-11 16:59:42 +0300347 *args, **kwargs)
348
349 @classmethod
Daniel Mellado3c0aeab2016-01-29 11:30:25 +0000350 def create_floatingip(cls, external_network_id):
351 """Wrapper utility that returns a test floating IP."""
352 body = cls.client.create_floatingip(
353 floating_network_id=external_network_id)
354 fip = body['floatingip']
355 cls.floating_ips.append(fip)
356 return fip
357
358 @classmethod
359 def create_router_interface(cls, router_id, subnet_id):
360 """Wrapper utility that returns a router interface."""
361 interface = cls.client.add_router_interface_with_subnet_id(
362 router_id, subnet_id)
363 return interface
364
365 @classmethod
Sławek Kapłońskiff294062016-12-04 15:00:54 +0000366 def get_supported_qos_rule_types(cls):
367 body = cls.client.list_qos_rule_types()
368 return [rule_type['type'] for rule_type in body['rule_types']]
369
370 @classmethod
Ihar Hrachyshkab7940d92016-06-10 13:44:22 +0200371 def create_qos_policy(cls, name, description=None, shared=False,
372 tenant_id=None):
Daniel Mellado3c0aeab2016-01-29 11:30:25 +0000373 """Wrapper utility that returns a test QoS policy."""
374 body = cls.admin_client.create_qos_policy(
375 name, description, shared, tenant_id)
376 qos_policy = body['policy']
377 cls.qos_policies.append(qos_policy)
378 return qos_policy
379
380 @classmethod
Sławek Kapłoński153f3452017-03-24 22:04:53 +0000381 def create_qos_bandwidth_limit_rule(cls, policy_id, max_kbps,
382 max_burst_kbps,
383 direction=constants.EGRESS_DIRECTION):
Daniel Mellado3c0aeab2016-01-29 11:30:25 +0000384 """Wrapper utility that returns a test QoS bandwidth limit rule."""
385 body = cls.admin_client.create_bandwidth_limit_rule(
Sławek Kapłoński153f3452017-03-24 22:04:53 +0000386 policy_id, max_kbps, max_burst_kbps, direction)
Daniel Mellado3c0aeab2016-01-29 11:30:25 +0000387 qos_rule = body['bandwidth_limit_rule']
388 cls.qos_rules.append(qos_rule)
389 return qos_rule
390
391 @classmethod
392 def delete_router(cls, router):
393 body = cls.client.list_router_interfaces(router['id'])
394 interfaces = body['ports']
395 for i in interfaces:
396 try:
397 cls.client.remove_router_interface_with_subnet_id(
398 router['id'], i['fixed_ips'][0]['subnet_id'])
399 except lib_exc.NotFound:
400 pass
401 cls.client.delete_router(router['id'])
402
403 @classmethod
404 def create_address_scope(cls, name, is_admin=False, **kwargs):
405 if is_admin:
406 body = cls.admin_client.create_address_scope(name=name, **kwargs)
407 cls.admin_address_scopes.append(body['address_scope'])
408 else:
409 body = cls.client.create_address_scope(name=name, **kwargs)
410 cls.address_scopes.append(body['address_scope'])
411 return body['address_scope']
412
413 @classmethod
414 def create_subnetpool(cls, name, is_admin=False, **kwargs):
415 if is_admin:
416 body = cls.admin_client.create_subnetpool(name, **kwargs)
417 cls.admin_subnetpools.append(body['subnetpool'])
418 else:
419 body = cls.client.create_subnetpool(name, **kwargs)
420 cls.subnetpools.append(body['subnetpool'])
421 return body['subnetpool']
422
423
424class BaseAdminNetworkTest(BaseNetworkTest):
425
426 credentials = ['primary', 'admin']
427
428 @classmethod
429 def setup_clients(cls):
430 super(BaseAdminNetworkTest, cls).setup_clients()
431 cls.admin_client = cls.os_adm.network_client
432 cls.identity_admin_client = cls.os_adm.tenants_client
nanaboatedfe7742017-07-14 22:26:52 +0000433 cls.identity_admin_clientv3 = cls.os_admin.projects_client
Daniel Mellado3c0aeab2016-01-29 11:30:25 +0000434
435 @classmethod
436 def create_metering_label(cls, name, description):
437 """Wrapper utility that returns a test metering label."""
438 body = cls.admin_client.create_metering_label(
439 description=description,
440 name=data_utils.rand_name("metering-label"))
441 metering_label = body['metering_label']
442 cls.metering_labels.append(metering_label)
443 return metering_label
444
445 @classmethod
446 def create_metering_label_rule(cls, remote_ip_prefix, direction,
447 metering_label_id):
448 """Wrapper utility that returns a test metering label rule."""
449 body = cls.admin_client.create_metering_label_rule(
450 remote_ip_prefix=remote_ip_prefix, direction=direction,
451 metering_label_id=metering_label_id)
452 metering_label_rule = body['metering_label_rule']
453 cls.metering_label_rules.append(metering_label_rule)
454 return metering_label_rule
455
456 @classmethod
457 def create_flavor(cls, name, description, service_type):
458 """Wrapper utility that returns a test flavor."""
459 body = cls.admin_client.create_flavor(
460 description=description, service_type=service_type,
461 name=name)
462 flavor = body['flavor']
463 cls.flavors.append(flavor)
464 return flavor
465
466 @classmethod
467 def create_service_profile(cls, description, metainfo, driver):
468 """Wrapper utility that returns a test service profile."""
469 body = cls.admin_client.create_service_profile(
470 driver=driver, metainfo=metainfo, description=description)
471 service_profile = body['service_profile']
472 cls.service_profiles.append(service_profile)
473 return service_profile
474
475 @classmethod
476 def get_unused_ip(cls, net_id, ip_version=None):
Gary Kotton011345f2016-06-15 08:04:31 -0700477 """Get an unused ip address in a allocation pool of net"""
Daniel Mellado3c0aeab2016-01-29 11:30:25 +0000478 body = cls.admin_client.list_ports(network_id=net_id)
479 ports = body['ports']
480 used_ips = []
481 for port in ports:
482 used_ips.extend(
483 [fixed_ip['ip_address'] for fixed_ip in port['fixed_ips']])
484 body = cls.admin_client.list_subnets(network_id=net_id)
485 subnets = body['subnets']
486
487 for subnet in subnets:
488 if ip_version and subnet['ip_version'] != ip_version:
489 continue
490 cidr = subnet['cidr']
491 allocation_pools = subnet['allocation_pools']
492 iterators = []
493 if allocation_pools:
494 for allocation_pool in allocation_pools:
495 iterators.append(netaddr.iter_iprange(
496 allocation_pool['start'], allocation_pool['end']))
497 else:
498 net = netaddr.IPNetwork(cidr)
499
500 def _iterip():
501 for ip in net:
502 if ip not in (net.network, net.broadcast):
503 yield ip
504 iterators.append(iter(_iterip()))
505
506 for iterator in iterators:
507 for ip in iterator:
508 if str(ip) not in used_ips:
509 return str(ip)
510
511 message = (
512 "net(%s) has no usable IP address in allocation pools" % net_id)
513 raise exceptions.InvalidConfiguration(message)
Ihar Hrachyshka59382252016-04-05 15:54:33 +0200514
515
Sławek Kapłońskiff294062016-12-04 15:00:54 +0000516def require_qos_rule_type(rule_type):
517 def decorator(f):
518 @functools.wraps(f)
519 def wrapper(self, *func_args, **func_kwargs):
520 if rule_type not in self.get_supported_qos_rule_types():
521 raise self.skipException(
522 "%s rule type is required." % rule_type)
523 return f(self, *func_args, **func_kwargs)
524 return wrapper
525 return decorator
526
527
Ihar Hrachyshka59382252016-04-05 15:54:33 +0200528def _require_sorting(f):
529 @functools.wraps(f)
530 def inner(self, *args, **kwargs):
Ihar Hrachyshka34feb5b2016-06-14 16:16:06 +0200531 if not test.is_extension_enabled("sorting", "network"):
Ihar Hrachyshka59382252016-04-05 15:54:33 +0200532 self.skipTest('Sorting feature is required')
533 return f(self, *args, **kwargs)
534 return inner
535
536
537def _require_pagination(f):
538 @functools.wraps(f)
539 def inner(self, *args, **kwargs):
Ihar Hrachyshka34feb5b2016-06-14 16:16:06 +0200540 if not test.is_extension_enabled("pagination", "network"):
Ihar Hrachyshka59382252016-04-05 15:54:33 +0200541 self.skipTest('Pagination feature is required')
542 return f(self, *args, **kwargs)
543 return inner
544
545
546class BaseSearchCriteriaTest(BaseNetworkTest):
547
548 # This should be defined by subclasses to reflect resource name to test
549 resource = None
550
Armando Migliaccio57581c62016-07-01 10:13:19 -0700551 field = 'name'
552
Ihar Hrachyshkaa8fe5a12016-05-24 14:50:58 +0200553 # NOTE(ihrachys): some names, like those starting with an underscore (_)
554 # are sorted differently depending on whether the plugin implements native
555 # sorting support, or not. So we avoid any such cases here, sticking to
556 # alphanumeric. Also test a case when there are multiple resources with the
557 # same name
Ihar Hrachyshka59382252016-04-05 15:54:33 +0200558 resource_names = ('test1', 'abc1', 'test10', '123test') + ('test1',)
559
Ihar Hrachyshka59382252016-04-05 15:54:33 +0200560 force_tenant_isolation = True
561
Ihar Hrachyshkaa8fe5a12016-05-24 14:50:58 +0200562 list_kwargs = {}
Ihar Hrachyshka59382252016-04-05 15:54:33 +0200563
Ihar Hrachyshkab7940d92016-06-10 13:44:22 +0200564 list_as_admin = False
565
Ihar Hrachyshkaaeb03a02016-05-18 20:03:18 +0200566 def assertSameOrder(self, original, actual):
567 # gracefully handle iterators passed
568 original = list(original)
569 actual = list(actual)
570 self.assertEqual(len(original), len(actual))
571 for expected, res in zip(original, actual):
Armando Migliaccio57581c62016-07-01 10:13:19 -0700572 self.assertEqual(expected[self.field], res[self.field])
Ihar Hrachyshkaaeb03a02016-05-18 20:03:18 +0200573
574 @utils.classproperty
575 def plural_name(self):
576 return '%ss' % self.resource
577
Ihar Hrachyshkab7940d92016-06-10 13:44:22 +0200578 @property
579 def list_client(self):
580 return self.admin_client if self.list_as_admin else self.client
581
Ihar Hrachyshka59382252016-04-05 15:54:33 +0200582 def list_method(self, *args, **kwargs):
Ihar Hrachyshkab7940d92016-06-10 13:44:22 +0200583 method = getattr(self.list_client, 'list_%s' % self.plural_name)
Ihar Hrachyshka59382252016-04-05 15:54:33 +0200584 kwargs.update(self.list_kwargs)
585 return method(*args, **kwargs)
586
Ihar Hrachyshkaaeb03a02016-05-18 20:03:18 +0200587 def get_bare_url(self, url):
588 base_url = self.client.base_url
589 self.assertTrue(url.startswith(base_url))
590 return url[len(base_url):]
591
Ihar Hrachyshka59382252016-04-05 15:54:33 +0200592 @classmethod
593 def _extract_resources(cls, body):
Ihar Hrachyshkaaeb03a02016-05-18 20:03:18 +0200594 return body[cls.plural_name]
Ihar Hrachyshka59382252016-04-05 15:54:33 +0200595
596 def _test_list_sorts(self, direction):
597 sort_args = {
598 'sort_dir': direction,
Armando Migliaccio57581c62016-07-01 10:13:19 -0700599 'sort_key': self.field
Ihar Hrachyshka59382252016-04-05 15:54:33 +0200600 }
601 body = self.list_method(**sort_args)
602 resources = self._extract_resources(body)
603 self.assertNotEmpty(
604 resources, "%s list returned is empty" % self.resource)
Armando Migliaccio57581c62016-07-01 10:13:19 -0700605 retrieved_names = [res[self.field] for res in resources]
Ihar Hrachyshka59382252016-04-05 15:54:33 +0200606 expected = sorted(retrieved_names)
607 if direction == constants.SORT_DIRECTION_DESC:
608 expected = list(reversed(expected))
609 self.assertEqual(expected, retrieved_names)
610
611 @_require_sorting
612 def _test_list_sorts_asc(self):
613 self._test_list_sorts(constants.SORT_DIRECTION_ASC)
614
615 @_require_sorting
616 def _test_list_sorts_desc(self):
617 self._test_list_sorts(constants.SORT_DIRECTION_DESC)
618
619 @_require_pagination
620 def _test_list_pagination(self):
621 for limit in range(1, len(self.resource_names) + 1):
622 pagination_args = {
623 'limit': limit,
624 }
625 body = self.list_method(**pagination_args)
626 resources = self._extract_resources(body)
627 self.assertEqual(limit, len(resources))
628
629 @_require_pagination
630 def _test_list_no_pagination_limit_0(self):
631 pagination_args = {
632 'limit': 0,
633 }
634 body = self.list_method(**pagination_args)
635 resources = self._extract_resources(body)
Béla Vancsicsf1806182016-08-23 07:36:18 +0200636 self.assertGreaterEqual(len(resources), len(self.resource_names))
Ihar Hrachyshka59382252016-04-05 15:54:33 +0200637
Ihar Hrachyshkaaeb03a02016-05-18 20:03:18 +0200638 def _test_list_pagination_iteratively(self, lister):
Ihar Hrachyshka59382252016-04-05 15:54:33 +0200639 # first, collect all resources for later comparison
640 sort_args = {
641 'sort_dir': constants.SORT_DIRECTION_ASC,
Armando Migliaccio57581c62016-07-01 10:13:19 -0700642 'sort_key': self.field
Ihar Hrachyshka59382252016-04-05 15:54:33 +0200643 }
644 body = self.list_method(**sort_args)
645 expected_resources = self._extract_resources(body)
646 self.assertNotEmpty(expected_resources)
647
Ihar Hrachyshkaaeb03a02016-05-18 20:03:18 +0200648 resources = lister(
649 len(expected_resources), sort_args
650 )
651
652 # finally, compare that the list retrieved in one go is identical to
653 # the one containing pagination results
654 self.assertSameOrder(expected_resources, resources)
655
656 def _list_all_with_marker(self, niterations, sort_args):
Ihar Hrachyshka59382252016-04-05 15:54:33 +0200657 # paginate resources one by one, using last fetched resource as a
658 # marker
659 resources = []
Ihar Hrachyshkaaeb03a02016-05-18 20:03:18 +0200660 for i in range(niterations):
Ihar Hrachyshka59382252016-04-05 15:54:33 +0200661 pagination_args = sort_args.copy()
662 pagination_args['limit'] = 1
663 if resources:
664 pagination_args['marker'] = resources[-1]['id']
665 body = self.list_method(**pagination_args)
666 resources_ = self._extract_resources(body)
667 self.assertEqual(1, len(resources_))
668 resources.extend(resources_)
Ihar Hrachyshkaaeb03a02016-05-18 20:03:18 +0200669 return resources
Ihar Hrachyshka59382252016-04-05 15:54:33 +0200670
Ihar Hrachyshkaaeb03a02016-05-18 20:03:18 +0200671 @_require_pagination
672 @_require_sorting
673 def _test_list_pagination_with_marker(self):
674 self._test_list_pagination_iteratively(self._list_all_with_marker)
675
676 def _list_all_with_hrefs(self, niterations, sort_args):
677 # paginate resources one by one, using next href links
678 resources = []
679 prev_links = {}
680
681 for i in range(niterations):
682 if prev_links:
683 uri = self.get_bare_url(prev_links['next'])
684 else:
Ihar Hrachyshka7f79fe62016-06-07 21:23:44 +0200685 sort_args.update(self.list_kwargs)
Ihar Hrachyshkab7940d92016-06-10 13:44:22 +0200686 uri = self.list_client.build_uri(
Ihar Hrachyshkaaeb03a02016-05-18 20:03:18 +0200687 self.plural_name, limit=1, **sort_args)
Ihar Hrachyshkab7940d92016-06-10 13:44:22 +0200688 prev_links, body = self.list_client.get_uri_with_links(
Ihar Hrachyshkaaeb03a02016-05-18 20:03:18 +0200689 self.plural_name, uri
690 )
691 resources_ = self._extract_resources(body)
692 self.assertEqual(1, len(resources_))
693 resources.extend(resources_)
694
695 # The last element is empty and does not contain 'next' link
696 uri = self.get_bare_url(prev_links['next'])
697 prev_links, body = self.client.get_uri_with_links(
698 self.plural_name, uri
699 )
700 self.assertNotIn('next', prev_links)
701
702 # Now walk backwards and compare results
703 resources2 = []
704 for i in range(niterations):
705 uri = self.get_bare_url(prev_links['previous'])
Ihar Hrachyshkab7940d92016-06-10 13:44:22 +0200706 prev_links, body = self.list_client.get_uri_with_links(
Ihar Hrachyshkaaeb03a02016-05-18 20:03:18 +0200707 self.plural_name, uri
708 )
709 resources_ = self._extract_resources(body)
710 self.assertEqual(1, len(resources_))
711 resources2.extend(resources_)
712
713 self.assertSameOrder(resources, reversed(resources2))
714
715 return resources
716
717 @_require_pagination
718 @_require_sorting
719 def _test_list_pagination_with_href_links(self):
720 self._test_list_pagination_iteratively(self._list_all_with_hrefs)
721
722 @_require_pagination
723 @_require_sorting
724 def _test_list_pagination_page_reverse_with_href_links(
725 self, direction=constants.SORT_DIRECTION_ASC):
726 pagination_args = {
727 'sort_dir': direction,
Armando Migliaccio57581c62016-07-01 10:13:19 -0700728 'sort_key': self.field,
Ihar Hrachyshkaaeb03a02016-05-18 20:03:18 +0200729 }
730 body = self.list_method(**pagination_args)
731 expected_resources = self._extract_resources(body)
732
733 page_size = 2
734 pagination_args['limit'] = page_size
735
736 prev_links = {}
737 resources = []
738 num_resources = len(expected_resources)
739 niterations = int(math.ceil(float(num_resources) / page_size))
740 for i in range(niterations):
741 if prev_links:
742 uri = self.get_bare_url(prev_links['previous'])
743 else:
Ihar Hrachyshka7f79fe62016-06-07 21:23:44 +0200744 pagination_args.update(self.list_kwargs)
Ihar Hrachyshkab7940d92016-06-10 13:44:22 +0200745 uri = self.list_client.build_uri(
Ihar Hrachyshkaaeb03a02016-05-18 20:03:18 +0200746 self.plural_name, page_reverse=True, **pagination_args)
Ihar Hrachyshkab7940d92016-06-10 13:44:22 +0200747 prev_links, body = self.list_client.get_uri_with_links(
Ihar Hrachyshkaaeb03a02016-05-18 20:03:18 +0200748 self.plural_name, uri
749 )
750 resources_ = self._extract_resources(body)
Béla Vancsicsf1806182016-08-23 07:36:18 +0200751 self.assertGreaterEqual(page_size, len(resources_))
Ihar Hrachyshkaaeb03a02016-05-18 20:03:18 +0200752 resources.extend(reversed(resources_))
753
754 self.assertSameOrder(expected_resources, reversed(resources))
755
756 @_require_pagination
757 @_require_sorting
758 def _test_list_pagination_page_reverse_asc(self):
759 self._test_list_pagination_page_reverse(
760 direction=constants.SORT_DIRECTION_ASC)
761
762 @_require_pagination
763 @_require_sorting
764 def _test_list_pagination_page_reverse_desc(self):
765 self._test_list_pagination_page_reverse(
766 direction=constants.SORT_DIRECTION_DESC)
767
768 def _test_list_pagination_page_reverse(self, direction):
769 pagination_args = {
770 'sort_dir': direction,
Armando Migliaccio57581c62016-07-01 10:13:19 -0700771 'sort_key': self.field,
Ihar Hrachyshkaaeb03a02016-05-18 20:03:18 +0200772 'limit': 3,
773 }
774 body = self.list_method(**pagination_args)
775 expected_resources = self._extract_resources(body)
776
777 pagination_args['limit'] -= 1
778 pagination_args['marker'] = expected_resources[-1]['id']
779 pagination_args['page_reverse'] = True
780 body = self.list_method(**pagination_args)
781
782 self.assertSameOrder(
783 # the last entry is not included in 2nd result when used as a
784 # marker
785 expected_resources[:-1],
786 self._extract_resources(body))
Victor Morales1be97b42016-09-05 08:50:06 -0500787
788 def _test_list_validation_filters(self):
789 validation_args = {
790 'unknown_filter': 'value',
791 }
792 body = self.list_method(**validation_args)
793 resources = self._extract_resources(body)
794 for resource in resources:
795 self.assertIn(resource['name'], self.resource_names)