blob: e58031b5843bceb6f4bdc4120eecc3e6e5ecb31e [file] [log] [blame]
Mehdi Abaakouk25812742017-03-01 07:44:41 +01001# Copyright 2012 OpenStack Foundation
2# Copyright 2013 IBM Corp.
3# All Rights Reserved.
4#
5# Licensed under the Apache License, Version 2.0 (the "License"); you may
6# not use this file except in compliance with the License. You may obtain
7# a copy of the License at
8#
9# http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
13# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
14# License for the specific language governing permissions and limitations
15# under the License.
16
17import subprocess
18
19import netaddr
20from oslo_log import log
21from oslo_serialization import jsonutils as json
22from oslo_utils import netutils
23
24from tempest.common import compute
25from tempest.common import image as common_image
26from tempest.common.utils import data_utils
27from tempest.common.utils.linux import remote_client
28from tempest.common.utils import net_utils
29from tempest.common import waiters
30from tempest import config
31from tempest import exceptions
32from tempest.lib.common.utils import test_utils
33from tempest.lib import exceptions as lib_exc
34import tempest.test
35
36CONF = config.CONF
37
38LOG = log.getLogger(__name__)
39
40
class ScenarioTest(tempest.test.BaseTestCase):
    """Common base for scenario tests, built on tempest's own clients."""

    # Credential types declared by this class: only 'primary'.
    credentials = ['primary']
45
@classmethod
def setup_clients(cls):
    """Attach the service clients used by scenario tests to the class.

    Every client is read from ``cls.manager`` once the parent class has
    set up its own clients. The image client is only bound when glance is
    marked available; the volume and snapshot clients follow
    ``CONF.volume_feature_enabled.api_v2``.

    :raises lib_exc.InvalidConfiguration: glance is available but neither
        image API v1 nor v2 is enabled.
    """
    super(ScenarioTest, cls).setup_clients()
    mgr = cls.manager

    # Clients (in alphabetical order)
    cls.flavors_client = mgr.flavors_client
    cls.compute_floating_ips_client = mgr.compute_floating_ips_client
    if CONF.service_available.glance:
        image_features = CONF.image_feature_enabled
        # Glance v1 is preferred when enabled, otherwise fall back to v2.
        if image_features.api_v1:
            cls.image_client = mgr.image_client
        elif image_features.api_v2:
            cls.image_client = mgr.image_client_v2
        else:
            raise lib_exc.InvalidConfiguration(
                'Either api_v1 or api_v2 must be True in '
                '[image-feature-enabled].')

    # Compute image client
    cls.compute_images_client = mgr.compute_images_client
    cls.keypairs_client = mgr.keypairs_client

    # Nova security groups clients
    cls.compute_security_groups_client = mgr.compute_security_groups_client
    cls.compute_security_group_rules_client = (
        mgr.compute_security_group_rules_client)
    cls.servers_client = mgr.servers_client
    cls.interface_client = mgr.interfaces_client

    # Neutron network clients
    cls.networks_client = mgr.networks_client
    cls.ports_client = mgr.ports_client
    cls.routers_client = mgr.routers_client
    cls.subnets_client = mgr.subnets_client
    cls.floating_ips_client = mgr.floating_ips_client
    cls.security_groups_client = mgr.security_groups_client
    cls.security_group_rules_client = mgr.security_group_rules_client

    # Volume clients: v2 variants when the v2 API is enabled.
    use_volume_v2 = CONF.volume_feature_enabled.api_v2
    cls.volumes_client = (
        mgr.volumes_v2_client if use_volume_v2 else mgr.volumes_client)
    cls.snapshots_client = (
        mgr.snapshots_v2_client if use_volume_v2 else mgr.snapshots_client)
89
90 # ## Test functions library
91 #
92 # The create_[resource] functions only return body and discard the
93 # resp part which is not used in scenario tests
94
def _create_port(self, network_id, client=None, namestart='port-quotatest',
                 **kwargs):
    """Create a port on ``network_id`` and schedule its deletion.

    :param network_id: id of the network the port is created on
    :param client: ports client to use; defaults to ``self.ports_client``
    :param namestart: prefix handed to ``data_utils.rand_name``
    :param kwargs: extra arguments forwarded to ``create_port``
    :returns: the ``port`` entry of the create response
    """
    ports_client = client or self.ports_client
    port_name = data_utils.rand_name(namestart)
    response = ports_client.create_port(name=port_name,
                                        network_id=network_id,
                                        **kwargs)
    self.assertIsNotNone(response, 'Unable to allocate port')
    created = response['port']
    # Deletion tolerates a port that is already gone.
    self.addCleanup(test_utils.call_and_ignore_notfound_exc,
                    ports_client.delete_port, created['id'])
    return created
109
def create_keypair(self, client=None):
    """Create a keypair with a random name and schedule its deletion.

    :param client: keypairs client to use; defaults to
        ``self.keypairs_client``
    :returns: the ``keypair`` entry of the create response
    """
    if not client:
        client = self.keypairs_client
    name = data_utils.rand_name(self.__class__.__name__)
    # We don't need to create a keypair by pubkey in scenario
    body = client.create_keypair(name=name)
    # Tolerate NotFound at cleanup time so a test that already removed the
    # keypair does not fail during teardown.
    self.addCleanup(test_utils.call_and_ignore_notfound_exc,
                    client.delete_keypair, name)
    return body['keypair']
118
def create_server(self, name=None, image_id=None, flavor=None,
                  validatable=False, wait_until='ACTIVE',
                  clients=None, **kwargs):
    """Wrapper utility that returns a test server.

    This wrapper utility calls the common create test server and
    returns a test server. The purpose of this wrapper is to minimize
    the impact on the code of the tests already using this
    function.

    :param name: server name; a random one is generated when omitted
    :param image_id: image passed through to ``create_test_server``
    :param flavor: flavor passed through to ``create_test_server``
    :param validatable: accepted for compatibility; it is not forwarded
        by this method (see the note below)
    :param wait_until: server status to wait for
    :param clients: client manager to use; defaults to ``self.manager``
    :param kwargs: extra arguments for ``create_test_server``. When a
        vnic type is configured, ``security_groups`` and ``networks`` are
        consumed here and replaced by pre-created ports.
    :returns: the server as returned by ``show_server`` after creation
    """

    # NOTE(jlanoux): As a first step, ssh checks in the scenario
    # tests need to be run regardless of the run_validation and
    # validatable parameters and thus until the ssh validation job
    # becomes voting in CI. The test resources management and IP
    # association are taken care of in the scenario tests.
    # Therefore, the validatable parameter is set to false in all
    # those tests. In this way create_server just return a standard
    # server and the scenario tests always perform ssh checks.

    # Needed for the cross_tenant_traffic test:
    if clients is None:
        clients = self.manager

    if name is None:
        name = data_utils.rand_name(self.__class__.__name__ + "-server")

    vnic_type = CONF.network.port_vnic_type

    # If vnic_type is configured create port for
    # every network
    if vnic_type:
        ports = []

        create_port_body = {'binding:vnic_type': vnic_type,
                            'namestart': 'port-smoke'}

        # Convert security group names to security group ids
        # to pass to create_port
        if 'security_groups' in kwargs:
            security_groups = (
                clients.security_groups_client.list_security_groups()
                .get('security_groups'))
            sec_dict = {s['name']: s['id'] for s in security_groups}

            sec_groups_names = [
                s['name'] for s in kwargs.pop('security_groups')]
            security_groups_ids = [sec_dict[s] for s in sec_groups_names]

            if security_groups_ids:
                create_port_body['security_groups'] = security_groups_ids

        # pop() with a default also covers the "no kwargs at all" case.
        networks = kwargs.pop('networks', [])

        # If there are no networks passed to us we look up
        # for the project's private networks and create a port.
        # The same behaviour as we would expect when passing
        # the call to the clients with no networks
        if not networks:
            networks = clients.networks_client.list_networks(
                **{'router:external': False, 'fields': 'id'})['networks']

        # It's net['uuid'] if networks come from kwargs
        # and net['id'] if they come from
        # clients.networks_client.list_networks
        for net in networks:
            net_id = net.get('uuid', net.get('id'))
            if 'port' not in net:
                port = self._create_port(network_id=net_id,
                                         client=clients.ports_client,
                                         **create_port_body)
                ports.append({'port': port['id']})
            else:
                ports.append({'port': net['port']})
        if ports:
            kwargs['networks'] = ports
        self.ports = ports

    tenant_network = self.get_tenant_network()

    body, servers = compute.create_test_server(
        clients,
        tenant_network=tenant_network,
        wait_until=wait_until,
        name=name, flavor=flavor,
        image_id=image_id, **kwargs)

    # Cleanups run in reverse order: delete first, then wait for
    # termination.
    self.addCleanup(waiters.wait_for_server_termination,
                    clients.servers_client, body['id'])
    self.addCleanup(test_utils.call_and_ignore_notfound_exc,
                    clients.servers_client.delete_server, body['id'])
    server = clients.servers_client.show_server(body['id'])['server']
    return server
216
def create_volume(self, size=None, name=None, snapshot_id=None,
                  imageRef=None, volume_type=None):
    """Create a volume, wait until it is available and return it.

    :param size: volume size; defaults to ``CONF.volume.volume_size``.
        When ``imageRef`` is given the size is raised to the image's
        ``minDisk`` if that is larger.
    :param name: volume name; a random one is generated when omitted
    :param snapshot_id: passed through to ``create_volume``
    :param imageRef: image to create the volume from
    :param volume_type: passed through to ``create_volume``
    :returns: the volume as returned by ``show_volume`` once available
    """
    if size is None:
        size = CONF.volume.volume_size
    if imageRef:
        image = self.compute_images_client.show_image(imageRef)['image']
        # A missing/None minDisk must not reach max(): comparing an int
        # with None raises TypeError on Python 3.
        min_disk = image.get('minDisk') or 0
        size = max(size, min_disk)
    if name is None:
        name = data_utils.rand_name(self.__class__.__name__ + "-volume")
    kwargs = {'display_name': name,
              'snapshot_id': snapshot_id,
              'imageRef': imageRef,
              'volume_type': volume_type,
              'size': size}
    volume = self.volumes_client.create_volume(**kwargs)['volume']

    # Cleanups run in reverse order: delete first, then wait for deletion.
    self.addCleanup(self.volumes_client.wait_for_resource_deletion,
                    volume['id'])
    self.addCleanup(test_utils.call_and_ignore_notfound_exc,
                    self.volumes_client.delete_volume, volume['id'])

    # NOTE(e0ne): Cinder API v2 uses name instead of display_name
    if 'display_name' in volume:
        self.assertEqual(name, volume['display_name'])
    else:
        self.assertEqual(name, volume['name'])
    waiters.wait_for_volume_resource_status(self.volumes_client,
                                            volume['id'], 'available')
    # The volume retrieved on creation has a non-up-to-date status.
    # Retrieval after it becomes active ensures correct details.
    volume = self.volumes_client.show_volume(volume['id'])['volume']
    return volume
250
def create_volume_type(self, client=None, name=None, backend_name=None):
    """Create a volume type and schedule its deletion.

    :param client: volume types client; defaults to
        ``self.admin_volume_types_client``
    :param name: base name; a random class-based one is used when omitted.
        The final name is always randomized again with a
        ``scenario-type-`` prefix.
    :param backend_name: when given, set as the ``volume_backend_name``
        extra spec
    :returns: the ``volume_type`` entry of the create response
    """
    types_client = client if client else self.admin_volume_types_client
    base_name = name or data_utils.rand_name(
        self.__class__.__name__ + '-volume-type')
    randomized_name = data_utils.rand_name('scenario-type-' + base_name)

    LOG.debug("Creating a volume type: %s on backend %s",
              randomized_name, backend_name)
    extra_specs = (
        {"volume_backend_name": backend_name} if backend_name else {})

    response = types_client.create_volume_type(name=randomized_name,
                                               extra_specs=extra_specs)
    created_type = response['volume_type']
    self.assertIn('id', created_type)
    self.addCleanup(types_client.delete_volume_type, created_type['id'])
    return created_type
271
272 def _create_loginable_secgroup_rule(self, secgroup_id=None):
273 _client = self.compute_security_groups_client
274 _client_rules = self.compute_security_group_rules_client
275 if secgroup_id is None:
276 sgs = _client.list_security_groups()['security_groups']
277 for sg in sgs:
278 if sg['name'] == 'default':
279 secgroup_id = sg['id']
280
281 # These rules are intended to permit inbound ssh and icmp
282 # traffic from all sources, so no group_id is provided.
283 # Setting a group_id would only permit traffic from ports
284 # belonging to the same security group.
285 rulesets = [
286 {
287 # ssh
288 'ip_protocol': 'tcp',
289 'from_port': 22,
290 'to_port': 22,
291 'cidr': '0.0.0.0/0',
292 },
293 {
294 # ping
295 'ip_protocol': 'icmp',
296 'from_port': -1,
297 'to_port': -1,
298 'cidr': '0.0.0.0/0',
299 }
300 ]
301 rules = list()
302 for ruleset in rulesets:
303 sg_rule = _client_rules.create_security_group_rule(
304 parent_group_id=secgroup_id, **ruleset)['security_group_rule']
305 rules.append(sg_rule)
306 return rules
307
def _create_security_group(self):
    """Create a nova security group with ssh/icmp rules and return it.

    The group gets a random name, is verified against the response and
    is scheduled for deletion (NotFound tolerated).
    """
    sg_client = self.compute_security_groups_client
    group_name = data_utils.rand_name(self.__class__.__name__)
    group_desc = group_name + " description"
    created = sg_client.create_security_group(
        name=group_name, description=group_desc)['security_group']
    self.assertEqual(created['name'], group_name)
    self.assertEqual(created['description'], group_desc)
    self.addCleanup(test_utils.call_and_ignore_notfound_exc,
                    sg_client.delete_security_group,
                    created['id'])

    # Open the group for inbound ssh and ping.
    self._create_loginable_secgroup_rule(created['id'])

    return created
325
def get_remote_client(self, ip_address, username=None, private_key=None):
    """Return an authenticated SSH client for a remote server.

    @param ip_address the server floating or fixed IP address to use
                      for ssh validation
    @param username name of the Linux account on the remote server;
                    defaults to CONF.validation.image_ssh_user
    @param private_key the SSH private key to use; with keypair auth it
                       defaults to self.keypair['private_key']
    @return a RemoteClient object

    Any failure while validating authentication is logged together with
    the console output and then re-raised.
    """
    validation = CONF.validation
    if username is None:
        username = validation.image_ssh_user

    # auth_method 'keypair' logs in with a key; anything else uses
    # username/password.
    if validation.auth_method == 'keypair':
        password = None
        pkey = (self.keypair['private_key']
                if private_key is None else private_key)
    else:
        password = validation.image_ssh_password
        pkey = None

    linux_client = remote_client.RemoteClient(ip_address, username,
                                              pkey=pkey,
                                              password=password)
    try:
        linux_client.validate_authentication()
    except Exception as e:
        details = {'ip': ip_address, 'error': e}
        message = ('Initializing SSH connection to %(ip)s failed. '
                   'Error: %(error)s' % details)
        caller = test_utils.find_test_caller()
        if caller:
            message = '(%s) %s' % (caller, message)
        LOG.exception(message)
        self._log_console_output()
        raise

    return linux_client
364
def _image_create(self, name, fmt, path,
                  disk_format=None, properties=None):
    """Create a private glance image, upload ``path`` and return its id.

    :param name: name prefix; a random suffix is appended
    :param fmt: container format, also used as disk format when
        ``disk_format`` is not given
    :param path: local file whose contents are uploaded
    :param disk_format: optional explicit disk format
    :param properties: extra image properties (nested under
        ``properties`` for v1, flattened for v2)
    :returns: id of the created image
    """
    extra_props = {} if properties is None else properties
    use_v1 = CONF.image_feature_enabled.api_v1
    image_args = {
        'name': data_utils.rand_name('%s-' % name),
        'container_format': fmt,
        'disk_format': disk_format or fmt,
    }
    if use_v1:
        image_args['is_public'] = 'False'
        image_args['properties'] = extra_props
        image_args = {
            'headers': common_image.image_meta_to_headers(**image_args)}
    else:
        image_args['visibility'] = 'private'
        # Additional properties are flattened out in the v2 API.
        image_args.update(extra_props)

    body = self.image_client.create_image(**image_args)
    # The response either wraps the image under 'image' or is the image.
    image = body['image'] if 'image' in body else body
    self.addCleanup(self.image_client.delete_image, image['id'])
    self.assertEqual("queued", image['status'])

    with open(path, 'rb') as image_file:
        if use_v1:
            self.image_client.update_image(image['id'], data=image_file)
        else:
            self.image_client.store_image_file(image['id'], image_file)
    return image['id']
393
def glance_image_create(self):
    """Upload the configured scenario image and return its id.

    The single-file image from ``CONF.scenario`` is tried first. If
    opening it raises ``IOError``, the aki/ari/ami (uec) triple is
    uploaded instead and the ami image id is returned.
    """
    scenario = CONF.scenario

    def in_img_dir(filename):
        # Plain concatenation, matching how the paths are configured.
        return scenario.img_dir + "/" + filename

    img_path = in_img_dir(scenario.img_file)
    aki_img_path = in_img_dir(scenario.aki_img_file)
    ari_img_path = in_img_dir(scenario.ari_img_file)
    ami_img_path = in_img_dir(scenario.ami_img_file)
    container_format = scenario.img_container_format
    disk_format = scenario.img_disk_format
    img_properties = scenario.img_properties
    LOG.debug("paths: img: %s, container_format: %s, disk_format: %s, "
              "properties: %s, ami: %s, ari: %s, aki: %s",
              img_path, container_format, disk_format,
              img_properties, ami_img_path, ari_img_path, aki_img_path)
    try:
        image = self._image_create('scenario-img',
                                   container_format,
                                   img_path,
                                   disk_format=disk_format,
                                   properties=img_properties)
    except IOError:
        LOG.debug("A qcow2 image was not found. Try to get a uec image.")
        kernel_id = self._image_create('scenario-aki', 'aki', aki_img_path)
        ramdisk_id = self._image_create('scenario-ari', 'ari',
                                        ari_img_path)
        uec_properties = {'kernel_id': kernel_id, 'ramdisk_id': ramdisk_id}
        image = self._image_create('scenario-ami', 'ami',
                                   path=ami_img_path,
                                   properties=uec_properties)
    LOG.debug("image:%s", image)

    return image
423
def _log_console_output(self, servers=None):
    """Log the console output of the given servers at debug level.

    :param servers: list of server dicts; when empty or ``None`` all
        servers returned by ``list_servers`` are used

    Does nothing (besides a debug line) when console output is disabled
    in ``CONF.compute_feature_enabled``.
    """
    if not CONF.compute_feature_enabled.console_output:
        LOG.debug('Console output not supported, cannot log')
        return

    targets = servers or self.servers_client.list_servers()['servers']
    for target in targets:
        server_id = target['id']
        try:
            output = self.servers_client.get_console_output(
                server_id)['output']
            LOG.debug('Console output for %s\nbody=\n%s',
                      server_id, output)
        except lib_exc.NotFound:
            # The server may vanish between listing and fetching.
            LOG.debug("Server %s disappeared(deleted) while looking "
                      "for the console log", server_id)
440
def _log_net_info(self, exc):
    """Emit a network debug line unless ``exc`` is an SSH timeout."""
    # network debug is called as part of ssh init
    if isinstance(exc, lib_exc.SSHTimeout):
        return
    LOG.debug('Network information on a devstack host')
445
def create_server_snapshot(self, server, name=None):
    """Snapshot a server to an image and return the image details.

    :param server: server dict; ``id`` and ``name`` are read
    :param name: snapshot name; a random one is generated when omitted
    :returns: the snapshot image (v1: metadata parsed from headers,
        v2: the ``show_image`` response)

    The image, and any volume snapshot referenced by its first
    ``block_device_mapping`` entry, are scheduled for deletion.
    """
    # Glance client
    _image_client = self.image_client
    # Compute client
    _images_client = self.compute_images_client
    if name is None:
        name = data_utils.rand_name(self.__class__.__name__ + 'snapshot')
    LOG.debug("Creating a snapshot image for server: %s", server['name'])
    image = _images_client.create_image(server['id'], name=name)
    # The new image id is the tail of the Location response header.
    image_id = image.response['location'].split('images/')[1]

    # Register the cleanups before waiting: if the image never becomes
    # active the wait raises, and the image must still be removed.
    self.addCleanup(_image_client.wait_for_resource_deletion,
                    image_id)
    self.addCleanup(test_utils.call_and_ignore_notfound_exc,
                    _image_client.delete_image, image_id)

    waiters.wait_for_image_status(_image_client, image_id, 'active')

    if CONF.image_feature_enabled.api_v1:
        # In glance v1 the additional properties are stored in the headers.
        resp = _image_client.check_image(image_id)
        snapshot_image = common_image.get_image_meta_from_headers(resp)
        image_props = snapshot_image.get('properties', {})
    else:
        # In glance v2 the additional properties are flattened.
        snapshot_image = _image_client.show_image(image_id)
        image_props = snapshot_image

    bdm = image_props.get('block_device_mapping')
    if bdm:
        # The mapping is stored as a JSON string.
        bdm = json.loads(bdm)
        if bdm and 'snapshot_id' in bdm[0]:
            snapshot_id = bdm[0]['snapshot_id']
            self.addCleanup(
                self.snapshots_client.wait_for_resource_deletion,
                snapshot_id)
            self.addCleanup(test_utils.call_and_ignore_notfound_exc,
                            self.snapshots_client.delete_snapshot,
                            snapshot_id)
            waiters.wait_for_volume_resource_status(self.snapshots_client,
                                                    snapshot_id,
                                                    'available')
    image_name = snapshot_image['name']
    self.assertEqual(name, image_name)
    LOG.debug("Created snapshot image %s for server %s",
              image_name, server['name'])
    return snapshot_image
492
def nova_volume_attach(self, server, volume_to_attach):
    """Attach a volume to a server and return the refreshed volume.

    The device path is built from ``CONF.compute.volume_device_name``.
    Waits until the volume reports 'in-use'.
    """
    server_id = server['id']
    requested_id = volume_to_attach['id']
    device_path = '/dev/%s' % CONF.compute.volume_device_name
    attachment = self.servers_client.attach_volume(
        server_id, volumeId=requested_id,
        device=device_path)['volumeAttachment']
    self.assertEqual(requested_id, attachment['id'])
    waiters.wait_for_volume_resource_status(self.volumes_client,
                                            attachment['id'], 'in-use')

    # Return the updated volume after the attachment
    refreshed = self.volumes_client.show_volume(attachment['id'])
    return refreshed['volume']
503
def nova_volume_detach(self, server, volume):
    """Detach ``volume`` from ``server`` and verify it is available."""
    volume_id = volume['id']
    self.servers_client.detach_volume(server['id'], volume_id)
    waiters.wait_for_volume_resource_status(self.volumes_client,
                                            volume_id, 'available')

    # Re-read the volume and double check the reported status.
    refreshed = self.volumes_client.show_volume(volume_id)['volume']
    self.assertEqual('available', refreshed['status'])
511
def rebuild_server(self, server_id, image=None,
                   preserve_ephemeral=False, wait=True,
                   rebuild_kwargs=None):
    """Rebuild a server, optionally waiting until it is ACTIVE again.

    :param server_id: id of the server to rebuild
    :param image: image reference; defaults to ``CONF.compute.image_ref``
    :param preserve_ephemeral: forwarded to ``rebuild_server``
    :param wait: when true, block until the server status is 'ACTIVE'
    :param rebuild_kwargs: extra keyword arguments for ``rebuild_server``
    """
    image_ref = CONF.compute.image_ref if image is None else image
    extra_args = rebuild_kwargs if rebuild_kwargs else {}

    LOG.debug("Rebuilding server (id: %s, image: %s, preserve eph: %s)",
              server_id, image_ref, preserve_ephemeral)
    self.servers_client.rebuild_server(
        server_id=server_id, image_ref=image_ref,
        preserve_ephemeral=preserve_ephemeral,
        **extra_args)
    if not wait:
        return
    waiters.wait_for_server_status(self.servers_client,
                                   server_id, 'ACTIVE')
529
def ping_ip_address(self, ip_address, should_succeed=True,
                    ping_timeout=None, mtu=None):
    """Ping an address repeatedly until the expected outcome is seen.

    :param ip_address: address handed to the local ``ping`` command
    :param should_succeed: whether the ping is expected to succeed
    :param ping_timeout: overall timeout; defaults to
        ``CONF.validation.ping_timeout``
    :param mtu: when set, ping with fragmentation disabled and a payload
        size derived from this MTU
    :returns: True if the expected outcome was observed before the
        timeout, False otherwise
    """
    timeout = ping_timeout or CONF.validation.ping_timeout
    ping_args = ['ping', '-c1', '-w1']

    if mtu:
        # ping receives just the size of ICMP payload
        payload_size = net_utils.get_ping_payload_size(mtu, 4)
        # '-M do' means: don't fragment
        ping_args.extend(['-M', 'do', '-s', str(payload_size)])
    ping_args.append(ip_address)

    def ping():
        proc = subprocess.Popen(ping_args,
                                stdout=subprocess.PIPE,
                                stderr=subprocess.PIPE)
        proc.communicate()
        succeeded = proc.returncode == 0
        return succeeded == should_succeed

    caller = test_utils.find_test_caller()
    expectation = 'reachable' if should_succeed else 'unreachable'
    LOG.debug('%(caller)s begins to ping %(ip)s in %(timeout)s sec and the'
              ' expected result is %(should_succeed)s', {
                  'caller': caller, 'ip': ip_address, 'timeout': timeout,
                  'should_succeed': expectation
              })
    result = test_utils.call_until_true(ping, timeout, 1)
    outcome = 'expected' if result else 'unexpected'
    LOG.debug('%(caller)s finishes ping %(ip)s in %(timeout)s sec and the '
              'ping result is %(result)s', {
                  'caller': caller, 'ip': ip_address, 'timeout': timeout,
                  'result': outcome
              })
    return result
566
def check_vm_connectivity(self, ip_address,
                          username=None,
                          private_key=None,
                          should_connect=True,
                          mtu=None):
    """Check server connectivity via ping and, if expected, ssh.

    :param ip_address: server to test against
    :param username: server's ssh username
    :param private_key: server's ssh private key to be used
    :param should_connect: True/False indicates positive/negative test
        positive - attempt ping and ssh
        negative - attempt ping and fail if succeed
    :param mtu: network MTU to use for connectivity validation

    :raises: AssertionError if the result of the connectivity check does
        not match the value of the should_connect param
    """
    failure_msg = (
        "Timed out waiting for %s to become reachable" % ip_address
        if should_connect
        else "ip address %s is reachable" % ip_address)
    ping_matched = self.ping_ip_address(ip_address,
                                        should_succeed=should_connect,
                                        mtu=mtu)
    self.assertTrue(ping_matched, msg=failure_msg)
    if not should_connect:
        # no need to check ssh for negative connectivity
        return
    self.get_remote_client(ip_address, username, private_key)
596
def check_public_network_connectivity(self, ip_address, username,
                                      private_key, should_connect=True,
                                      msg=None, servers=None, mtu=None):
    """Run ``check_vm_connectivity`` and log diagnostics on failure.

    :param ip_address: address to check
    :param username: ssh username
    :param private_key: ssh private key
    :param should_connect: expected reachability
    :param msg: optional text appended to the logged failure message
    :param servers: servers whose console output is logged on failure
    :param mtu: network MTU forwarded to the connectivity check

    Any exception from the check is logged and re-raised unchanged.
    """
    # The target login is assumed to have been configured for
    # key-based authentication by cloud-init.
    LOG.debug('checking network connections to IP %s with user: %s',
              ip_address, username)
    try:
        self.check_vm_connectivity(ip_address,
                                   username,
                                   private_key,
                                   should_connect=should_connect,
                                   mtu=mtu)
    except Exception:
        suffix = ": " + msg if msg else ''
        LOG.exception('Public network connectivity check failed' + suffix)
        self._log_console_output(servers)
        raise
617
def create_floating_ip(self, thing, pool_name=None):
    """Create a floating IP and associates to a server on Nova

    :param thing: server dict; its ``id`` is the association target
    :param pool_name: floating IP pool; defaults to
        ``CONF.network.floating_network_name``
    :returns: the created ``floating_ip`` dict
    """
    fip_client = self.compute_floating_ips_client
    pool = pool_name or CONF.network.floating_network_name
    floating_ip = fip_client.create_floating_ip(pool=pool)['floating_ip']
    # Schedule deletion before associating, tolerating NotFound.
    self.addCleanup(test_utils.call_and_ignore_notfound_exc,
                    fip_client.delete_floating_ip,
                    floating_ip['id'])
    fip_client.associate_floating_ip_to_server(
        floating_ip['ip'], thing['id'])
    return floating_ip
631
def create_timestamp(self, ip_address, dev_name=None, mount_path='/mnt',
                     private_key=None):
    """Write a timestamp file on the remote server and return its content.

    :param ip_address: address used to open the ssh connection
    :param dev_name: when given, the device is formatted and mounted on
        ``mount_path`` first, and unmounted afterwards
    :param mount_path: directory that holds the ``timestamp`` file
    :param private_key: ssh private key for the connection
    :returns: output of reading the timestamp file back
    """
    remote = self.get_remote_client(ip_address,
                                    private_key=private_key)
    uses_device = dev_name is not None
    if uses_device:
        remote.make_fs(dev_name)
        remote.mount(dev_name, mount_path)
    # Write the date and flush it to disk before reading it back.
    remote.exec_command(
        'sudo sh -c "date > %s/timestamp; sync"' % mount_path)
    timestamp = remote.exec_command('sudo cat %s/timestamp'
                                    % mount_path)
    if uses_device:
        remote.umount(mount_path)
    return timestamp
646
def get_timestamp(self, ip_address, dev_name=None, mount_path='/mnt',
                  private_key=None):
    """Read the timestamp file from the remote server.

    :param ip_address: address used to open the ssh connection
    :param dev_name: when given, the device is mounted on ``mount_path``
        before reading and unmounted afterwards
    :param mount_path: directory that holds the ``timestamp`` file
    :param private_key: ssh private key for the connection
    :returns: output of reading the timestamp file
    """
    remote = self.get_remote_client(ip_address,
                                    private_key=private_key)
    uses_device = dev_name is not None
    if uses_device:
        remote.mount(dev_name, mount_path)
    timestamp = remote.exec_command('sudo cat %s/timestamp'
                                    % mount_path)
    if uses_device:
        remote.umount(mount_path)
    return timestamp
658
def get_server_ip(self, server):
    """Get the server fixed or floating IP.

    Based on the configuration we're in, return a correct ip
    address for validating that a guest is up.

    :param server: server dict; ``addresses`` and ``id`` are read
    :returns: the address to connect to
    :raises exceptions.ServerUnreachable: connect_method is 'fixed' and
        no matching fixed address is found
    :raises lib_exc.InvalidConfiguration: connect_method is neither
        'floating' nor 'fixed'
    """
    connect_method = CONF.validation.connect_method
    if connect_method == 'floating':
        # The tests calling this method don't have a floating IP
        # and can't make use of the validation resources. So the
        # method is creating the floating IP there.
        return self.create_floating_ip(server)['ip']
    elif connect_method == 'fixed':
        # Determine the network name to look for based on config or creds
        # provider network resources.
        if CONF.validation.network_for_ssh:
            addresses = server['addresses'][
                CONF.validation.network_for_ssh]
        else:
            creds_provider = self._get_credentials_provider()
            net_creds = creds_provider.get_primary_creds()
            network = getattr(net_creds, 'network', None)
            addresses = (server['addresses'][network['name']]
                         if network else [])
        for address in addresses:
            if (address['version'] == CONF.validation.ip_version_for_ssh
                    and address['OS-EXT-IPS:type'] == 'fixed'):
                return address['addr']
        raise exceptions.ServerUnreachable(server_id=server['id'])
    else:
        # Name the offending value so the misconfiguration is obvious.
        raise lib_exc.InvalidConfiguration(
            "Unsupported connect_method %r in [validation]; expected "
            "'floating' or 'fixed'." % (connect_method,))
689
690
class NetworkScenarioTest(ScenarioTest):
    """Base class for network scenario tests.

    Provides helpers for network scenario tests that use the neutron
    API. Helpers inherited from the ancestor that use the nova network
    API are overridden with neutron-based versions.

    The class also enforces Neutron rather than nova-network: subclassed
    tests are skipped when Neutron is not enabled.
    """

    # Credential types declared by this class: 'primary' and 'admin'.
    credentials = ['primary', 'admin']
704
@classmethod
def skip_checks(cls):
    """Skip the test class when neutron is not marked as available."""
    super(NetworkScenarioTest, cls).skip_checks()
    if CONF.service_available.neutron:
        return
    raise cls.skipException('Neutron not available')
710
def _create_network(self, networks_client=None,
                    tenant_id=None,
                    namestart='network-smoke-',
                    port_security_enabled=True):
    """Create a network with a random name and schedule its deletion.

    :param networks_client: client to use; defaults to
        ``self.networks_client``
    :param tenant_id: owner of the network; defaults to the client's
        ``tenant_id``
    :param namestart: prefix handed to ``data_utils.rand_name``
    :param port_security_enabled: only sent when the port_security
        feature is enabled in the configuration
    :returns: the ``network`` entry of the create response
    """
    net_client = networks_client or self.networks_client
    owner_id = tenant_id or net_client.tenant_id
    name = data_utils.rand_name(namestart)
    create_args = {'name': name, 'tenant_id': owner_id}
    # Neutron disables port security by default so we have to check the
    # config before trying to create the network with port_security_enabled
    if CONF.network_feature_enabled.port_security:
        create_args['port_security_enabled'] = port_security_enabled
    network = net_client.create_network(**create_args)['network']

    self.assertEqual(network['name'], name)
    self.addCleanup(test_utils.call_and_ignore_notfound_exc,
                    net_client.delete_network,
                    network['id'])
    return network
733
def _create_subnet(self, network, subnets_client=None,
                   routers_client=None, namestart='subnet-smoke',
                   **kwargs):
    """Create a subnet for the given network

    within the cidr block configured for tenant networks.

    :param network: network dict; ``id`` and ``tenant_id`` are read
    :param subnets_client: client to use; defaults to
        ``self.subnets_client``
    :param routers_client: defaults to ``self.routers_client``; not used
        further in this method
    :param namestart: prefix handed to ``data_utils.rand_name``
    :param kwargs: extra ``create_subnet`` arguments; ``ip_version``
        (default 4) selects the v4 or v6 project cidr settings
    :returns: the ``subnet`` entry of the create response
    """
    if not subnets_client:
        subnets_client = self.subnets_client
    if not routers_client:
        routers_client = self.routers_client

    def cidr_in_use(cidr, tenant_id):
        """Check cidr existence

        :returns: True if subnet with cidr already exist in tenant
              False else
        """
        existing = self.admin_manager.subnets_client.list_subnets(
            tenant_id=tenant_id, cidr=cidr)['subnets']
        return len(existing) > 0

    ip_version = kwargs.pop('ip_version', 4)

    network_conf = CONF.network
    if ip_version == 6:
        tenant_cidr = netaddr.IPNetwork(
            network_conf.project_network_v6_cidr)
        num_bits = network_conf.project_network_v6_mask_bits
    else:
        tenant_cidr = netaddr.IPNetwork(network_conf.project_network_cidr)
        num_bits = network_conf.project_network_mask_bits

    result = None
    str_cidr = None
    # Repeatedly attempt subnet creation with sequential cidr
    # blocks until an unallocated block is found.
    for candidate in tenant_cidr.subnet(num_bits):
        str_cidr = str(candidate)
        if cidr_in_use(str_cidr, tenant_id=network['tenant_id']):
            continue

        create_args = dict(
            name=data_utils.rand_name(namestart),
            network_id=network['id'],
            tenant_id=network['tenant_id'],
            cidr=str_cidr,
            ip_version=ip_version,
            **kwargs
        )
        try:
            result = subnets_client.create_subnet(**create_args)
        except lib_exc.Conflict as e:
            # Only an overlapping cidr is worth retrying with the next
            # block; any other conflict is re-raised.
            if 'overlaps with another subnet' not in str(e):
                raise
        else:
            break
    self.assertIsNotNone(result, 'Unable to allocate tenant network')

    subnet = result['subnet']
    self.assertEqual(subnet['cidr'], str_cidr)

    self.addCleanup(test_utils.call_and_ignore_notfound_exc,
                    subnets_client.delete_subnet, subnet['id'])

    return subnet
799
def _get_server_port_id_and_ip4(self, server, ip_addr=None):
    """Return the ``(port_id, ipv4_address)`` pair of a server.

    :param server: server dict; its ``id`` filters the port listing
    :param ip_addr: passed as the ``fixed_ip`` filter of the listing
    :returns: the single (port id, IPv4 address) tuple found
    :raises: assertion failure when no pair or more than one is found
    """
    ports = self.admin_manager.ports_client.list_ports(
        device_id=server['id'], fixed_ip=ip_addr)['ports']
    # A port can have more than one IP address in some cases.
    # If the network is dual-stack (IPv4 + IPv6), this port is associated
    # with 2 subnets
    accepted_statuses = ['ACTIVE']
    # NOTE(vsaienko) With Ironic, instances live on separate hardware
    # servers. Neutron does not bind ports for Ironic instances, as a
    # result the port remains in the DOWN state.
    # TODO(vsaienko) remove once bug: #1599836 is resolved.
    if getattr(CONF.service_available, 'ironic', False):
        accepted_statuses.append('DOWN')

    port_map = []
    for port in ports:
        for fixed_ip in port["fixed_ips"]:
            address = fixed_ip["ip_address"]
            if (netutils.is_valid_ipv4(address)
                    and port['status'] in accepted_statuses):
                port_map.append((port["id"], address))

    inactive = [port for port in ports if port['status'] != 'ACTIVE']
    if inactive:
        LOG.warning("Instance has ports that are not ACTIVE: %s", inactive)

    self.assertNotEqual(0, len(port_map),
                        "No IPv4 addresses found in: %s" % ports)
    self.assertEqual(len(port_map), 1,
                     "Found multiple IPv4 addresses: %s. "
                     "Unable to determine which port to target."
                     % port_map)
    return port_map[0]
829
830 def _get_network_by_name(self, network_name):
831 net = self.admin_manager.networks_client.list_networks(
832 name=network_name)['networks']
833 self.assertNotEqual(len(net), 0,
834 "Unable to get network by name: %s" % network_name)
835 return net[0]
836
def create_floating_ip(self, thing, external_network_id=None,
                       port_id=None, client=None):
    """Create a floating IP and associates to a resource/port on Neutron

    :param thing: resource dict; ``tenant_id`` is read, and it is used
        to look up the port when ``port_id`` is not given
    :param external_network_id: defaults to
        ``CONF.network.public_network_id``
    :param port_id: port to associate; when omitted the server's port
        and IPv4 address are looked up
    :param client: floating IPs client; defaults to
        ``self.floating_ips_client``
    :returns: the created ``floatingip`` dict
    """
    external_network_id = (
        external_network_id or CONF.network.public_network_id)
    fip_client = client or self.floating_ips_client
    if port_id:
        # An explicit port is used as is, without a fixed address.
        ip4 = None
    else:
        port_id, ip4 = self._get_server_port_id_and_ip4(thing)
    floating_ip = fip_client.create_floatingip(
        floating_network_id=external_network_id,
        port_id=port_id,
        tenant_id=thing['tenant_id'],
        fixed_ip_address=ip4
    )['floatingip']
    self.addCleanup(test_utils.call_and_ignore_notfound_exc,
                    fip_client.delete_floatingip,
                    floating_ip['id'])
    return floating_ip
859
860 def _associate_floating_ip(self, floating_ip, server):
861 port_id, _ = self._get_server_port_id_and_ip4(server)
862 kwargs = dict(port_id=port_id)
863 floating_ip = self.floating_ips_client.update_floatingip(
864 floating_ip['id'], **kwargs)['floatingip']
865 self.assertEqual(port_id, floating_ip['port_id'])
866 return floating_ip
867
868 def _disassociate_floating_ip(self, floating_ip):
869 """:param floating_ip: floating_ips_client.create_floatingip"""
870 kwargs = dict(port_id=None)
871 floating_ip = self.floating_ips_client.update_floatingip(
872 floating_ip['id'], **kwargs)['floatingip']
873 self.assertIsNone(floating_ip['port_id'])
874 return floating_ip
875
def check_floating_ip_status(self, floating_ip, status):
    """Verifies floatingip reaches the given status

    Polls with ``CONF.network.build_timeout`` / ``build_interval``, then
    re-reads the floating IP once more for the final assertion.

    :param dict floating_ip: floating IP dict to check status
    :param status: target status
    :raises: AssertionError if status doesn't match
    """
    fip_id = floating_ip['id']

    def refresh():
        current = self.floating_ips_client.show_floatingip(
            fip_id)['floatingip']
        return status == current['status']

    test_utils.call_until_true(refresh,
                               CONF.network.build_timeout,
                               CONF.network.build_interval)
    # The polling result is not used; the final state decides.
    latest = self.floating_ips_client.show_floatingip(
        fip_id)['floatingip']
    failure = ("FloatingIP: {fp} is at status: {cst}. "
               "failed to reach status: {st}"
               .format(fp=latest, cst=latest['status'],
                       st=status))
    self.assertEqual(status, latest['status'], message=failure)
    LOG.info("FloatingIP: {fp} is at status: {st}"
             .format(fp=latest, st=status))
902
def _check_tenant_network_connectivity(self, server,
                                       username,
                                       private_key,
                                       should_connect=True,
                                       servers_for_debug=None):
    """Run ``check_vm_connectivity`` against every address of ``server``.

    Only logs a message and returns when
    ``CONF.network.project_networks_reachable`` is not set. If any check
    raises, the console output of ``servers_for_debug`` and the network
    info are logged and the exception is re-raised.

    :param server: server dict; its ``addresses`` mapping is walked
    :param username: login passed to ``check_vm_connectivity``
    :param private_key: key passed to ``check_vm_connectivity``
    :param should_connect: expected outcome passed to each check
    :param servers_for_debug: passed to ``_log_console_output`` on failure
    """
    if not CONF.network.project_networks_reachable:
        LOG.info('Tenant networks not configured to be reachable.')
        return
    # The target login is assumed to have been configured for
    # key-based authentication by cloud-init.
    try:
        for addresses in server['addresses'].values():
            for entry in addresses:
                self.check_vm_connectivity(entry['addr'],
                                           username,
                                           private_key,
                                           should_connect=should_connect)
    except Exception as exc:
        LOG.exception('Tenant network connectivity check failed')
        self._log_console_output(servers_for_debug)
        self._log_net_info(exc)
        raise
926
def _check_remote_connectivity(self, source, dest, should_succeed=True,
                               nic=None):
    """Ping ``dest`` over ``source`` until the outcome matches expectations.

    :param source: RemoteClient: an ssh connection from which to ping
    :param dest: an IP to ping against
    :param should_succeed: whether the ping is expected to succeed
    :param nic: specific network interface to ping from
    :returns: the value ``test_utils.call_until_true`` returns for the
        probe, called with ``CONF.validation.ping_timeout`` and ``1``
    """
    def probe():
        # A single attempt: report whether the ping result agrees with
        # what the caller expects.
        try:
            source.ping_host(dest, nic=nic)
        except lib_exc.SSHExecCommandFailed:
            LOG.warning('Failed to ping IP: %s via a ssh connection '
                        'from: %s.', dest, source.ssh_client.host)
            outcome = not should_succeed
        else:
            outcome = should_succeed
        return outcome

    timeout = CONF.validation.ping_timeout
    return test_utils.call_until_true(probe, timeout, 1)
950
def _create_security_group(self, security_group_rules_client=None,
                           tenant_id=None,
                           namestart='secgroup-smoke',
                           security_groups_client=None):
    """Create a security group and add the loginable rules to it.

    :param security_group_rules_client: defaults to
        ``self.security_group_rules_client``
    :param tenant_id: defaults to the ``tenant_id`` of the groups client
    :param namestart: name prefix handed to ``_create_empty_security_group``
    :param security_groups_client: defaults to ``self.security_groups_client``
    :returns: the created security group
    """
    rules_client = security_group_rules_client
    if rules_client is None:
        rules_client = self.security_group_rules_client
    groups_client = security_groups_client
    if groups_client is None:
        groups_client = self.security_groups_client
    if tenant_id is None:
        tenant_id = groups_client.tenant_id

    secgroup = self._create_empty_security_group(
        namestart=namestart, client=groups_client, tenant_id=tenant_id)

    # Add rules to the security group
    created_rules = self._create_loginable_secgroup_rule(
        security_group_rules_client=rules_client,
        secgroup=secgroup,
        security_groups_client=groups_client)
    # Every rule must belong to the same tenant and to the new group.
    for created in created_rules:
        self.assertEqual(tenant_id, created['tenant_id'])
        self.assertEqual(secgroup['id'], created['security_group_id'])
    return secgroup
974
def _create_empty_security_group(self, client=None, tenant_id=None,
                                 namestart='secgroup-smoke'):
    """Create a security group without adding any rules to it.

    As originally documented, default rules will be created:
     - IPv4 egress to any
     - IPv6 egress to any

    :param client: security groups client; defaults to
        ``self.security_groups_client``
    :param tenant_id: secgroup will be created in this tenant; defaults to
        the client's ``tenant_id``
    :param namestart: prefix for the randomly generated group name
    :returns: the created security group
    """
    sg_client = self.security_groups_client if client is None else client
    owner = tenant_id or sg_client.tenant_id
    name = data_utils.rand_name(namestart)
    description = name + " description"
    response = sg_client.create_security_group(
        name=name, description=description, tenant_id=owner)

    secgroup = response['security_group']
    # The service must echo back what was requested.
    self.assertEqual(secgroup['name'], name)
    self.assertEqual(owner, secgroup['tenant_id'])
    self.assertEqual(secgroup['description'], description)

    self.addCleanup(test_utils.call_and_ignore_notfound_exc,
                    sg_client.delete_security_group, secgroup['id'])
    return secgroup
1005
def _default_security_group(self, client=None, tenant_id=None):
    """Return the security group named 'default' owned by ``tenant_id``.

    :param client: security groups client; defaults to
        ``self.security_groups_client``
    :param tenant_id: defaults to the client's ``tenant_id``
    :returns: default secgroup for given tenant
    """
    if client is None:
        client = self.security_groups_client
    if not tenant_id:
        tenant_id = client.tenant_id
    # The first value of the listing response is taken as the group list.
    all_groups = list(client.list_security_groups().values())[0]
    defaults = []
    for group in all_groups:
        if group['tenant_id'] == tenant_id and group['name'] == 'default':
            defaults.append(group)
    msg = "No default security group for tenant %s." % (tenant_id)
    self.assertGreater(len(defaults), 0, msg)
    return defaults[0]
1022
def _create_security_group_rule(self, secgroup=None,
                                sec_group_rules_client=None,
                                tenant_id=None,
                                security_groups_client=None, **kwargs):
    """Create one security group rule from keyword rule parameters.

    The rule is added to ``secgroup``. When no secgroup is given, the
    default secgroup of ``tenant_id`` is looked up and used instead.

    :param secgroup: the security group.
    :param sec_group_rules_client: defaults to
        ``self.security_group_rules_client``
    :param tenant_id: if secgroup not passed -- the tenant in which to
        search for default secgroup; defaults to the ``tenant_id`` of the
        security groups client
    :param security_groups_client: defaults to ``self.security_groups_client``
    :param kwargs: rule parameters, merged over the group id and tenant;
        for example, to allow incoming ssh:
            direction='ingress', protocol='tcp',
            port_range_min=22, port_range_max=22
    :returns: the ``security_group_rule`` dict from the create response
    """
    if sec_group_rules_client is None:
        sec_group_rules_client = self.security_group_rules_client
    if security_groups_client is None:
        security_groups_client = self.security_groups_client
    tenant_id = tenant_id or security_groups_client.tenant_id
    if secgroup is None:
        secgroup = self._default_security_group(
            client=security_groups_client, tenant_id=tenant_id)

    # Caller-supplied parameters win over the two base fields.
    base = {'security_group_id': secgroup['id'],
            'tenant_id': secgroup['tenant_id']}
    request = dict(base, **kwargs)

    response = sec_group_rules_client.create_security_group_rule(**request)
    created = response['security_group_rule']

    self.assertEqual(secgroup['tenant_id'], created['tenant_id'])
    self.assertEqual(secgroup['id'], created['security_group_id'])

    return created
1065
def _create_loginable_secgroup_rule(self, security_group_rules_client=None,
                                    secgroup=None,
                                    security_groups_client=None):
    """Create the rules that make instances reachable over ssh and ping.

    For both directions ('ingress' and 'egress') this requests:
      1. a tcp port 22 rule, in order to allow ssh access for ipv4;
      2. an ipv4 icmp rule, in order to allow icmpv4;
      3. an ipv6 icmp rule, in order to allow icmpv6.

    A rule that the service reports as already existing is skipped.

    :param security_group_rules_client: client used to create the rules;
        defaults to ``self.security_group_rules_client``
    :param secgroup: security group to add the rules to; passed through to
        ``_create_security_group_rule``
    :param security_groups_client: defaults to ``self.security_groups_client``
    :returns: list of the rule dicts that were actually created
    :raises lib_exc.Conflict: when rule creation conflicts for any reason
        other than the rule already existing
    """
    if security_group_rules_client is None:
        security_group_rules_client = self.security_group_rules_client
    if security_groups_client is None:
        security_groups_client = self.security_groups_client
    rulesets = [
        dict(
            # ssh
            protocol='tcp',
            port_range_min=22,
            port_range_max=22,
        ),
        dict(
            # ping
            protocol='icmp',
        ),
        dict(
            # ipv6-icmp for ping6
            protocol='icmp',
            ethertype='IPv6',
        )
    ]
    rules = []
    for ruleset in rulesets:
        for r_direction in ['ingress', 'egress']:
            # Build a fresh dict per request so the templates above are
            # never mutated between iterations.
            rule_kwargs = dict(ruleset, direction=r_direction)
            try:
                sg_rule = self._create_security_group_rule(
                    sec_group_rules_client=security_group_rules_client,
                    secgroup=secgroup,
                    security_groups_client=security_groups_client,
                    **rule_kwargs)
            except lib_exc.Conflict as ex:
                # if rule already exist - skip rule and continue
                msg = 'Security group rule already exists'
                if msg not in ex._error_string:
                    # Bare raise re-raises the active exception with its
                    # original traceback intact.
                    raise
            else:
                self.assertEqual(r_direction, sg_rule['direction'])
                rules.append(sg_rule)

    return rules
1120
def _get_router(self, client=None, tenant_id=None):
    """Retrieve a router for the given tenant id.

    If ``CONF.network.public_router_id`` is set, that router is fetched
    and returned.

    Otherwise, if ``CONF.network.public_network_id`` is set, a router is
    created for the tenant, its external gateway is pointed at that
    network, and the updated router is returned.

    :param client: routers client; defaults to ``self.routers_client``
    :param tenant_id: defaults to the client's ``tenant_id``
    :raises Exception: when neither configuration option is set
    """
    routers = client or self.routers_client
    owner = tenant_id or routers.tenant_id
    public_router = CONF.network.public_router_id
    public_network = CONF.network.public_network_id

    if public_router:
        return routers.show_router(public_router)['router']
    if not public_network:
        raise Exception("Neither of 'public_router_id' or "
                        "'public_network_id' has been defined.")

    created = self._create_router(routers, owner)
    gateway = {'external_gateway_info': dict(network_id=public_network)}
    return routers.update_router(created['id'], **gateway)['router']
1147
def _create_router(self, client=None, tenant_id=None,
                   namestart='router-smoke'):
    """Create a router with a random name and schedule its deletion.

    The router is requested with ``admin_state_up=True``. The test fails
    if the name in the response differs from the requested one.

    :param client: routers client; defaults to ``self.routers_client``
    :param tenant_id: defaults to the client's ``tenant_id``
    :param namestart: prefix for the randomly generated router name
    :returns: the ``router`` dict from the create response
    """
    routers = client or self.routers_client
    owner = tenant_id or routers.tenant_id
    router_name = data_utils.rand_name(namestart)
    response = routers.create_router(name=router_name,
                                     admin_state_up=True,
                                     tenant_id=owner)
    created = response['router']
    self.assertEqual(created['name'], router_name)
    self.addCleanup(test_utils.call_and_ignore_notfound_exc,
                    routers.delete_router,
                    created['id'])
    return created
1164
def _update_router_admin_state(self, router, admin_state_up):
    """Set the router's ``admin_state_up`` flag and assert it took effect.

    Returns nothing; the router dict from the response is only used for
    the check.

    :param router: router dict; only its ``id`` is read
    :param admin_state_up: value to set and to expect back
    """
    response = self.routers_client.update_router(
        router['id'], admin_state_up=admin_state_up)
    refreshed = response['router']
    self.assertEqual(admin_state_up, refreshed['admin_state_up'])
1170
def create_networks(self, networks_client=None,
                    routers_client=None, subnets_client=None,
                    tenant_id=None, dns_nameservers=None,
                    port_security_enabled=True):
    """Create a network with a subnet connected to a router.

    The baremetal driver is a special case since all nodes are
    on the same shared network: when
    ``CONF.network.shared_physical_network`` is set, nothing is created
    and the configured fixed network is looked up by name instead.

    :param networks_client: passed to ``_create_network``
    :param routers_client: passed to ``_get_router`` and ``_create_subnet``;
        ``self.routers_client`` is used for the router interface when falsy
    :param subnets_client: passed to ``_create_subnet``
    :param tenant_id: id of tenant to create resources in.
    :param dns_nameservers: list of dns servers to send to subnet.
    :param port_security_enabled: passed to ``_create_network``
    :returns: network, subnet, router (subnet and router are None in the
        shared physical network case)
    :raises lib_exc.InvalidConfiguration: shared physical network is
        enabled but ``CONF.compute.fixed_network_name`` is not set
    """
    if CONF.network.shared_physical_network:
        # NOTE(Shrews): This exception is for environments where tenant
        # credential isolation is available, but network separation is
        # not (the current baremetal case).  Likely can be removed when
        # test account mgmt is reworked:
        # https://blueprints.launchpad.net/tempest/+spec/test-accounts
        if not CONF.compute.fixed_network_name:
            m = 'fixed_network_name must be specified in config'
            raise lib_exc.InvalidConfiguration(m)
        shared_network = self._get_network_by_name(
            CONF.compute.fixed_network_name)
        return shared_network, None, None

    network = self._create_network(
        networks_client=networks_client,
        tenant_id=tenant_id,
        port_security_enabled=port_security_enabled)
    router = self._get_router(client=routers_client,
                              tenant_id=tenant_id)

    subnet_kwargs = {'network': network,
                     'subnets_client': subnets_client,
                     'routers_client': routers_client}
    # use explicit check because empty list is a valid option
    if dns_nameservers is not None:
        subnet_kwargs['dns_nameservers'] = dns_nameservers
    subnet = self._create_subnet(**subnet_kwargs)

    # The default client is resolved only now: the helpers above receive
    # the caller's value unchanged.
    interface_client = routers_client or self.routers_client
    router_id = router['id']
    subnet_id = subnet['id']
    interface_client.add_router_interface(router_id, subnet_id=subnet_id)

    # save a cleanup job to remove this association between
    # router and subnet
    self.addCleanup(test_utils.call_and_ignore_notfound_exc,
                    interface_client.remove_router_interface, router_id,
                    subnet_id=subnet_id)
    return network, subnet, router
1223
1224
class EncryptionScenarioTest(ScenarioTest):
    """Base class for encryption scenario tests"""

    credentials = ['primary', 'admin']

    @classmethod
    def setup_clients(cls):
        """Select the admin volume-type and encryption-type clients.

        The v2 clients are used when ``CONF.volume_feature_enabled.api_v2``
        is set, the unversioned ones otherwise.
        """
        super(EncryptionScenarioTest, cls).setup_clients()
        admin = cls.os_adm
        if CONF.volume_feature_enabled.api_v2:
            types_client = admin.volume_types_v2_client
            encryption_client = admin.encryption_types_v2_client
        else:
            types_client = admin.volume_types_client
            encryption_client = admin.encryption_types_client
        cls.admin_volume_types_client = types_client
        cls.admin_encryption_types_client = encryption_client

    def create_encryption_type(self, client=None, type_id=None, provider=None,
                               key_size=None, cipher=None,
                               control_location=None):
        """Create an encryption type for a volume type.

        :param client: encryption types client; defaults to
            ``self.admin_encryption_types_client``
        :param type_id: volume type id; when falsy a new volume type is
            created through ``create_volume_type`` and its id is used
        :param provider: forwarded to the client call
        :param key_size: forwarded to the client call
        :param cipher: forwarded to the client call
        :param control_location: forwarded to the client call
        """
        encryption_client = client or self.admin_encryption_types_client
        if not type_id:
            type_id = self.create_volume_type()['id']
        LOG.debug("Creating an encryption type for volume type: %s", type_id)
        response = encryption_client.create_encryption_type(
            type_id, provider=provider, key_size=key_size, cipher=cipher,
            control_location=control_location)
        # The value is not returned; the lookup only requires the
        # 'encryption' key to be present in the response.
        response['encryption']
1254
1255
class ObjectStorageScenarioTest(ScenarioTest):
    """Provide harness to do Object Storage scenario tests.

    Subclasses implement the tests that use the methods provided by this
    class.
    """

    @classmethod
    def skip_checks(cls):
        """Skip the whole class when swift is not marked as available."""
        super(ObjectStorageScenarioTest, cls).skip_checks()
        if not CONF.service_available.swift:
            skip_msg = ("%s skipped as swift is not available" %
                        cls.__name__)
            raise cls.skipException(skip_msg)

    @classmethod
    def setup_credentials(cls):
        """Set up credentials plus a manager holding the operator role."""
        cls.set_network_resources()
        super(ObjectStorageScenarioTest, cls).setup_credentials()
        operator_role = CONF.object_storage.operator_role
        cls.os_operator = cls.get_client_manager(roles=[operator_role])

    @classmethod
    def setup_clients(cls):
        """Expose the operator manager's Swift clients on the class."""
        super(ObjectStorageScenarioTest, cls).setup_clients()
        # Clients for Swift
        cls.account_client = cls.os_operator.account_client
        cls.container_client = cls.os_operator.container_client
        cls.object_client = cls.os_operator.object_client

    def get_swift_stat(self):
        """get swift status for our user account."""
        self.account_client.list_account_containers()
        LOG.debug('Swift status information obtained successfully')

    def create_container(self, container_name=None):
        """Create a container, verify it can be listed, and return its name.

        :param container_name: name to use; a random one is generated when
            falsy
        :returns: the name of the created container
        """
        name = container_name or data_utils.rand_name(
            'swift-scenario-container')
        self.container_client.create_container(name)
        # Register the cleanup before verifying, so the container is still
        # removed if the listing below raises.
        self.addCleanup(test_utils.call_and_ignore_notfound_exc,
                        self.container_client.delete_container,
                        name)
        # look for the container to assure it is created
        self.list_and_check_container_objects(name)
        LOG.debug('Container %s created', name)
        return name

    def delete_container(self, container_name):
        """Delete the named container."""
        self.container_client.delete_container(container_name)
        LOG.debug('Container %s deleted', container_name)

    def upload_object_to_container(self, container_name, obj_name=None):
        """Upload random bytes as an object and schedule its deletion.

        :param container_name: container to upload into
        :param obj_name: object name; a random one is generated when falsy
        :returns: tuple of (object name, uploaded data)
        """
        obj_name = obj_name or data_utils.rand_name('swift-scenario-object')
        obj_data = data_utils.random_bytes()
        self.object_client.create_object(container_name, obj_name, obj_data)
        self.addCleanup(test_utils.call_and_ignore_notfound_exc,
                        self.object_client.delete_object,
                        container_name,
                        obj_name)
        return obj_name, obj_data

    def delete_object(self, container_name, filename):
        """Delete an object and assert it is gone from the listing."""
        self.object_client.delete_object(container_name, filename)
        self.list_and_check_container_objects(container_name,
                                              not_present_obj=[filename])

    def list_and_check_container_objects(self, container_name,
                                         present_obj=None,
                                         not_present_obj=None):
        """List a container and assert which objects are present or absent.

        The listing request is always issued, even when both lists are
        empty.

        :param container_name: container to list
        :param present_obj: names that must appear in the listing
        :param not_present_obj: names that must not appear in the listing
        """
        _, object_list = self.container_client.list_container_contents(
            container_name)
        # A falsy argument (None or empty) means there is nothing to check.
        for obj in present_obj or []:
            self.assertIn(obj, object_list)
        for obj in not_present_obj or []:
            self.assertNotIn(obj, object_list)

    def change_container_acl(self, container_name, acl):
        """Set the container read ACL and assert the metadata reflects it."""
        metadata_param = {'metadata_prefix': 'x-container-',
                          'metadata': {'read': acl}}
        self.container_client.update_container_metadata(container_name,
                                                        **metadata_param)
        resp, _ = self.container_client.list_container_metadata(container_name)
        self.assertEqual(resp['x-container-read'], acl)

    def download_and_verify(self, container_name, obj_name, expected_data):
        """Download an object and assert its content equals expected_data."""
        _, obj = self.object_client.get_object(container_name, obj_name)
        self.assertEqual(obj, expected_data)