# Copyright 2012 OpenStack Foundation
# All Rights Reserved.
#
#    Licensed under the Apache License, Version 2.0 (the "License"); you may
#    not use this file except in compliance with the License. You may obtain
#    a copy of the License at
#
#         http://www.apache.org/licenses/LICENSE-2.0
#
#    Unless required by applicable law or agreed to in writing, software
#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
#    License for the specific language governing permissions and limitations
#    under the License.
15
Ihar Hrachyshka59382252016-04-05 15:54:33 +020016import functools
17
Daniel Mellado3c0aeab2016-01-29 11:30:25 +000018import netaddr
19from tempest.lib.common.utils import data_utils
20from tempest.lib import exceptions as lib_exc
21from tempest import test
22
Ihar Hrachyshka59382252016-04-05 15:54:33 +020023from neutron.common import constants
Daniel Mellado3c0aeab2016-01-29 11:30:25 +000024from neutron.tests.tempest.api import clients
25from neutron.tests.tempest import config
26from neutron.tests.tempest import exceptions
27
28CONF = config.CONF
29
30
class BaseNetworkTest(test.BaseTestCase):

    """
    Base class for the Neutron tests that use the Tempest Neutron REST client

    Per the Neutron API Guide, API v1.x was removed from the source code tree
    (docs.openstack.org/api/openstack-network/2.0/content/Overview-d1e71.html)
    Therefore, v2.x of the Neutron API is assumed. It is also assumed that the
    following options are defined in the [network] section of etc/tempest.conf:

        project_network_cidr with a block of cidr's from which smaller blocks
        can be allocated for tenant networks

        project_network_mask_bits with the mask bits to be used to partition
        the block defined by project_network_cidr

    Finally, it is assumed that the following option is defined in the
    [service_available] section of etc/tempest.conf

        neutron as True
    """

    force_tenant_isolation = False
    credentials = ['primary']

    # Default to ipv4.
    _ip_version = 4

    @classmethod
    def get_client_manager(cls, credential_type=None, roles=None,
                           force_new=None):
        """Return a Neutron clients manager for the requested credentials.

        The arguments are forwarded unchanged to the parent implementation;
        only the credentials of the manager it returns are reused.
        """
        # Dispatch through super() instead of naming test.BaseTestCase
        # directly, so the parent classmethod is bound to the concrete test
        # class (and sees its overrides, e.g. force_tenant_isolation) rather
        # than to test.BaseTestCase itself.
        manager = super(BaseNetworkTest, cls).get_client_manager(
            credential_type=credential_type,
            roles=roles,
            force_new=force_new)
        # Neutron uses a different clients manager than the one in the Tempest
        return clients.Manager(manager.credentials)

    @classmethod
    def skip_checks(cls):
        """Skip the class when Neutron, or IPv6 for v6 tests, is disabled."""
        super(BaseNetworkTest, cls).skip_checks()
        if not CONF.service_available.neutron:
            raise cls.skipException("Neutron support is required")
        if cls._ip_version == 6 and not CONF.network_feature_enabled.ipv6:
            raise cls.skipException("IPv6 Tests are disabled.")

    @classmethod
    def setup_credentials(cls):
        # Create no network resources for these test.
        cls.set_network_resources()
        super(BaseNetworkTest, cls).setup_credentials()

    @classmethod
    def setup_clients(cls):
        super(BaseNetworkTest, cls).setup_clients()
        cls.client = cls.os.network_client

    @classmethod
    def resource_setup(cls):
        """Initialise the per-class registries of created resources.

        Every create_* helper appends what it created to one of these lists
        and resource_cleanup() walks them to delete the leftovers.
        """
        super(BaseNetworkTest, cls).resource_setup()

        cls.networks = []
        cls.shared_networks = []
        cls.subnets = []
        cls.ports = []
        cls.routers = []
        cls.floating_ips = []
        cls.metering_labels = []
        cls.service_profiles = []
        cls.flavors = []
        cls.metering_label_rules = []
        cls.qos_rules = []
        cls.qos_policies = []
        cls.ethertype = "IPv" + str(cls._ip_version)
        cls.address_scopes = []
        cls.admin_address_scopes = []
        cls.subnetpools = []
        cls.admin_subnetpools = []

    @classmethod
    def resource_cleanup(cls):
        """Delete every resource recorded by the create_* helpers.

        NOTE: cls.admin_client is not set by this class's setup_clients(); it
        is only dereferenced inside the loops below, i.e. only when the
        corresponding registry is non-empty. Keep those lookups lazy.
        """
        if CONF.service_available.neutron:
            # Clean up QoS rules
            for qos_rule in cls.qos_rules:
                cls._try_delete_resource(cls.admin_client.delete_qos_rule,
                                         qos_rule['id'])
            # Clean up QoS policies
            for qos_policy in cls.qos_policies:
                cls._try_delete_resource(cls.admin_client.delete_qos_policy,
                                         qos_policy['id'])
            # Clean up floating IPs
            for floating_ip in cls.floating_ips:
                cls._try_delete_resource(cls.client.delete_floatingip,
                                         floating_ip['id'])
            # Clean up routers
            for router in cls.routers:
                cls._try_delete_resource(cls.delete_router,
                                         router)
            # Clean up metering label rules
            for metering_label_rule in cls.metering_label_rules:
                cls._try_delete_resource(
                    cls.admin_client.delete_metering_label_rule,
                    metering_label_rule['id'])
            # Clean up metering labels
            for metering_label in cls.metering_labels:
                cls._try_delete_resource(
                    cls.admin_client.delete_metering_label,
                    metering_label['id'])
            # Clean up flavors
            for flavor in cls.flavors:
                cls._try_delete_resource(
                    cls.admin_client.delete_flavor,
                    flavor['id'])
            # Clean up service profiles
            for service_profile in cls.service_profiles:
                cls._try_delete_resource(
                    cls.admin_client.delete_service_profile,
                    service_profile['id'])
            # Clean up ports
            for port in cls.ports:
                cls._try_delete_resource(cls.client.delete_port,
                                         port['id'])
            # Clean up subnets
            for subnet in cls.subnets:
                cls._try_delete_resource(cls.client.delete_subnet,
                                         subnet['id'])
            # Clean up networks
            for network in cls.networks:
                cls._try_delete_resource(cls.client.delete_network,
                                         network['id'])

            # Clean up shared networks
            for network in cls.shared_networks:
                cls._try_delete_resource(cls.admin_client.delete_network,
                                         network['id'])

            # Clean up subnet pools
            for subnetpool in cls.subnetpools:
                cls._try_delete_resource(cls.client.delete_subnetpool,
                                         subnetpool['id'])

            for subnetpool in cls.admin_subnetpools:
                cls._try_delete_resource(cls.admin_client.delete_subnetpool,
                                         subnetpool['id'])

            # Clean up address scopes
            for address_scope in cls.address_scopes:
                cls._try_delete_resource(cls.client.delete_address_scope,
                                         address_scope['id'])

            for address_scope in cls.admin_address_scopes:
                cls._try_delete_resource(
                    cls.admin_client.delete_address_scope,
                    address_scope['id'])

        super(BaseNetworkTest, cls).resource_cleanup()

    @classmethod
    def _try_delete_resource(cls, delete_callable, *args, **kwargs):
        """Cleanup resources in case of test-failure

        Some resources are explicitly deleted by the test.
        If the test failed to delete a resource, this method will execute
        the appropriate delete methods. Otherwise, the method ignores NotFound
        exceptions thrown for resources that were correctly deleted by the
        test.

        :param delete_callable: delete method
        :param args: arguments for delete method
        :param kwargs: keyword arguments for delete method
        """
        try:
            delete_callable(*args, **kwargs)
        # if resource is not found, this means it was deleted in the test
        except lib_exc.NotFound:
            pass

    @classmethod
    def create_network(cls, network_name=None, **kwargs):
        """Wrapper utility that returns a test network."""
        network_name = network_name or data_utils.rand_name('test-network-')

        body = cls.client.create_network(name=network_name, **kwargs)
        network = body['network']
        cls.networks.append(network)
        return network

    @classmethod
    def create_shared_network(cls, network_name=None, **post_body):
        """Wrapper utility that returns a shared network.

        The network is created through cls.admin_client with shared=True and
        registered for cleanup.
        """
        network_name = network_name or data_utils.rand_name('sharednetwork-')
        post_body.update({'name': network_name, 'shared': True})
        body = cls.admin_client.create_network(**post_body)
        network = body['network']
        cls.shared_networks.append(network)
        return network

    @classmethod
    def create_subnet(cls, network, gateway='', cidr=None, mask_bits=None,
                      ip_version=None, client=None, **kwargs):
        """Wrapper utility that returns a test subnet.

        :param network: network dict; its 'id' is used for the subnet
        :param gateway: gateway IP to request. The default '' means "not
            set": the gateway is then derived as the candidate block's
            address plus one. Any other value is passed through as is.
        :param cidr: block to carve candidate subnets from; defaults to the
            configured project network cidr for the ip version
        :param mask_bits: prefix length of the candidate subnets; defaults to
            the configured project network mask bits for the ip version
        :param ip_version: 4 or 6; defaults to cls._ip_version
        :param client: client used for the request; defaults to cls.client
        :param kwargs: extra attributes forwarded to create_subnet
        :raises ValueError: if every candidate cidr overlaps an existing one
        """

        # allow tests to use admin client
        if not client:
            client = cls.client

        # The cidr and mask_bits depend on the ip version.
        ip_version = ip_version if ip_version is not None else cls._ip_version
        gateway_not_set = gateway == ''
        if ip_version == 4:
            cidr = cidr or netaddr.IPNetwork(
                config.safe_get_config_value(
                    'network', 'project_network_cidr'))
            mask_bits = (
                mask_bits or config.safe_get_config_value(
                    'network', 'project_network_mask_bits'))
        elif ip_version == 6:
            cidr = (
                cidr or netaddr.IPNetwork(
                    config.safe_get_config_value(
                        'network', 'project_network_v6_cidr')))
            mask_bits = (
                mask_bits or config.safe_get_config_value(
                    'network', 'project_network_v6_mask_bits'))
        # Find a cidr that is not in use yet and create a subnet with it
        for subnet_cidr in cidr.subnet(mask_bits):
            if gateway_not_set:
                gateway_ip = str(netaddr.IPAddress(subnet_cidr) + 1)
            else:
                gateway_ip = gateway
            try:
                body = client.create_subnet(
                    network_id=network['id'],
                    cidr=str(subnet_cidr),
                    ip_version=ip_version,
                    gateway_ip=gateway_ip,
                    **kwargs)
                break
            except lib_exc.BadRequest as e:
                # Only an overlap means "try the next block"; any other bad
                # request is a genuine error and is propagated.
                is_overlapping_cidr = 'overlaps with another subnet' in str(e)
                if not is_overlapping_cidr:
                    raise
        else:
            # The loop ran out of candidate blocks without a successful create
            message = 'Available CIDR for subnet creation could not be found'
            raise ValueError(message)
        subnet = body['subnet']
        cls.subnets.append(subnet)
        return subnet

    @classmethod
    def create_port(cls, network, **kwargs):
        """Wrapper utility that returns a test port."""
        body = cls.client.create_port(network_id=network['id'],
                                      **kwargs)
        port = body['port']
        cls.ports.append(port)
        return port

    @classmethod
    def update_port(cls, port, **kwargs):
        """Wrapper utility that updates a test port."""
        body = cls.client.update_port(port['id'],
                                      **kwargs)
        return body['port']

    @classmethod
    def create_router(cls, router_name=None, admin_state_up=False,
                      external_network_id=None, enable_snat=None,
                      **kwargs):
        """Wrapper utility that returns a test router.

        :param external_network_id: if truthy, sent as the 'network_id' of
            external_gateway_info
        :param enable_snat: if not None, sent as 'enable_snat' of
            external_gateway_info
        """
        ext_gw_info = {}
        if external_network_id:
            ext_gw_info['network_id'] = external_network_id
        # Compare against None so that an explicit enable_snat=False is sent
        # to the server instead of being silently dropped.
        if enable_snat is not None:
            ext_gw_info['enable_snat'] = enable_snat
        body = cls.client.create_router(
            router_name, external_gateway_info=ext_gw_info,
            admin_state_up=admin_state_up, **kwargs)
        router = body['router']
        cls.routers.append(router)
        return router

    @classmethod
    def create_floatingip(cls, external_network_id):
        """Wrapper utility that returns a test floating IP."""
        body = cls.client.create_floatingip(
            floating_network_id=external_network_id)
        fip = body['floatingip']
        cls.floating_ips.append(fip)
        return fip

    @classmethod
    def create_router_interface(cls, router_id, subnet_id):
        """Wrapper utility that returns a router interface."""
        interface = cls.client.add_router_interface_with_subnet_id(
            router_id, subnet_id)
        return interface

    @classmethod
    def create_qos_policy(cls, name, description, shared, tenant_id=None):
        """Wrapper utility that returns a test QoS policy."""
        body = cls.admin_client.create_qos_policy(
            name, description, shared, tenant_id)
        qos_policy = body['policy']
        cls.qos_policies.append(qos_policy)
        return qos_policy

    @classmethod
    def create_qos_bandwidth_limit_rule(cls, policy_id,
                                        max_kbps, max_burst_kbps):
        """Wrapper utility that returns a test QoS bandwidth limit rule."""
        body = cls.admin_client.create_bandwidth_limit_rule(
            policy_id, max_kbps, max_burst_kbps)
        qos_rule = body['bandwidth_limit_rule']
        cls.qos_rules.append(qos_rule)
        return qos_rule

    @classmethod
    def delete_router(cls, router):
        """Detach the router's listed interfaces, then delete the router.

        Interfaces that are already gone (NotFound) are ignored.
        """
        body = cls.client.list_router_interfaces(router['id'])
        interfaces = body['ports']
        for i in interfaces:
            try:
                cls.client.remove_router_interface_with_subnet_id(
                    router['id'], i['fixed_ips'][0]['subnet_id'])
            except lib_exc.NotFound:
                pass
        cls.client.delete_router(router['id'])

    @classmethod
    def create_address_scope(cls, name, is_admin=False, **kwargs):
        """Wrapper utility that returns a test address scope.

        With is_admin=True the scope is created through cls.admin_client and
        tracked separately, so cleanup deletes it with the same client.
        """
        if is_admin:
            body = cls.admin_client.create_address_scope(name=name, **kwargs)
            cls.admin_address_scopes.append(body['address_scope'])
        else:
            body = cls.client.create_address_scope(name=name, **kwargs)
            cls.address_scopes.append(body['address_scope'])
        return body['address_scope']

    @classmethod
    def create_subnetpool(cls, name, is_admin=False, **kwargs):
        """Wrapper utility that returns a test subnet pool.

        With is_admin=True the pool is created through cls.admin_client and
        tracked separately, so cleanup deletes it with the same client.
        """
        if is_admin:
            body = cls.admin_client.create_subnetpool(name, **kwargs)
            cls.admin_subnetpools.append(body['subnetpool'])
        else:
            body = cls.client.create_subnetpool(name, **kwargs)
            cls.subnetpools.append(body['subnetpool'])
        return body['subnetpool']
375
376
class BaseAdminNetworkTest(BaseNetworkTest):
    """Base class for Neutron API tests that also need admin credentials."""

    credentials = ['primary', 'admin']

    @classmethod
    def setup_clients(cls):
        super(BaseAdminNetworkTest, cls).setup_clients()
        cls.admin_client = cls.os_adm.network_client
        cls.identity_admin_client = cls.os_adm.tenants_client

    @classmethod
    def create_metering_label(cls, name, description):
        """Wrapper utility that returns a test metering label.

        :param name: label name; a random name is generated when it is empty
            or None
        :param description: label description
        """
        # Honour the caller-supplied name instead of always overriding it
        # with a random one.
        body = cls.admin_client.create_metering_label(
            description=description,
            name=name or data_utils.rand_name("metering-label"))
        metering_label = body['metering_label']
        cls.metering_labels.append(metering_label)
        return metering_label

    @classmethod
    def create_metering_label_rule(cls, remote_ip_prefix, direction,
                                   metering_label_id):
        """Wrapper utility that returns a test metering label rule."""
        body = cls.admin_client.create_metering_label_rule(
            remote_ip_prefix=remote_ip_prefix, direction=direction,
            metering_label_id=metering_label_id)
        metering_label_rule = body['metering_label_rule']
        cls.metering_label_rules.append(metering_label_rule)
        return metering_label_rule

    @classmethod
    def create_flavor(cls, name, description, service_type):
        """Wrapper utility that returns a test flavor."""
        body = cls.admin_client.create_flavor(
            description=description, service_type=service_type,
            name=name)
        flavor = body['flavor']
        cls.flavors.append(flavor)
        return flavor

    @classmethod
    def create_service_profile(cls, description, metainfo, driver):
        """Wrapper utility that returns a test service profile."""
        body = cls.admin_client.create_service_profile(
            driver=driver, metainfo=metainfo, description=description)
        service_profile = body['service_profile']
        cls.service_profiles.append(service_profile)
        return service_profile

    @classmethod
    def get_unused_ip(cls, net_id, ip_version=None):
        """Get an unused ip address in an allocation pool of net

        :param net_id: id of the network whose ports and subnets are listed
        :param ip_version: if truthy, only subnets of this version are used
        :returns: the first candidate address (as a string) that is not a
            fixed IP of any port on the network
        :raises InvalidConfiguration: if no such address exists
        """
        body = cls.admin_client.list_ports(network_id=net_id)
        ports = body['ports']
        # A set keeps the membership test in the scan below constant-time.
        used_ips = set()
        for port in ports:
            used_ips.update(
                fixed_ip['ip_address'] for fixed_ip in port['fixed_ips'])
        body = cls.admin_client.list_subnets(network_id=net_id)
        subnets = body['subnets']

        for subnet in subnets:
            if ip_version and subnet['ip_version'] != ip_version:
                continue
            cidr = subnet['cidr']
            allocation_pools = subnet['allocation_pools']
            iterators = []
            if allocation_pools:
                for allocation_pool in allocation_pools:
                    iterators.append(netaddr.iter_iprange(
                        allocation_pool['start'], allocation_pool['end']))
            else:
                # No explicit pools: walk the whole cidr, leaving out its
                # network and broadcast addresses.
                net = netaddr.IPNetwork(cidr)

                def _iterip():
                    for ip in net:
                        if ip not in (net.network, net.broadcast):
                            yield ip
                iterators.append(_iterip())

            for iterator in iterators:
                for ip in iterator:
                    if str(ip) not in used_ips:
                        return str(ip)

        message = (
            "net(%s) has no usable IP address in allocation pools" % net_id)
        raise exceptions.InvalidConfiguration(message)
Ihar Hrachyshka59382252016-04-05 15:54:33 +0200466
467
468def _require_sorting(f):
469 @functools.wraps(f)
470 def inner(self, *args, **kwargs):
471 if not CONF.neutron_plugin_options.validate_sorting:
472 self.skipTest('Sorting feature is required')
473 return f(self, *args, **kwargs)
474 return inner
475
476
477def _require_pagination(f):
478 @functools.wraps(f)
479 def inner(self, *args, **kwargs):
480 if not CONF.neutron_plugin_options.validate_pagination:
481 self.skipTest('Pagination feature is required')
482 return f(self, *args, **kwargs)
483 return inner
484
485
class BaseSearchCriteriaTest(BaseNetworkTest):
    """Reusable sorting and pagination checks for list API calls.

    Subclasses set ``resource``; the create_<resource> classmethod and the
    client's list_<resource>s method are then looked up by name.
    """

    # This should be defined by subclasses to reflect resource name to test
    resource = None

    # also test a case when there are multiple resources with the same name
    resource_names = ('test1', 'abc1', 'test10', '123test') + ('test1',)

    # Extra filters merged into every list call made through list_method()
    list_kwargs = {'shared': False}

    force_tenant_isolation = True

    @classmethod
    def resource_setup(cls):
        """Create one resource per entry of resource_names."""
        super(BaseSearchCriteriaTest, cls).resource_setup()

        cls.create_method = getattr(cls, 'create_%s' % cls.resource)

        # NOTE(ihrachys): some names, like those starting with an underscore
        # (_) are sorted differently depending on whether the plugin implements
        # native sorting support, or not. So we avoid any such cases here,
        # sticking to alphanumeric.
        for name in cls.resource_names:
            args = {'%s_name' % cls.resource: name}
            cls.create_method(**args)

    def list_method(self, *args, **kwargs):
        """Call the client's list_<resource>s with list_kwargs merged in.

        Entries of list_kwargs override same-named keyword arguments.
        """
        method = getattr(self.client, 'list_%ss' % self.resource)
        kwargs.update(self.list_kwargs)
        return method(*args, **kwargs)

    @classmethod
    def _extract_resources(cls, body):
        """Return the '<resource>s' entry of a list response body."""
        return body['%ss' % cls.resource]

    def _test_list_sorts(self, direction):
        """Check that listing sorted by name in ``direction`` is ordered."""
        sort_args = {
            'sort_dir': direction,
            'sort_key': 'name'
        }
        body = self.list_method(**sort_args)
        resources = self._extract_resources(body)
        self.assertNotEmpty(
            resources, "%s list returned is empty" % self.resource)
        retrieved_names = [res['name'] for res in resources]
        expected = sorted(
            retrieved_names,
            reverse=(direction == constants.SORT_DIRECTION_DESC))
        self.assertEqual(expected, retrieved_names)

    @_require_sorting
    def _test_list_sorts_asc(self):
        self._test_list_sorts(constants.SORT_DIRECTION_ASC)

    @_require_sorting
    def _test_list_sorts_desc(self):
        self._test_list_sorts(constants.SORT_DIRECTION_DESC)

    @_require_pagination
    def _test_list_pagination(self):
        """Check that every limit up to the resource count is honoured."""
        for limit in range(1, len(self.resource_names) + 1):
            pagination_args = {
                'limit': limit,
            }
            body = self.list_method(**pagination_args)
            resources = self._extract_resources(body)
            self.assertEqual(limit, len(resources))

    @_require_pagination
    def _test_list_no_pagination_limit_0(self):
        """Check that limit=0 returns at least all the created resources."""
        pagination_args = {
            'limit': 0,
        }
        body = self.list_method(**pagination_args)
        resources = self._extract_resources(body)
        # assertGreaterEqual reports both lengths when the check fails
        self.assertGreaterEqual(len(resources), len(self.resource_names))

    @_require_pagination
    @_require_sorting
    def _test_list_pagination_with_marker(self):
        """Check that paging one by one with a marker matches a full list."""
        # first, collect all resources for later comparison
        sort_args = {
            'sort_dir': constants.SORT_DIRECTION_ASC,
            'sort_key': 'name'
        }
        body = self.list_method(**sort_args)
        expected_resources = self._extract_resources(body)
        self.assertNotEmpty(expected_resources)

        # paginate resources one by one, using last fetched resource as a
        # marker
        resources = []
        for _ in range(len(expected_resources)):
            pagination_args = sort_args.copy()
            pagination_args['limit'] = 1
            if resources:
                pagination_args['marker'] = resources[-1]['id']
            body = self.list_method(**pagination_args)
            resources_ = self._extract_resources(body)
            self.assertEqual(1, len(resources_))
            resources.extend(resources_)

        # finally, compare that the list retrieved in one go is identical to
        # the one containing pagination results. Only names are compared;
        # both lists have the same length because each page above held
        # exactly one resource.
        self.assertEqual(
            [res['name'] for res in expected_resources],
            [res['name'] for res in resources])