blob: c1bd622953197462b0dfab1a261d2cd66bda8039 [file] [log] [blame]
Daniel Mellado3c0aeab2016-01-29 11:30:25 +00001# Copyright 2012 OpenStack Foundation
2# All Rights Reserved.
3#
4# Licensed under the Apache License, Version 2.0 (the "License"); you may
5# not use this file except in compliance with the License. You may obtain
6# a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13# License for the specific language governing permissions and limitations
14# under the License.
15
16import netaddr
17from tempest.lib.common.utils import data_utils
18from tempest.lib import exceptions as lib_exc
19from tempest import test
20
21from neutron.tests.tempest.api import clients
22from neutron.tests.tempest import config
23from neutron.tests.tempest import exceptions
24
25CONF = config.CONF
26
27
28class BaseNetworkTest(test.BaseTestCase):
29
30 """
31 Base class for the Neutron tests that use the Tempest Neutron REST client
32
33 Per the Neutron API Guide, API v1.x was removed from the source code tree
34 (docs.openstack.org/api/openstack-network/2.0/content/Overview-d1e71.html)
35 Therefore, v2.x of the Neutron API is assumed. It is also assumed that the
36 following options are defined in the [network] section of etc/tempest.conf:
37
38 project_network_cidr with a block of cidr's from which smaller blocks
39 can be allocated for tenant networks
40
41 project_network_mask_bits with the mask bits to be used to partition
42 the block defined by tenant-network_cidr
43
44 Finally, it is assumed that the following option is defined in the
45 [service_available] section of etc/tempest.conf
46
47 neutron as True
48 """
49
50 force_tenant_isolation = False
51 credentials = ['primary']
52
53 # Default to ipv4.
54 _ip_version = 4
55
56 @classmethod
57 def get_client_manager(cls, credential_type=None, roles=None,
58 force_new=None):
59 manager = test.BaseTestCase.get_client_manager(
60 credential_type=credential_type,
61 roles=roles,
62 force_new=force_new)
63 # Neutron uses a different clients manager than the one in the Tempest
64 return clients.Manager(manager.credentials)
65
66 @classmethod
67 def skip_checks(cls):
68 super(BaseNetworkTest, cls).skip_checks()
69 if not CONF.service_available.neutron:
70 raise cls.skipException("Neutron support is required")
71 if cls._ip_version == 6 and not CONF.network_feature_enabled.ipv6:
72 raise cls.skipException("IPv6 Tests are disabled.")
73
74 @classmethod
75 def setup_credentials(cls):
76 # Create no network resources for these test.
77 cls.set_network_resources()
78 super(BaseNetworkTest, cls).setup_credentials()
79
80 @classmethod
81 def setup_clients(cls):
82 super(BaseNetworkTest, cls).setup_clients()
83 cls.client = cls.os.network_client
84
85 @classmethod
86 def resource_setup(cls):
87 super(BaseNetworkTest, cls).resource_setup()
88
89 cls.networks = []
90 cls.shared_networks = []
91 cls.subnets = []
92 cls.ports = []
93 cls.routers = []
94 cls.floating_ips = []
95 cls.metering_labels = []
96 cls.service_profiles = []
97 cls.flavors = []
98 cls.metering_label_rules = []
99 cls.qos_rules = []
100 cls.qos_policies = []
101 cls.ethertype = "IPv" + str(cls._ip_version)
102 cls.address_scopes = []
103 cls.admin_address_scopes = []
104 cls.subnetpools = []
105 cls.admin_subnetpools = []
106
107 @classmethod
108 def resource_cleanup(cls):
109 if CONF.service_available.neutron:
110 # Clean up QoS rules
111 for qos_rule in cls.qos_rules:
112 cls._try_delete_resource(cls.admin_client.delete_qos_rule,
113 qos_rule['id'])
114 # Clean up QoS policies
115 for qos_policy in cls.qos_policies:
116 cls._try_delete_resource(cls.admin_client.delete_qos_policy,
117 qos_policy['id'])
118 # Clean up floating IPs
119 for floating_ip in cls.floating_ips:
120 cls._try_delete_resource(cls.client.delete_floatingip,
121 floating_ip['id'])
122 # Clean up routers
123 for router in cls.routers:
124 cls._try_delete_resource(cls.delete_router,
125 router)
126 # Clean up metering label rules
127 for metering_label_rule in cls.metering_label_rules:
128 cls._try_delete_resource(
129 cls.admin_client.delete_metering_label_rule,
130 metering_label_rule['id'])
131 # Clean up metering labels
132 for metering_label in cls.metering_labels:
133 cls._try_delete_resource(
134 cls.admin_client.delete_metering_label,
135 metering_label['id'])
136 # Clean up flavors
137 for flavor in cls.flavors:
138 cls._try_delete_resource(
139 cls.admin_client.delete_flavor,
140 flavor['id'])
141 # Clean up service profiles
142 for service_profile in cls.service_profiles:
143 cls._try_delete_resource(
144 cls.admin_client.delete_service_profile,
145 service_profile['id'])
146 # Clean up ports
147 for port in cls.ports:
148 cls._try_delete_resource(cls.client.delete_port,
149 port['id'])
150 # Clean up subnets
151 for subnet in cls.subnets:
152 cls._try_delete_resource(cls.client.delete_subnet,
153 subnet['id'])
154 # Clean up networks
155 for network in cls.networks:
156 cls._try_delete_resource(cls.client.delete_network,
157 network['id'])
158
159 # Clean up shared networks
160 for network in cls.shared_networks:
161 cls._try_delete_resource(cls.admin_client.delete_network,
162 network['id'])
163
164 for subnetpool in cls.subnetpools:
165 cls._try_delete_resource(cls.client.delete_subnetpool,
166 subnetpool['id'])
167
168 for subnetpool in cls.admin_subnetpools:
169 cls._try_delete_resource(cls.admin_client.delete_subnetpool,
170 subnetpool['id'])
171
172 for address_scope in cls.address_scopes:
173 cls._try_delete_resource(cls.client.delete_address_scope,
174 address_scope['id'])
175
176 for address_scope in cls.admin_address_scopes:
177 cls._try_delete_resource(
178 cls.admin_client.delete_address_scope,
179 address_scope['id'])
180
181 super(BaseNetworkTest, cls).resource_cleanup()
182
183 @classmethod
184 def _try_delete_resource(cls, delete_callable, *args, **kwargs):
185 """Cleanup resources in case of test-failure
186
187 Some resources are explicitly deleted by the test.
188 If the test failed to delete a resource, this method will execute
189 the appropriate delete methods. Otherwise, the method ignores NotFound
190 exceptions thrown for resources that were correctly deleted by the
191 test.
192
193 :param delete_callable: delete method
194 :param args: arguments for delete method
195 :param kwargs: keyword arguments for delete method
196 """
197 try:
198 delete_callable(*args, **kwargs)
199 # if resource is not found, this means it was deleted in the test
200 except lib_exc.NotFound:
201 pass
202
203 @classmethod
204 def create_network(cls, network_name=None, **kwargs):
205 """Wrapper utility that returns a test network."""
206 network_name = network_name or data_utils.rand_name('test-network-')
207
208 body = cls.client.create_network(name=network_name, **kwargs)
209 network = body['network']
210 cls.networks.append(network)
211 return network
212
213 @classmethod
214 def create_shared_network(cls, network_name=None, **post_body):
215 network_name = network_name or data_utils.rand_name('sharednetwork-')
216 post_body.update({'name': network_name, 'shared': True})
217 body = cls.admin_client.create_network(**post_body)
218 network = body['network']
219 cls.shared_networks.append(network)
220 return network
221
222 @classmethod
223 def create_subnet(cls, network, gateway='', cidr=None, mask_bits=None,
224 ip_version=None, client=None, **kwargs):
225 """Wrapper utility that returns a test subnet."""
226
227 # allow tests to use admin client
228 if not client:
229 client = cls.client
230
231 # The cidr and mask_bits depend on the ip version.
232 ip_version = ip_version if ip_version is not None else cls._ip_version
233 gateway_not_set = gateway == ''
234 if ip_version == 4:
235 cidr = cidr or netaddr.IPNetwork(
236 config.safe_get_config_value(
237 'network', 'project_network_cidr'))
238 mask_bits = (
239 mask_bits or config.safe_get_config_value(
240 'network', 'project_network_mask_bits'))
241 elif ip_version == 6:
242 cidr = (
243 cidr or netaddr.IPNetwork(
244 config.safe_get_config_value(
245 'network', 'project_network_v6_cidr')))
246 mask_bits = (
247 mask_bits or config.safe_get_config_value(
248 'network', 'project_network_v6_mask_bits'))
249 # Find a cidr that is not in use yet and create a subnet with it
250 for subnet_cidr in cidr.subnet(mask_bits):
251 if gateway_not_set:
252 gateway_ip = str(netaddr.IPAddress(subnet_cidr) + 1)
253 else:
254 gateway_ip = gateway
255 try:
256 body = client.create_subnet(
257 network_id=network['id'],
258 cidr=str(subnet_cidr),
259 ip_version=ip_version,
260 gateway_ip=gateway_ip,
261 **kwargs)
262 break
263 except lib_exc.BadRequest as e:
264 is_overlapping_cidr = 'overlaps with another subnet' in str(e)
265 if not is_overlapping_cidr:
266 raise
267 else:
268 message = 'Available CIDR for subnet creation could not be found'
269 raise ValueError(message)
270 subnet = body['subnet']
271 cls.subnets.append(subnet)
272 return subnet
273
274 @classmethod
275 def create_port(cls, network, **kwargs):
276 """Wrapper utility that returns a test port."""
277 body = cls.client.create_port(network_id=network['id'],
278 **kwargs)
279 port = body['port']
280 cls.ports.append(port)
281 return port
282
283 @classmethod
284 def update_port(cls, port, **kwargs):
285 """Wrapper utility that updates a test port."""
286 body = cls.client.update_port(port['id'],
287 **kwargs)
288 return body['port']
289
290 @classmethod
291 def create_router(cls, router_name=None, admin_state_up=False,
292 external_network_id=None, enable_snat=None,
293 **kwargs):
294 ext_gw_info = {}
295 if external_network_id:
296 ext_gw_info['network_id'] = external_network_id
297 if enable_snat:
298 ext_gw_info['enable_snat'] = enable_snat
299 body = cls.client.create_router(
300 router_name, external_gateway_info=ext_gw_info,
301 admin_state_up=admin_state_up, **kwargs)
302 router = body['router']
303 cls.routers.append(router)
304 return router
305
306 @classmethod
307 def create_floatingip(cls, external_network_id):
308 """Wrapper utility that returns a test floating IP."""
309 body = cls.client.create_floatingip(
310 floating_network_id=external_network_id)
311 fip = body['floatingip']
312 cls.floating_ips.append(fip)
313 return fip
314
315 @classmethod
316 def create_router_interface(cls, router_id, subnet_id):
317 """Wrapper utility that returns a router interface."""
318 interface = cls.client.add_router_interface_with_subnet_id(
319 router_id, subnet_id)
320 return interface
321
322 @classmethod
323 def create_qos_policy(cls, name, description, shared, tenant_id=None):
324 """Wrapper utility that returns a test QoS policy."""
325 body = cls.admin_client.create_qos_policy(
326 name, description, shared, tenant_id)
327 qos_policy = body['policy']
328 cls.qos_policies.append(qos_policy)
329 return qos_policy
330
331 @classmethod
332 def create_qos_bandwidth_limit_rule(cls, policy_id,
333 max_kbps, max_burst_kbps):
334 """Wrapper utility that returns a test QoS bandwidth limit rule."""
335 body = cls.admin_client.create_bandwidth_limit_rule(
336 policy_id, max_kbps, max_burst_kbps)
337 qos_rule = body['bandwidth_limit_rule']
338 cls.qos_rules.append(qos_rule)
339 return qos_rule
340
341 @classmethod
342 def delete_router(cls, router):
343 body = cls.client.list_router_interfaces(router['id'])
344 interfaces = body['ports']
345 for i in interfaces:
346 try:
347 cls.client.remove_router_interface_with_subnet_id(
348 router['id'], i['fixed_ips'][0]['subnet_id'])
349 except lib_exc.NotFound:
350 pass
351 cls.client.delete_router(router['id'])
352
353 @classmethod
354 def create_address_scope(cls, name, is_admin=False, **kwargs):
355 if is_admin:
356 body = cls.admin_client.create_address_scope(name=name, **kwargs)
357 cls.admin_address_scopes.append(body['address_scope'])
358 else:
359 body = cls.client.create_address_scope(name=name, **kwargs)
360 cls.address_scopes.append(body['address_scope'])
361 return body['address_scope']
362
363 @classmethod
364 def create_subnetpool(cls, name, is_admin=False, **kwargs):
365 if is_admin:
366 body = cls.admin_client.create_subnetpool(name, **kwargs)
367 cls.admin_subnetpools.append(body['subnetpool'])
368 else:
369 body = cls.client.create_subnetpool(name, **kwargs)
370 cls.subnetpools.append(body['subnetpool'])
371 return body['subnetpool']
372
373
374class BaseAdminNetworkTest(BaseNetworkTest):
375
376 credentials = ['primary', 'admin']
377
378 @classmethod
379 def setup_clients(cls):
380 super(BaseAdminNetworkTest, cls).setup_clients()
381 cls.admin_client = cls.os_adm.network_client
382 cls.identity_admin_client = cls.os_adm.tenants_client
383
384 @classmethod
385 def create_metering_label(cls, name, description):
386 """Wrapper utility that returns a test metering label."""
387 body = cls.admin_client.create_metering_label(
388 description=description,
389 name=data_utils.rand_name("metering-label"))
390 metering_label = body['metering_label']
391 cls.metering_labels.append(metering_label)
392 return metering_label
393
394 @classmethod
395 def create_metering_label_rule(cls, remote_ip_prefix, direction,
396 metering_label_id):
397 """Wrapper utility that returns a test metering label rule."""
398 body = cls.admin_client.create_metering_label_rule(
399 remote_ip_prefix=remote_ip_prefix, direction=direction,
400 metering_label_id=metering_label_id)
401 metering_label_rule = body['metering_label_rule']
402 cls.metering_label_rules.append(metering_label_rule)
403 return metering_label_rule
404
405 @classmethod
406 def create_flavor(cls, name, description, service_type):
407 """Wrapper utility that returns a test flavor."""
408 body = cls.admin_client.create_flavor(
409 description=description, service_type=service_type,
410 name=name)
411 flavor = body['flavor']
412 cls.flavors.append(flavor)
413 return flavor
414
415 @classmethod
416 def create_service_profile(cls, description, metainfo, driver):
417 """Wrapper utility that returns a test service profile."""
418 body = cls.admin_client.create_service_profile(
419 driver=driver, metainfo=metainfo, description=description)
420 service_profile = body['service_profile']
421 cls.service_profiles.append(service_profile)
422 return service_profile
423
424 @classmethod
425 def get_unused_ip(cls, net_id, ip_version=None):
426 """Get an unused ip address in a allocaion pool of net"""
427 body = cls.admin_client.list_ports(network_id=net_id)
428 ports = body['ports']
429 used_ips = []
430 for port in ports:
431 used_ips.extend(
432 [fixed_ip['ip_address'] for fixed_ip in port['fixed_ips']])
433 body = cls.admin_client.list_subnets(network_id=net_id)
434 subnets = body['subnets']
435
436 for subnet in subnets:
437 if ip_version and subnet['ip_version'] != ip_version:
438 continue
439 cidr = subnet['cidr']
440 allocation_pools = subnet['allocation_pools']
441 iterators = []
442 if allocation_pools:
443 for allocation_pool in allocation_pools:
444 iterators.append(netaddr.iter_iprange(
445 allocation_pool['start'], allocation_pool['end']))
446 else:
447 net = netaddr.IPNetwork(cidr)
448
449 def _iterip():
450 for ip in net:
451 if ip not in (net.network, net.broadcast):
452 yield ip
453 iterators.append(iter(_iterip()))
454
455 for iterator in iterators:
456 for ip in iterator:
457 if str(ip) not in used_ips:
458 return str(ip)
459
460 message = (
461 "net(%s) has no usable IP address in allocation pools" % net_id)
462 raise exceptions.InvalidConfiguration(message)