# Copyright 2012 OpenStack Foundation
# Copyright 2013 IBM Corp.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

# NOTE(soliosg) Do not edit this file. It will only stay temporarily
# in ironic, while QA refactors the tempest.scenario interface. This
# file was copied from openstack/tempest/tempest/scenario/manager.py,
# openstack/tempest commit: 82a278e88c9e9f9ba49f81c1f8dba0bca7943daf
22import subprocess
23
24import netaddr
25from oslo_log import log
26from oslo_serialization import jsonutils as json
27from oslo_utils import netutils
28
29from tempest.common import compute
30from tempest.common import image as common_image
31from tempest.common.utils.linux import remote_client
32from tempest.common.utils import net_utils
33from tempest.common import waiters
34from tempest import config
35from tempest import exceptions
36from tempest.lib.common.utils import data_utils
37from tempest.lib.common.utils import test_utils
38from tempest.lib import exceptions as lib_exc
39import tempest.test
40
41CONF = config.CONF
42
43LOG = log.getLogger(__name__)
44
45
class ScenarioTest(tempest.test.BaseTestCase):
    """Base class for scenario tests. Uses tempest own clients. """

    # Only a primary (non-admin) credential set is required.
    credentials = ['primary']

    @classmethod
    def setup_clients(cls):
        """Alias the service clients used by the scenario helpers.

        Shortcuts compute, image, network and volume clients from
        ``cls.manager``. The image and volume clients are chosen based on
        the API versions enabled in the tempest configuration; an
        InvalidConfiguration error is raised when glance is available but
        neither image API version is enabled.
        """
        super(ScenarioTest, cls).setup_clients()
        # Clients (in alphabetical order)
        cls.flavors_client = cls.manager.flavors_client
        cls.compute_floating_ips_client = (
            cls.manager.compute_floating_ips_client)
        if CONF.service_available.glance:
            # Check if glance v1 is available to determine which client to use.
            if CONF.image_feature_enabled.api_v1:
                cls.image_client = cls.manager.image_client
            elif CONF.image_feature_enabled.api_v2:
                cls.image_client = cls.manager.image_client_v2
            else:
                raise lib_exc.InvalidConfiguration(
                    'Either api_v1 or api_v2 must be True in '
                    '[image-feature-enabled].')
        # Compute image client
        cls.compute_images_client = cls.manager.compute_images_client
        cls.keypairs_client = cls.manager.keypairs_client
        # Nova security groups client
        cls.compute_security_groups_client = (
            cls.manager.compute_security_groups_client)
        cls.compute_security_group_rules_client = (
            cls.manager.compute_security_group_rules_client)
        cls.servers_client = cls.manager.servers_client
        cls.interface_client = cls.manager.interfaces_client
        # Neutron network client
        cls.networks_client = cls.manager.networks_client
        cls.ports_client = cls.manager.ports_client
        cls.routers_client = cls.manager.routers_client
        cls.subnets_client = cls.manager.subnets_client
        cls.floating_ips_client = cls.manager.floating_ips_client
        cls.security_groups_client = cls.manager.security_groups_client
        cls.security_group_rules_client = (
            cls.manager.security_group_rules_client)

        if CONF.volume_feature_enabled.api_v2:
            cls.volumes_client = cls.manager.volumes_v2_client
            cls.snapshots_client = cls.manager.snapshots_v2_client
        else:
            cls.volumes_client = cls.manager.volumes_client
            cls.snapshots_client = cls.manager.snapshots_client

    # ## Test functions library
    #
    # The create_[resource] functions only return body and discard the
    # resp part which is not used in scenario tests

    def _create_port(self, network_id, client=None, namestart='port-quotatest',
                     **kwargs):
        """Create a port on the given network and schedule its cleanup.

        :param network_id: UUID of the network to attach the port to
        :param client: ports client to use; defaults to self.ports_client
        :param namestart: prefix for the randomized port name
        :param kwargs: additional attributes passed through to create_port
        :returns: the created port dict
        """
        if not client:
            client = self.ports_client
        name = data_utils.rand_name(namestart)
        result = client.create_port(
            name=name,
            network_id=network_id,
            **kwargs)
        self.assertIsNotNone(result, 'Unable to allocate port')
        port = result['port']
        self.addCleanup(test_utils.call_and_ignore_notfound_exc,
                        client.delete_port, port['id'])
        return port

    def create_keypair(self, client=None):
        """Create a nova keypair with a random name and schedule cleanup.

        :param client: keypairs client to use; defaults to
            self.keypairs_client
        :returns: the created keypair dict (includes the private key)
        """
        if not client:
            client = self.keypairs_client
        name = data_utils.rand_name(self.__class__.__name__)
        # We don't need to create a keypair by pubkey in scenario
        body = client.create_keypair(name=name)
        self.addCleanup(client.delete_keypair, name)
        return body['keypair']

    def create_server(self, name=None, image_id=None, flavor=None,
                      validatable=False, wait_until='ACTIVE',
                      clients=None, **kwargs):
        """Wrapper utility that returns a test server.

        This wrapper utility calls the common create test server and
        returns a test server. The purpose of this wrapper is to minimize
        the impact on the code of the tests already using this
        function.
        """

        # NOTE(jlanoux): As a first step, ssh checks in the scenario
        # tests need to be run regardless of the run_validation and
        # validatable parameters and thus until the ssh validation job
        # becomes voting in CI. The test resources management and IP
        # association are taken care of in the scenario tests.
        # Therefore, the validatable parameter is set to false in all
        # those tests. In this way create_server just return a standard
        # server and the scenario tests always perform ssh checks.

        # Needed for the cross_tenant_traffic test:
        if clients is None:
            clients = self.manager

        if name is None:
            name = data_utils.rand_name(self.__class__.__name__ + "-server")

        vnic_type = CONF.network.port_vnic_type

        # If vnic_type is configured create port for
        # every network
        if vnic_type:
            ports = []

            create_port_body = {'binding:vnic_type': vnic_type,
                                'namestart': 'port-smoke'}
            if kwargs:
                # Convert security group names to security group ids
                # to pass to create_port
                if 'security_groups' in kwargs:
                    security_groups = \
                        clients.security_groups_client.list_security_groups(
                        ).get('security_groups')
                    sec_dict = dict([(s['name'], s['id'])
                                     for s in security_groups])

                    sec_groups_names = [s['name'] for s in kwargs.pop(
                        'security_groups')]
                    security_groups_ids = [sec_dict[s]
                                           for s in sec_groups_names]

                    if security_groups_ids:
                        create_port_body[
                            'security_groups'] = security_groups_ids
                networks = kwargs.pop('networks', [])
            else:
                networks = []

            # If there are no networks passed to us we look up
            # for the project's private networks and create a port.
            # The same behaviour as we would expect when passing
            # the call to the clients with no networks
            if not networks:
                networks = clients.networks_client.list_networks(
                    **{'router:external': False, 'fields': 'id'})['networks']

            # It's net['uuid'] if networks come from kwargs
            # and net['id'] if they come from
            # clients.networks_client.list_networks
            for net in networks:
                net_id = net.get('uuid', net.get('id'))
                if 'port' not in net:
                    port = self._create_port(network_id=net_id,
                                             client=clients.ports_client,
                                             **create_port_body)
                    ports.append({'port': port['id']})
                else:
                    ports.append({'port': net['port']})
            if ports:
                kwargs['networks'] = ports
            self.ports = ports

        tenant_network = self.get_tenant_network()

        body, servers = compute.create_test_server(
            clients,
            tenant_network=tenant_network,
            wait_until=wait_until,
            name=name, flavor=flavor,
            image_id=image_id, **kwargs)

        # Cleanups run LIFO: delete_server is registered last so it runs
        # first, then wait_for_server_termination confirms the deletion.
        self.addCleanup(waiters.wait_for_server_termination,
                        clients.servers_client, body['id'])
        self.addCleanup(test_utils.call_and_ignore_notfound_exc,
                        clients.servers_client.delete_server, body['id'])
        server = clients.servers_client.show_server(body['id'])['server']
        return server

    def create_volume(self, size=None, name=None, snapshot_id=None,
                      imageRef=None, volume_type=None):
        """Create a cinder volume, wait until it is available, clean it up.

        :param size: volume size in GB; defaults to the configured
            volume_size and is bumped to the image's minDisk when an
            imageRef is given
        :param name: volume display name; randomized when None
        :param snapshot_id: optional snapshot to create the volume from
        :param imageRef: optional image to create the volume from
        :param volume_type: optional volume type name/id
        :returns: the volume dict refreshed after it became available
        """
        if size is None:
            size = CONF.volume.volume_size
        if imageRef:
            image = self.compute_images_client.show_image(imageRef)['image']
            min_disk = image.get('minDisk')
            size = max(size, min_disk)
        if name is None:
            name = data_utils.rand_name(self.__class__.__name__ + "-volume")
        kwargs = {'display_name': name,
                  'snapshot_id': snapshot_id,
                  'imageRef': imageRef,
                  'volume_type': volume_type,
                  'size': size}
        volume = self.volumes_client.create_volume(**kwargs)['volume']

        self.addCleanup(self.volumes_client.wait_for_resource_deletion,
                        volume['id'])
        self.addCleanup(test_utils.call_and_ignore_notfound_exc,
                        self.volumes_client.delete_volume, volume['id'])

        # NOTE(e0ne): Cinder API v2 uses name instead of display_name
        if 'display_name' in volume:
            self.assertEqual(name, volume['display_name'])
        else:
            self.assertEqual(name, volume['name'])
        waiters.wait_for_volume_resource_status(self.volumes_client,
                                                volume['id'], 'available')
        # The volume retrieved on creation has a non-up-to-date status.
        # Retrieval after it becomes active ensures correct details.
        volume = self.volumes_client.show_volume(volume['id'])['volume']
        return volume

    def create_volume_type(self, client=None, name=None, backend_name=None):
        """Create a cinder volume type and schedule its cleanup.

        :param client: volume types client; defaults to
            self.admin_volume_types_client
        :param name: base name for the type; randomized in any case
        :param backend_name: when given, set as the volume_backend_name
            extra spec so volumes of this type land on that backend
        :returns: the created volume type dict
        """
        if not client:
            client = self.admin_volume_types_client
        if not name:
            class_name = self.__class__.__name__
            name = data_utils.rand_name(class_name + '-volume-type')
        randomized_name = data_utils.rand_name('scenario-type-' + name)

        LOG.debug("Creating a volume type: %s on backend %s",
                  randomized_name, backend_name)
        extra_specs = {}
        if backend_name:
            extra_specs = {"volume_backend_name": backend_name}

        body = client.create_volume_type(name=randomized_name,
                                         extra_specs=extra_specs)
        volume_type = body['volume_type']
        self.assertIn('id', volume_type)
        self.addCleanup(client.delete_volume_type, volume_type['id'])
        return volume_type

    def _create_loginable_secgroup_rule(self, secgroup_id=None):
        """Add nova security group rules allowing ssh and ping from anywhere.

        :param secgroup_id: group to add the rules to; when None, the
            tenant's 'default' security group is used
        :returns: list of the created security group rule dicts
        """
        _client = self.compute_security_groups_client
        _client_rules = self.compute_security_group_rules_client
        if secgroup_id is None:
            sgs = _client.list_security_groups()['security_groups']
            for sg in sgs:
                if sg['name'] == 'default':
                    secgroup_id = sg['id']

        # These rules are intended to permit inbound ssh and icmp
        # traffic from all sources, so no group_id is provided.
        # Setting a group_id would only permit traffic from ports
        # belonging to the same security group.
        rulesets = [
            {
                # ssh
                'ip_protocol': 'tcp',
                'from_port': 22,
                'to_port': 22,
                'cidr': '0.0.0.0/0',
            },
            {
                # ping
                'ip_protocol': 'icmp',
                'from_port': -1,
                'to_port': -1,
                'cidr': '0.0.0.0/0',
            }
        ]
        rules = list()
        for ruleset in rulesets:
            sg_rule = _client_rules.create_security_group_rule(
                parent_group_id=secgroup_id, **ruleset)['security_group_rule']
            rules.append(sg_rule)
        return rules

    def _create_security_group(self):
        """Create a nova security group with loginable (ssh/ping) rules.

        :returns: the created security group dict
        """
        # Create security group
        sg_name = data_utils.rand_name(self.__class__.__name__)
        sg_desc = sg_name + " description"
        secgroup = self.compute_security_groups_client.create_security_group(
            name=sg_name, description=sg_desc)['security_group']
        self.assertEqual(secgroup['name'], sg_name)
        self.assertEqual(secgroup['description'], sg_desc)
        self.addCleanup(
            test_utils.call_and_ignore_notfound_exc,
            self.compute_security_groups_client.delete_security_group,
            secgroup['id'])

        # Add rules to the security group
        self._create_loginable_secgroup_rule(secgroup['id'])

        return secgroup

    def get_remote_client(self, ip_address, username=None, private_key=None):
        """Get a SSH client to a remote server

        @param ip_address the server floating or fixed IP address to use
                          for ssh validation
        @param username name of the Linux account on the remote server
        @param private_key the SSH private key to use
        @return a RemoteClient object
        """

        if username is None:
            username = CONF.validation.image_ssh_user
        # Set this with 'keypair' or others to log in with keypair or
        # username/password.
        if CONF.validation.auth_method == 'keypair':
            password = None
            if private_key is None:
                # Assumes self.keypair was set by the calling test.
                private_key = self.keypair['private_key']
        else:
            password = CONF.validation.image_ssh_password
            private_key = None
        linux_client = remote_client.RemoteClient(ip_address, username,
                                                  pkey=private_key,
                                                  password=password)
        try:
            linux_client.validate_authentication()
        except Exception as e:
            # Log the failure (with test caller and console output for
            # debugging) before re-raising the original exception.
            message = ('Initializing SSH connection to %(ip)s failed. '
                       'Error: %(error)s' % {'ip': ip_address,
                                             'error': e})
            caller = test_utils.find_test_caller()
            if caller:
                message = '(%s) %s' % (caller, message)
            LOG.exception(message)
            self._log_console_output()
            raise

        return linux_client

    def _image_create(self, name, fmt, path,
                      disk_format=None, properties=None):
        """Create a glance image from a local file and schedule cleanup.

        :param name: base name for the image; randomized in any case
        :param fmt: container format (also the disk format by default)
        :param path: local filesystem path of the image file to upload
        :param disk_format: disk format; defaults to fmt
        :param properties: extra image properties
        :returns: the created image's id
        """
        if properties is None:
            properties = {}
        name = data_utils.rand_name('%s-' % name)
        params = {
            'name': name,
            'container_format': fmt,
            'disk_format': disk_format or fmt,
        }
        if CONF.image_feature_enabled.api_v1:
            # Glance v1 carries visibility/properties in request headers.
            params['is_public'] = 'False'
            params['properties'] = properties
            params = {'headers': common_image.image_meta_to_headers(**params)}
        else:
            params['visibility'] = 'private'
            # Additional properties are flattened out in the v2 API.
            params.update(properties)
        body = self.image_client.create_image(**params)
        image = body['image'] if 'image' in body else body
        self.addCleanup(self.image_client.delete_image, image['id'])
        self.assertEqual("queued", image['status'])
        with open(path, 'rb') as image_file:
            if CONF.image_feature_enabled.api_v1:
                self.image_client.update_image(image['id'], data=image_file)
            else:
                self.image_client.store_image_file(image['id'], image_file)
        return image['id']

    def glance_image_create(self):
        """Upload the configured scenario image to glance.

        Tries the single-file image (e.g. qcow2) first; on IOError falls
        back to uploading the AKI kernel / ARI ramdisk / AMI image triple.

        :returns: the id of the bootable image
        """
        img_path = CONF.scenario.img_dir + "/" + CONF.scenario.img_file
        aki_img_path = CONF.scenario.img_dir + "/" + CONF.scenario.aki_img_file
        ari_img_path = CONF.scenario.img_dir + "/" + CONF.scenario.ari_img_file
        ami_img_path = CONF.scenario.img_dir + "/" + CONF.scenario.ami_img_file
        img_container_format = CONF.scenario.img_container_format
        img_disk_format = CONF.scenario.img_disk_format
        img_properties = CONF.scenario.img_properties
        LOG.debug("paths: img: %s, container_format: %s, disk_format: %s, "
                  "properties: %s, ami: %s, ari: %s, aki: %s",
                  img_path, img_container_format, img_disk_format,
                  img_properties, ami_img_path, ari_img_path, aki_img_path)
        try:
            image = self._image_create('scenario-img',
                                       img_container_format,
                                       img_path,
                                       disk_format=img_disk_format,
                                       properties=img_properties)
        except IOError:
            LOG.debug("A qcow2 image was not found. Try to get a uec image.")
            kernel = self._image_create('scenario-aki', 'aki', aki_img_path)
            ramdisk = self._image_create('scenario-ari', 'ari', ari_img_path)
            properties = {'kernel_id': kernel, 'ramdisk_id': ramdisk}
            image = self._image_create('scenario-ami', 'ami',
                                       path=ami_img_path,
                                       properties=properties)
        LOG.debug("image:%s", image)

        return image

    def _log_console_output(self, servers=None):
        """Log the console output of the given (or all) servers for debug.

        :param servers: list of server dicts; when falsy, all servers
            visible to self.servers_client are used
        """
        if not CONF.compute_feature_enabled.console_output:
            LOG.debug('Console output not supported, cannot log')
            return
        if not servers:
            servers = self.servers_client.list_servers()
            servers = servers['servers']
        for server in servers:
            try:
                console_output = self.servers_client.get_console_output(
                    server['id'])['output']
                LOG.debug('Console output for %s\nbody=\n%s',
                          server['id'], console_output)
            except lib_exc.NotFound:
                LOG.debug("Server %s disappeared(deleted) while looking "
                          "for the console log", server['id'])

    def _log_net_info(self, exc):
        """Log a network-debug marker unless the failure was an SSH timeout."""
        # network debug is called as part of ssh init
        if not isinstance(exc, lib_exc.SSHTimeout):
            LOG.debug('Network information on a devstack host')

    def create_server_snapshot(self, server, name=None):
        """Snapshot a server to a glance image and schedule cleanup.

        Waits for the image to become active, registers cleanup for the
        image (and for any volume snapshot referenced by its
        block_device_mapping), and verifies the image name.

        :param server: server dict to snapshot
        :param name: snapshot image name; randomized when None
        :returns: the snapshot image metadata dict
        """
        # Glance client
        _image_client = self.image_client
        # Compute client
        _images_client = self.compute_images_client
        if name is None:
            name = data_utils.rand_name(self.__class__.__name__ + 'snapshot')
        LOG.debug("Creating a snapshot image for server: %s", server['name'])
        image = _images_client.create_image(server['id'], name=name)
        # The new image id is only exposed via the response Location header.
        image_id = image.response['location'].split('images/')[1]
        waiters.wait_for_image_status(_image_client, image_id, 'active')

        self.addCleanup(_image_client.wait_for_resource_deletion,
                        image_id)
        self.addCleanup(test_utils.call_and_ignore_notfound_exc,
                        _image_client.delete_image, image_id)

        if CONF.image_feature_enabled.api_v1:
            # In glance v1 the additional properties are stored in the headers.
            resp = _image_client.check_image(image_id)
            snapshot_image = common_image.get_image_meta_from_headers(resp)
            image_props = snapshot_image.get('properties', {})
        else:
            # In glance v2 the additional properties are flattened.
            snapshot_image = _image_client.show_image(image_id)
            image_props = snapshot_image

        bdm = image_props.get('block_device_mapping')
        if bdm:
            bdm = json.loads(bdm)
            if bdm and 'snapshot_id' in bdm[0]:
                # Volume-backed snapshot: also clean up the volume snapshot.
                snapshot_id = bdm[0]['snapshot_id']
                self.addCleanup(
                    self.snapshots_client.wait_for_resource_deletion,
                    snapshot_id)
                self.addCleanup(test_utils.call_and_ignore_notfound_exc,
                                self.snapshots_client.delete_snapshot,
                                snapshot_id)
                waiters.wait_for_volume_resource_status(self.snapshots_client,
                                                        snapshot_id,
                                                        'available')
        image_name = snapshot_image['name']
        self.assertEqual(name, image_name)
        LOG.debug("Created snapshot image %s for server %s",
                  image_name, server['name'])
        return snapshot_image

    def nova_volume_attach(self, server, volume_to_attach):
        """Attach a volume to a server and wait until it is in-use.

        :param server: server dict to attach to
        :param volume_to_attach: volume dict to attach
        :returns: the volume dict refreshed after the attachment
        """
        volume = self.servers_client.attach_volume(
            server['id'], volumeId=volume_to_attach['id'], device='/dev/%s'
            % CONF.compute.volume_device_name)['volumeAttachment']
        self.assertEqual(volume_to_attach['id'], volume['id'])
        waiters.wait_for_volume_resource_status(self.volumes_client,
                                                volume['id'], 'in-use')

        # Return the updated volume after the attachment
        return self.volumes_client.show_volume(volume['id'])['volume']

    def nova_volume_detach(self, server, volume):
        """Detach a volume from a server and wait until it is available.

        :param server: server dict to detach from
        :param volume: volume dict to detach
        """
        self.servers_client.detach_volume(server['id'], volume['id'])
        waiters.wait_for_volume_resource_status(self.volumes_client,
                                                volume['id'], 'available')

        volume = self.volumes_client.show_volume(volume['id'])['volume']
        self.assertEqual('available', volume['status'])

    def rebuild_server(self, server_id, image=None,
                       preserve_ephemeral=False, wait=True,
                       rebuild_kwargs=None):
        """Rebuild a server with the given (or configured default) image.

        :param server_id: id of the server to rebuild
        :param image: image ref to rebuild with; defaults to
            CONF.compute.image_ref
        :param preserve_ephemeral: whether to keep the ephemeral disk
        :param wait: when True, wait for the server to return to ACTIVE
        :param rebuild_kwargs: extra arguments passed to rebuild_server
        """
        if image is None:
            image = CONF.compute.image_ref

        rebuild_kwargs = rebuild_kwargs or {}

        LOG.debug("Rebuilding server (id: %s, image: %s, preserve eph: %s)",
                  server_id, image, preserve_ephemeral)
        self.servers_client.rebuild_server(
            server_id=server_id, image_ref=image,
            preserve_ephemeral=preserve_ephemeral,
            **rebuild_kwargs)
        if wait:
            waiters.wait_for_server_status(self.servers_client,
                                           server_id, 'ACTIVE')

    def ping_ip_address(self, ip_address, should_succeed=True,
                        ping_timeout=None, mtu=None):
        """Ping an IP until the expected reachability result is observed.

        :param ip_address: address to ping
        :param should_succeed: True to wait for reachability, False to
            wait for unreachability
        :param ping_timeout: seconds to keep retrying; defaults to
            CONF.validation.ping_timeout
        :param mtu: when set, ping with don't-fragment and a payload
            sized so the packet matches this MTU
        :returns: True if the expected result was observed within the
            timeout, False otherwise
        """
        timeout = ping_timeout or CONF.validation.ping_timeout
        cmd = ['ping', '-c1', '-w1']

        if mtu:
            cmd += [
                # don't fragment
                '-M', 'do',
                # ping receives just the size of ICMP payload
                '-s', str(net_utils.get_ping_payload_size(mtu, 4))
            ]
        cmd.append(ip_address)

        def ping():
            # One ping attempt; True when the exit status matches the
            # expected reachability.
            proc = subprocess.Popen(cmd,
                                    stdout=subprocess.PIPE,
                                    stderr=subprocess.PIPE)
            proc.communicate()

            return (proc.returncode == 0) == should_succeed

        caller = test_utils.find_test_caller()
        LOG.debug('%(caller)s begins to ping %(ip)s in %(timeout)s sec and the'
                  ' expected result is %(should_succeed)s', {
                      'caller': caller, 'ip': ip_address, 'timeout': timeout,
                      'should_succeed':
                      'reachable' if should_succeed else 'unreachable'
                  })
        result = test_utils.call_until_true(ping, timeout, 1)
        LOG.debug('%(caller)s finishes ping %(ip)s in %(timeout)s sec and the '
                  'ping result is %(result)s', {
                      'caller': caller, 'ip': ip_address, 'timeout': timeout,
                      'result': 'expected' if result else 'unexpected'
                  })
        return result

    def check_vm_connectivity(self, ip_address,
                              username=None,
                              private_key=None,
                              should_connect=True,
                              mtu=None):
        """Check server connectivity

        :param ip_address: server to test against
        :param username: server's ssh username
        :param private_key: server's ssh private key to be used
        :param should_connect: True/False indicates positive/negative test
            positive - attempt ping and ssh
            negative - attempt ping and fail if succeed
        :param mtu: network MTU to use for connectivity validation

        :raises: AssertError if the result of the connectivity check does
            not match the value of the should_connect param
        """
        if should_connect:
            msg = "Timed out waiting for %s to become reachable" % ip_address
        else:
            msg = "ip address %s is reachable" % ip_address
        self.assertTrue(self.ping_ip_address(ip_address,
                                             should_succeed=should_connect,
                                             mtu=mtu),
                        msg=msg)
        if should_connect:
            # no need to check ssh for negative connectivity
            self.get_remote_client(ip_address, username, private_key)

    def check_public_network_connectivity(self, ip_address, username,
                                          private_key, should_connect=True,
                                          msg=None, servers=None, mtu=None):
        """Check connectivity over the public network, logging on failure.

        :param ip_address: address (e.g. floating IP) to check
        :param username: ssh username on the remote server
        :param private_key: ssh private key to use
        :param should_connect: expected reachability (see
            check_vm_connectivity)
        :param msg: extra context appended to the failure log message
        :param servers: servers whose console output to log on failure
        :param mtu: network MTU to use for connectivity validation
        """
        # The target login is assumed to have been configured for
        # key-based authentication by cloud-init.
        LOG.debug('checking network connections to IP %s with user: %s',
                  ip_address, username)
        try:
            self.check_vm_connectivity(ip_address,
                                       username,
                                       private_key,
                                       should_connect=should_connect,
                                       mtu=mtu)
        except Exception:
            ex_msg = 'Public network connectivity check failed'
            if msg:
                ex_msg += ": " + msg
            LOG.exception(ex_msg)
            self._log_console_output(servers)
            raise

    def create_floating_ip(self, thing, pool_name=None):
        """Create a floating IP and associates to a server on Nova"""

        if not pool_name:
            pool_name = CONF.network.floating_network_name
        floating_ip = (self.compute_floating_ips_client.
                       create_floating_ip(pool=pool_name)['floating_ip'])
        self.addCleanup(test_utils.call_and_ignore_notfound_exc,
                        self.compute_floating_ips_client.delete_floating_ip,
                        floating_ip['id'])
        self.compute_floating_ips_client.associate_floating_ip_to_server(
            floating_ip['ip'], thing['id'])
        return floating_ip

    def create_timestamp(self, ip_address, dev_name=None, mount_path='/mnt',
                         private_key=None):
        """Write a timestamp file on the server over ssh and return it.

        :param ip_address: server address to ssh to
        :param dev_name: when set, create a filesystem on that device and
            mount it at mount_path before writing (unmounted afterwards)
        :param mount_path: directory to write the timestamp file in
        :param private_key: ssh private key to use
        :returns: the timestamp file contents as read back from the server
        """
        ssh_client = self.get_remote_client(ip_address,
                                            private_key=private_key)
        if dev_name is not None:
            ssh_client.make_fs(dev_name)
            ssh_client.exec_command('sudo mount /dev/%s %s' % (dev_name,
                                                               mount_path))
        cmd_timestamp = 'sudo sh -c "date > %s/timestamp; sync"' % mount_path
        ssh_client.exec_command(cmd_timestamp)
        timestamp = ssh_client.exec_command('sudo cat %s/timestamp'
                                            % mount_path)
        if dev_name is not None:
            ssh_client.exec_command('sudo umount %s' % mount_path)
        return timestamp

    def get_timestamp(self, ip_address, dev_name=None, mount_path='/mnt',
                      private_key=None):
        """Read back a timestamp file previously written by create_timestamp.

        :param ip_address: server address to ssh to
        :param dev_name: when set, mount that device at mount_path before
            reading (unmounted afterwards)
        :param mount_path: directory holding the timestamp file
        :param private_key: ssh private key to use
        :returns: the timestamp file contents
        """
        ssh_client = self.get_remote_client(ip_address,
                                            private_key=private_key)
        if dev_name is not None:
            ssh_client.mount(dev_name, mount_path)
        timestamp = ssh_client.exec_command('sudo cat %s/timestamp'
                                            % mount_path)
        if dev_name is not None:
            ssh_client.exec_command('sudo umount %s' % mount_path)
        return timestamp

    def get_server_ip(self, server):
        """Get the server fixed or floating IP.

        Based on the configuration we're in, return a correct ip
        address for validating that a guest is up.
        """
        if CONF.validation.connect_method == 'floating':
            # The tests calling this method don't have a floating IP
            # and can't make use of the validation resources. So the
            # method is creating the floating IP there.
            return self.create_floating_ip(server)['ip']
        elif CONF.validation.connect_method == 'fixed':
            # Determine the network name to look for based on config or creds
            # provider network resources.
            if CONF.validation.network_for_ssh:
                addresses = server['addresses'][
                    CONF.validation.network_for_ssh]
            else:
                creds_provider = self._get_credentials_provider()
                net_creds = creds_provider.get_primary_creds()
                network = getattr(net_creds, 'network', None)
                addresses = (server['addresses'][network['name']]
                             if network else [])
            for address in addresses:
                if (address['version'] == CONF.validation.ip_version_for_ssh
                        and address['OS-EXT-IPS:type'] == 'fixed'):
                    return address['addr']
            raise exceptions.ServerUnreachable(server_id=server['id'])
        else:
            raise lib_exc.InvalidConfiguration()
695
696
697class NetworkScenarioTest(ScenarioTest):
698 """Base class for network scenario tests.
699
700 This class provide helpers for network scenario tests, using the neutron
701 API. Helpers from ancestor which use the nova network API are overridden
702 with the neutron API.
703
704 This Class also enforces using Neutron instead of novanetwork.
705 Subclassed tests will be skipped if Neutron is not enabled
706
707 """
708
709 credentials = ['primary', 'admin']
710
    @classmethod
    def skip_checks(cls):
        """Skip the whole test class when Neutron is not available."""
        super(NetworkScenarioTest, cls).skip_checks()
        if not CONF.service_available.neutron:
            raise cls.skipException('Neutron not available')
716
    def _create_network(self, networks_client=None,
                        tenant_id=None,
                        namestart='network-smoke-',
                        port_security_enabled=True):
        """Create a neutron network with a random name and schedule cleanup.

        :param networks_client: client to use; defaults to
            self.networks_client
        :param tenant_id: owning tenant; defaults to the client's tenant
        :param namestart: prefix for the randomized network name
        :param port_security_enabled: desired port security state; only
            sent when the port_security extension is enabled in config
        :returns: the created network dict
        """
        if not networks_client:
            networks_client = self.networks_client
        if not tenant_id:
            tenant_id = networks_client.tenant_id
        name = data_utils.rand_name(namestart)
        network_kwargs = dict(name=name, tenant_id=tenant_id)
        # Neutron disables port security by default so we have to check the
        # config before trying to create the network with port_security_enabled
        if CONF.network_feature_enabled.port_security:
            network_kwargs['port_security_enabled'] = port_security_enabled
        result = networks_client.create_network(**network_kwargs)
        network = result['network']

        self.assertEqual(network['name'], name)
        self.addCleanup(test_utils.call_and_ignore_notfound_exc,
                        networks_client.delete_network,
                        network['id'])
        return network
739
    def _create_subnet(self, network, subnets_client=None,
                       routers_client=None, namestart='subnet-smoke',
                       **kwargs):
        """Create a subnet for the given network

        within the cidr block configured for tenant networks.

        :param network: network dict to attach the subnet to
        :param subnets_client: client to use; defaults to
            self.subnets_client
        :param routers_client: client to use; defaults to
            self.routers_client
        :param namestart: prefix for the randomized subnet name
        :param kwargs: extra subnet attributes; ip_version (default 4)
            selects the v4 or v6 tenant cidr from configuration
        :returns: the created subnet dict
        """
        if not subnets_client:
            subnets_client = self.subnets_client
        if not routers_client:
            routers_client = self.routers_client

        def cidr_in_use(cidr, tenant_id):
            """Check cidr existence

            :returns: True if subnet with cidr already exist in tenant
                  False else
            """
            cidr_in_use = self.admin_manager.subnets_client.list_subnets(
                tenant_id=tenant_id, cidr=cidr)['subnets']
            return len(cidr_in_use) != 0

        ip_version = kwargs.pop('ip_version', 4)

        if ip_version == 6:
            tenant_cidr = netaddr.IPNetwork(
                CONF.network.project_network_v6_cidr)
            num_bits = CONF.network.project_network_v6_mask_bits
        else:
            tenant_cidr = netaddr.IPNetwork(CONF.network.project_network_cidr)
            num_bits = CONF.network.project_network_mask_bits

        result = None
        str_cidr = None
        # Repeatedly attempt subnet creation with sequential cidr
        # blocks until an unallocated block is found.
        for subnet_cidr in tenant_cidr.subnet(num_bits):
            str_cidr = str(subnet_cidr)
            if cidr_in_use(str_cidr, tenant_id=network['tenant_id']):
                continue

            subnet = dict(
                name=data_utils.rand_name(namestart),
                network_id=network['id'],
                tenant_id=network['tenant_id'],
                cidr=str_cidr,
                ip_version=ip_version,
                **kwargs
            )
            try:
                result = subnets_client.create_subnet(**subnet)
                break
            except lib_exc.Conflict as e:
                # Another test may have raced us to this cidr; move on to
                # the next candidate block. Any other conflict is fatal.
                is_overlapping_cidr = 'overlaps with another subnet' in str(e)
                if not is_overlapping_cidr:
                    raise
        self.assertIsNotNone(result, 'Unable to allocate tenant network')

        subnet = result['subnet']
        self.assertEqual(subnet['cidr'], str_cidr)

        self.addCleanup(test_utils.call_and_ignore_notfound_exc,
                        subnets_client.delete_subnet, subnet['id'])

        return subnet
805
    def _get_server_port_id_and_ip4(self, server, ip_addr=None):
        """Return the (port_id, ipv4_address) pair of a server's port.

        Asserts that exactly one usable IPv4 address is found across the
        server's ports (DOWN ports are accepted when Ironic is enabled).

        :param server: server dict whose ports to inspect
        :param ip_addr: optional fixed IP to filter the port listing by
        :returns: tuple of (port id, IPv4 address)
        """
        ports = self.admin_manager.ports_client.list_ports(
            device_id=server['id'], fixed_ip=ip_addr)['ports']
        # A port can have more than one IP address in some cases.
        # If the network is dual-stack (IPv4 + IPv6), this port is associated
        # with 2 subnets
        p_status = ['ACTIVE']
        # NOTE(vsaienko) With Ironic, instances live on separate hardware
        # servers. Neutron does not bind ports for Ironic instances, as a
        # result the port remains in the DOWN state.
        # TODO(vsaienko) remove once bug: #1599836 is resolved.
        if getattr(CONF.service_available, 'ironic', False):
            p_status.append('DOWN')
        port_map = [(p["id"], fxip["ip_address"])
                    for p in ports
                    for fxip in p["fixed_ips"]
                    if netutils.is_valid_ipv4(fxip["ip_address"])
                    and p['status'] in p_status]
        inactive = [p for p in ports if p['status'] != 'ACTIVE']
        if inactive:
            LOG.warning("Instance has ports that are not ACTIVE: %s", inactive)

        self.assertNotEqual(0, len(port_map),
                            "No IPv4 addresses found in: %s" % ports)
        self.assertEqual(len(port_map), 1,
                         "Found multiple IPv4 addresses: %s. "
                         "Unable to determine which port to target."
                         % port_map)
        return port_map[0]
835
    def _get_network_by_name(self, network_name):
        """Return the first network matching the given name (admin lookup).

        :param network_name: exact network name to look up
        :returns: the network dict; fails the test when none is found
        """
        net = self.admin_manager.networks_client.list_networks(
            name=network_name)['networks']
        self.assertNotEqual(len(net), 0,
                            "Unable to get network by name: %s" % network_name)
        return net[0]
842
    def create_floating_ip(self, thing, external_network_id=None,
                           port_id=None, client=None):
        """Create a floating IP and associates to a resource/port on Neutron"""
        # Overrides the nova-network version from ScenarioTest with the
        # neutron API equivalent.
        if not external_network_id:
            external_network_id = CONF.network.public_network_id
        if not client:
            client = self.floating_ips_client
        if not port_id:
            # No port given: target the server's single active IPv4 port.
            port_id, ip4 = self._get_server_port_id_and_ip4(thing)
        else:
            ip4 = None
        result = client.create_floatingip(
            floating_network_id=external_network_id,
            port_id=port_id,
            tenant_id=thing['tenant_id'],
            fixed_ip_address=ip4
        )
        floating_ip = result['floatingip']
        self.addCleanup(test_utils.call_and_ignore_notfound_exc,
                        client.delete_floatingip,
                        floating_ip['id'])
        return floating_ip
865
    def _associate_floating_ip(self, floating_ip, server):
        """Associate a floating IP with the server's port.

        :param floating_ip: floating IP dict to associate
        :param server: server dict whose port to attach to
        :returns: the updated floating IP dict
        """
        port_id, _ = self._get_server_port_id_and_ip4(server)
        kwargs = dict(port_id=port_id)
        floating_ip = self.floating_ips_client.update_floatingip(
            floating_ip['id'], **kwargs)['floatingip']
        self.assertEqual(port_id, floating_ip['port_id'])
        return floating_ip
873
    def _disassociate_floating_ip(self, floating_ip):
        """:param floating_ip: floating_ips_client.create_floatingip"""
        # Clearing port_id detaches the floating IP from its fixed IP.
        kwargs = dict(port_id=None)
        floating_ip = self.floating_ips_client.update_floatingip(
            floating_ip['id'], **kwargs)['floatingip']
        self.assertIsNone(floating_ip['port_id'])
        return floating_ip
881
882 def check_floating_ip_status(self, floating_ip, status):
883 """Verifies floatingip reaches the given status
884
885 :param dict floating_ip: floating IP dict to check status
886 :param status: target status
887 :raises: AssertionError if status doesn't match
888 """
889 floatingip_id = floating_ip['id']
890
891 def refresh():
892 result = (self.floating_ips_client.
893 show_floatingip(floatingip_id)['floatingip'])
894 return status == result['status']
895
896 test_utils.call_until_true(refresh,
897 CONF.network.build_timeout,
898 CONF.network.build_interval)
899 floating_ip = self.floating_ips_client.show_floatingip(
900 floatingip_id)['floatingip']
901 self.assertEqual(status, floating_ip['status'],
902 message="FloatingIP: {fp} is at status: {cst}. "
903 "failed to reach status: {st}"
904 .format(fp=floating_ip, cst=floating_ip['status'],
905 st=status))
906 LOG.info("FloatingIP: {fp} is at status: {st}"
907 .format(fp=floating_ip, st=status))
908
909 def _check_tenant_network_connectivity(self, server,
910 username,
911 private_key,
912 should_connect=True,
913 servers_for_debug=None):
914 if not CONF.network.project_networks_reachable:
915 msg = 'Tenant networks not configured to be reachable.'
916 LOG.info(msg)
917 return
918 # The target login is assumed to have been configured for
919 # key-based authentication by cloud-init.
920 try:
921 for ip_addresses in server['addresses'].values():
922 for ip_address in ip_addresses:
923 self.check_vm_connectivity(ip_address['addr'],
924 username,
925 private_key,
926 should_connect=should_connect)
927 except Exception as e:
928 LOG.exception('Tenant network connectivity check failed')
929 self._log_console_output(servers_for_debug)
930 self._log_net_info(e)
931 raise
932
933 def _check_remote_connectivity(self, source, dest, should_succeed=True,
934 nic=None):
935 """assert ping server via source ssh connection
936
937 Note: This is an internal method. Use check_remote_connectivity
938 instead.
939
940 :param source: RemoteClient: an ssh connection from which to ping
941 :param dest: and IP to ping against
942 :param should_succeed: boolean should ping succeed or not
943 :param nic: specific network interface to ping from
944 """
945 def ping_remote():
946 try:
947 source.ping_host(dest, nic=nic)
948 except lib_exc.SSHExecCommandFailed:
949 LOG.warning('Failed to ping IP: %s via a ssh connection '
950 'from: %s.', dest, source.ssh_client.host)
951 return not should_succeed
952 return should_succeed
953
954 return test_utils.call_until_true(ping_remote,
955 CONF.validation.ping_timeout,
956 1)
957
958 def check_remote_connectivity(self, source, dest, should_succeed=True,
959 nic=None):
960 """assert ping server via source ssh connection
961
962 :param source: RemoteClient: an ssh connection from which to ping
963 :param dest: and IP to ping against
964 :param should_succeed: boolean should ping succeed or not
965 :param nic: specific network interface to ping from
966 """
967 result = self._check_remote_connectivity(source, dest, should_succeed,
968 nic)
969 source_host = source.ssh_client.host
970 if should_succeed:
971 msg = "Timed out waiting for %s to become reachable from %s" \
972 % (dest, source_host)
973 else:
974 msg = "%s is reachable from %s" % (dest, source_host)
975 self.assertTrue(result, msg)
976
977 def _create_security_group(self, security_group_rules_client=None,
978 tenant_id=None,
979 namestart='secgroup-smoke',
980 security_groups_client=None):
981 if security_group_rules_client is None:
982 security_group_rules_client = self.security_group_rules_client
983 if security_groups_client is None:
984 security_groups_client = self.security_groups_client
985 if tenant_id is None:
986 tenant_id = security_groups_client.tenant_id
987 secgroup = self._create_empty_security_group(
988 namestart=namestart, client=security_groups_client,
989 tenant_id=tenant_id)
990
991 # Add rules to the security group
992 rules = self._create_loginable_secgroup_rule(
993 security_group_rules_client=security_group_rules_client,
994 secgroup=secgroup,
995 security_groups_client=security_groups_client)
996 for rule in rules:
997 self.assertEqual(tenant_id, rule['tenant_id'])
998 self.assertEqual(secgroup['id'], rule['security_group_id'])
999 return secgroup
1000
1001 def _create_empty_security_group(self, client=None, tenant_id=None,
1002 namestart='secgroup-smoke'):
1003 """Create a security group without rules.
1004
1005 Default rules will be created:
1006 - IPv4 egress to any
1007 - IPv6 egress to any
1008
1009 :param tenant_id: secgroup will be created in this tenant
1010 :returns: the created security group
1011 """
1012 if client is None:
1013 client = self.security_groups_client
1014 if not tenant_id:
1015 tenant_id = client.tenant_id
1016 sg_name = data_utils.rand_name(namestart)
1017 sg_desc = sg_name + " description"
1018 sg_dict = dict(name=sg_name,
1019 description=sg_desc)
1020 sg_dict['tenant_id'] = tenant_id
1021 result = client.create_security_group(**sg_dict)
1022
1023 secgroup = result['security_group']
1024 self.assertEqual(secgroup['name'], sg_name)
1025 self.assertEqual(tenant_id, secgroup['tenant_id'])
1026 self.assertEqual(secgroup['description'], sg_desc)
1027
1028 self.addCleanup(test_utils.call_and_ignore_notfound_exc,
1029 client.delete_security_group, secgroup['id'])
1030 return secgroup
1031
1032 def _default_security_group(self, client=None, tenant_id=None):
1033 """Get default secgroup for given tenant_id.
1034
1035 :returns: default secgroup for given tenant
1036 """
1037 if client is None:
1038 client = self.security_groups_client
1039 if not tenant_id:
1040 tenant_id = client.tenant_id
1041 sgs = [
1042 sg for sg in list(client.list_security_groups().values())[0]
1043 if sg['tenant_id'] == tenant_id and sg['name'] == 'default'
1044 ]
1045 msg = "No default security group for tenant %s." % (tenant_id)
1046 self.assertGreater(len(sgs), 0, msg)
1047 return sgs[0]
1048
1049 def _create_security_group_rule(self, secgroup=None,
1050 sec_group_rules_client=None,
1051 tenant_id=None,
1052 security_groups_client=None, **kwargs):
1053 """Create a rule from a dictionary of rule parameters.
1054
1055 Create a rule in a secgroup. if secgroup not defined will search for
1056 default secgroup in tenant_id.
1057
1058 :param secgroup: the security group.
1059 :param tenant_id: if secgroup not passed -- the tenant in which to
1060 search for default secgroup
1061 :param kwargs: a dictionary containing rule parameters:
1062 for example, to allow incoming ssh:
1063 rule = {
1064 direction: 'ingress'
1065 protocol:'tcp',
1066 port_range_min: 22,
1067 port_range_max: 22
1068 }
1069 """
1070 if sec_group_rules_client is None:
1071 sec_group_rules_client = self.security_group_rules_client
1072 if security_groups_client is None:
1073 security_groups_client = self.security_groups_client
1074 if not tenant_id:
1075 tenant_id = security_groups_client.tenant_id
1076 if secgroup is None:
1077 secgroup = self._default_security_group(
1078 client=security_groups_client, tenant_id=tenant_id)
1079
1080 ruleset = dict(security_group_id=secgroup['id'],
1081 tenant_id=secgroup['tenant_id'])
1082 ruleset.update(kwargs)
1083
1084 sg_rule = sec_group_rules_client.create_security_group_rule(**ruleset)
1085 sg_rule = sg_rule['security_group_rule']
1086
1087 self.assertEqual(secgroup['tenant_id'], sg_rule['tenant_id'])
1088 self.assertEqual(secgroup['id'], sg_rule['security_group_id'])
1089
1090 return sg_rule
1091
    def _create_loginable_secgroup_rule(self, security_group_rules_client=None,
                                        secgroup=None,
                                        security_groups_client=None):
        """Create loginable security group rule

        This function will create:
        1. egress and ingress tcp port 22 allow rule in order to allow ssh
        access for ipv4.
        2. egress and ingress ipv6 icmp allow rule, in order to allow icmpv6.
        3. egress and ingress ipv4 icmp allow rule, in order to allow icmpv4.

        :param security_group_rules_client: rules client; defaults to
            ``self.security_group_rules_client``
        :param secgroup: group to add the rules to; the tenant's default
            group is used when not given
        :param security_groups_client: groups client; defaults to
            ``self.security_groups_client``
        :returns: list of the security group rules that were created
        """

        if security_group_rules_client is None:
            security_group_rules_client = self.security_group_rules_client
        if security_groups_client is None:
            security_groups_client = self.security_groups_client
        rules = []
        rulesets = [
            dict(
                # ssh
                protocol='tcp',
                port_range_min=22,
                port_range_max=22,
            ),
            dict(
                # ping
                protocol='icmp',
            ),
            dict(
                # ipv6-icmp for ping6
                protocol='icmp',
                ethertype='IPv6',
            )
        ]
        sec_group_rules_client = security_group_rules_client
        # Each ruleset is created twice, once per direction; the ruleset
        # dict is mutated in place to carry the direction.
        for ruleset in rulesets:
            for r_direction in ['ingress', 'egress']:
                ruleset['direction'] = r_direction
                try:
                    sg_rule = self._create_security_group_rule(
                        sec_group_rules_client=sec_group_rules_client,
                        secgroup=secgroup,
                        security_groups_client=security_groups_client,
                        **ruleset)
                except lib_exc.Conflict as ex:
                    # if rule already exist - skip rule and continue
                    # NOTE(review): relies on the private _error_string
                    # attribute of tempest-lib exceptions and on the exact
                    # server-side message text.
                    msg = 'Security group rule already exists'
                    if msg not in ex._error_string:
                        raise ex
                else:
                    self.assertEqual(r_direction, sg_rule['direction'])
                    rules.append(sg_rule)

        return rules
1146
1147 def _get_router(self, client=None, tenant_id=None):
1148 """Retrieve a router for the given tenant id.
1149
1150 If a public router has been configured, it will be returned.
1151
1152 If a public router has not been configured, but a public
1153 network has, a tenant router will be created and returned that
1154 routes traffic to the public network.
1155 """
1156 if not client:
1157 client = self.routers_client
1158 if not tenant_id:
1159 tenant_id = client.tenant_id
1160 router_id = CONF.network.public_router_id
1161 network_id = CONF.network.public_network_id
1162 if router_id:
1163 body = client.show_router(router_id)
1164 return body['router']
1165 elif network_id:
1166 router = self._create_router(client, tenant_id)
1167 kwargs = {'external_gateway_info': dict(network_id=network_id)}
1168 router = client.update_router(router['id'], **kwargs)['router']
1169 return router
1170 else:
1171 raise Exception("Neither of 'public_router_id' or "
1172 "'public_network_id' has been defined.")
1173
1174 def _create_router(self, client=None, tenant_id=None,
1175 namestart='router-smoke'):
1176 if not client:
1177 client = self.routers_client
1178 if not tenant_id:
1179 tenant_id = client.tenant_id
1180 name = data_utils.rand_name(namestart)
1181 result = client.create_router(name=name,
1182 admin_state_up=True,
1183 tenant_id=tenant_id)
1184 router = result['router']
1185 self.assertEqual(router['name'], name)
1186 self.addCleanup(test_utils.call_and_ignore_notfound_exc,
1187 client.delete_router,
1188 router['id'])
1189 return router
1190
1191 def _update_router_admin_state(self, router, admin_state_up):
1192 kwargs = dict(admin_state_up=admin_state_up)
1193 router = self.routers_client.update_router(
1194 router['id'], **kwargs)['router']
1195 self.assertEqual(admin_state_up, router['admin_state_up'])
1196
    def create_networks(self, networks_client=None,
                        routers_client=None, subnets_client=None,
                        tenant_id=None, dns_nameservers=None,
                        port_security_enabled=True):
        """Create a network with a subnet connected to a router.

        The baremetal driver is a special case since all nodes are
        on the same shared network.

        :param networks_client: networks client; defaults to the test's own
        :param routers_client: routers client; defaults to the test's own
        :param subnets_client: subnets client; defaults to the test's own
        :param tenant_id: id of tenant to create resources in.
        :param dns_nameservers: list of dns servers to send to subnet.
        :param port_security_enabled: whether port security is enabled on
            the created network.
        :returns: network, subnet, router (subnet and router are None when
            a shared physical network is configured)
        """
        if CONF.network.shared_physical_network:
            # NOTE(Shrews): This exception is for environments where tenant
            # credential isolation is available, but network separation is
            # not (the current baremetal case). Likely can be removed when
            # test account mgmt is reworked:
            # https://blueprints.launchpad.net/tempest/+spec/test-accounts
            if not CONF.compute.fixed_network_name:
                m = 'fixed_network_name must be specified in config'
                raise lib_exc.InvalidConfiguration(m)
            # Shared network: reuse the preconfigured network; no router or
            # subnet is created in this mode.
            network = self._get_network_by_name(
                CONF.compute.fixed_network_name)
            router = None
            subnet = None
        else:
            network = self._create_network(
                networks_client=networks_client,
                tenant_id=tenant_id,
                port_security_enabled=port_security_enabled)
            router = self._get_router(client=routers_client,
                                      tenant_id=tenant_id)
            subnet_kwargs = dict(network=network,
                                 subnets_client=subnets_client,
                                 routers_client=routers_client)
            # use explicit check because empty list is a valid option
            if dns_nameservers is not None:
                subnet_kwargs['dns_nameservers'] = dns_nameservers
            subnet = self._create_subnet(**subnet_kwargs)
            if not routers_client:
                routers_client = self.routers_client
            router_id = router['id']
            routers_client.add_router_interface(router_id,
                                                subnet_id=subnet['id'])

            # save a cleanup job to remove this association between
            # router and subnet
            self.addCleanup(test_utils.call_and_ignore_notfound_exc,
                            routers_client.remove_router_interface, router_id,
                            subnet_id=subnet['id'])
        return network, subnet, router
1249
1250
class EncryptionScenarioTest(ScenarioTest):
    """Base class for encryption scenario tests"""

    credentials = ['primary', 'admin']

    @classmethod
    def setup_clients(cls):
        super(EncryptionScenarioTest, cls).setup_clients()
        # Pick the v2 volume clients when the v2 API is enabled; otherwise
        # fall back to the legacy clients.
        if CONF.volume_feature_enabled.api_v2:
            cls.admin_volume_types_client = cls.os_adm.volume_types_v2_client
            cls.admin_encryption_types_client =\
                cls.os_adm.encryption_types_v2_client
        else:
            cls.admin_volume_types_client = cls.os_adm.volume_types_client
            cls.admin_encryption_types_client =\
                cls.os_adm.encryption_types_client

    def create_encryption_type(self, client=None, type_id=None, provider=None,
                               key_size=None, cipher=None,
                               control_location=None):
        """Create an encryption type for a volume type.

        :param client: encryption types client; defaults to the admin one
        :param type_id: volume type to attach the encryption type to; a new
            volume type is created when not given
        :param provider: encryption provider class
        :param key_size: size of the encryption key
        :param cipher: encryption cipher
        :param control_location: notional service where encryption is
            performed
        :returns: the created encryption type dict
        """
        if not client:
            client = self.admin_encryption_types_client
        if not type_id:
            volume_type = self.create_volume_type()
            type_id = volume_type['id']
        LOG.debug("Creating an encryption type for volume type: %s", type_id)
        # Return the created encryption type instead of discarding it, so
        # callers can inspect it (the original subscripted the response and
        # threw the result away).
        return client.create_encryption_type(
            type_id, provider=provider, key_size=key_size, cipher=cipher,
            control_location=control_location)['encryption']
1280
1281
class ObjectStorageScenarioTest(ScenarioTest):
    """Provide harness to do Object Storage scenario tests.

    Subclasses implement the tests that use the methods provided by this
    class.
    """

    @classmethod
    def skip_checks(cls):
        """Skip the whole class when swift is not available."""
        super(ObjectStorageScenarioTest, cls).skip_checks()
        if not CONF.service_available.swift:
            skip_msg = ("%s skipped as swift is not available" %
                        cls.__name__)
            raise cls.skipException(skip_msg)

    @classmethod
    def setup_credentials(cls):
        """Acquire a client manager holding the swift operator role."""
        cls.set_network_resources()
        super(ObjectStorageScenarioTest, cls).setup_credentials()
        operator_role = CONF.object_storage.operator_role
        cls.os_operator = cls.get_client_manager(roles=[operator_role])

    @classmethod
    def setup_clients(cls):
        """Expose the operator's swift clients as class attributes."""
        super(ObjectStorageScenarioTest, cls).setup_clients()
        # Clients for Swift
        cls.account_client = cls.os_operator.account_client
        cls.container_client = cls.os_operator.container_client
        cls.object_client = cls.os_operator.object_client

    def get_swift_stat(self):
        """get swift status for our user account."""
        self.account_client.list_account_containers()
        LOG.debug('Swift status information obtained successfully')

    def create_container(self, container_name=None):
        """Create a container and register its deletion at cleanup.

        :param container_name: name to use; a random one when not given
        :returns: the name of the created container
        """
        name = container_name or data_utils.rand_name(
            'swift-scenario-container')
        self.container_client.create_container(name)
        # look for the container to assure it is created
        self.list_and_check_container_objects(name)
        LOG.debug('Container %s created', name)
        self.addCleanup(test_utils.call_and_ignore_notfound_exc,
                        self.container_client.delete_container,
                        name)
        return name

    def delete_container(self, container_name):
        """Delete the named container."""
        self.container_client.delete_container(container_name)
        LOG.debug('Container %s deleted', container_name)

    def upload_object_to_container(self, container_name, obj_name=None):
        """Upload an object of random bytes and register its deletion.

        :param container_name: container to upload into
        :param obj_name: object name; a random one when not given
        :returns: tuple of (object name, object data)
        """
        obj_name = obj_name or data_utils.rand_name('swift-scenario-object')
        obj_data = data_utils.random_bytes()
        self.object_client.create_object(container_name, obj_name, obj_data)
        self.addCleanup(test_utils.call_and_ignore_notfound_exc,
                        self.object_client.delete_object,
                        container_name,
                        obj_name)
        return obj_name, obj_data

    def delete_object(self, container_name, filename):
        """Delete an object and verify it is gone from the listing."""
        self.object_client.delete_object(container_name, filename)
        self.list_and_check_container_objects(container_name,
                                              not_present_obj=[filename])

    def list_and_check_container_objects(self, container_name,
                                         present_obj=None,
                                         not_present_obj=None):
        """Assert which objects are (not) listed in a container.

        :param container_name: container whose contents to list
        :param present_obj: object names that must appear in the listing
        :param not_present_obj: object names that must be absent
        """
        # List objects for a given container and assert which are present and
        # which are not.
        if present_obj is None:
            present_obj = []
        if not_present_obj is None:
            not_present_obj = []
        _, object_list = self.container_client.list_container_contents(
            container_name)
        if present_obj:
            for obj in present_obj:
                self.assertIn(obj, object_list)
        if not_present_obj:
            for obj in not_present_obj:
                self.assertNotIn(obj, object_list)

    def change_container_acl(self, container_name, acl):
        """Set the container read ACL and verify it was applied.

        :param container_name: container to update
        :param acl: value for the X-Container-Read metadata
        """
        metadata_param = {'metadata_prefix': 'x-container-',
                          'metadata': {'read': acl}}
        self.container_client.update_container_metadata(container_name,
                                                        **metadata_param)
        resp, _ = self.container_client.list_container_metadata(container_name)
        self.assertEqual(resp['x-container-read'], acl)

    def download_and_verify(self, container_name, obj_name, expected_data):
        """Fetch an object and assert its payload equals expected_data."""
        _, obj = self.object_client.get_object(container_name, obj_name)
        self.assertEqual(obj, expected_data)