blob: 5924c7ea99095b87dd9b366430ef65834bc5a953 [file] [log] [blame]
# Copyright 2012 OpenStack Foundation
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from tempest.api.volume import base
from tempest.common.utils import data_utils
from tempest import config
from tempest.test import attr
from tempest.test import services
from tempest.test import stresstest
CONF = config.CONF
class VolumesActionsTest(base.BaseVolumeV1Test):
_interface = "json"
@classmethod
def setUpClass(cls):
super(VolumesActionsTest, cls).setUpClass()
cls.client = cls.volumes_client
cls.image_client = cls.os.image_client
# Create a test shared instance
srv_name = data_utils.rand_name(cls.__name__ + '-Instance-')
resp, cls.server = cls.servers_client.create_server(srv_name,
cls.image_ref,
cls.flavor_ref)
cls.servers_client.wait_for_server_status(cls.server['id'], 'ACTIVE')
# Create a test shared volume for attach/detach tests
cls.volume = cls.create_volume()
@classmethod
def tearDownClass(cls):
# Delete the test instance
cls.servers_client.delete_server(cls.server['id'])
cls.client.wait_for_resource_deletion(cls.server['id'])
super(VolumesActionsTest, cls).tearDownClass()
@stresstest(class_setup_per='process')
@attr(type='smoke')
@services('compute')
def test_attach_detach_volume_to_instance(self):
# Volume is attached and detached successfully from an instance
mountpoint = '/dev/vdc'
resp, body = self.client.attach_volume(self.volume['id'],
self.server['id'],
mountpoint)
self.assertEqual(202, resp.status)
self.client.wait_for_volume_status(self.volume['id'], 'in-use')
resp, body = self.client.detach_volume(self.volume['id'])
self.assertEqual(202, resp.status)
self.client.wait_for_volume_status(self.volume['id'], 'available')
@stresstest(class_setup_per='process')
@attr(type='gate')
@services('compute')
def test_get_volume_attachment(self):
# Verify that a volume's attachment information is retrieved
mountpoint = '/dev/vdc'
resp, body = self.client.attach_volume(self.volume['id'],
self.server['id'],
mountpoint)
self.assertEqual(202, resp.status)
self.client.wait_for_volume_status(self.volume['id'], 'in-use')
# NOTE(gfidente): added in reverse order because functions will be
# called in reverse order to the order they are added (LIFO)
self.addCleanup(self.client.wait_for_volume_status,
self.volume['id'],
'available')
self.addCleanup(self.client.detach_volume, self.volume['id'])
resp, volume = self.client.get_volume(self.volume['id'])
self.assertEqual(200, resp.status)
self.assertIn('attachments', volume)
attachment = self.client.get_attachment_from_volume(volume)
self.assertEqual(mountpoint, attachment['device'])
self.assertEqual(self.server['id'], attachment['server_id'])
self.assertEqual(self.volume['id'], attachment['id'])
self.assertEqual(self.volume['id'], attachment['volume_id'])
@attr(type='gate')
@services('image')
def test_volume_upload(self):
# NOTE(gfidente): the volume uploaded in Glance comes from setUpClass,
# it is shared with the other tests. After it is uploaded in Glance,
# there is no way to delete it from Cinder, so we delete it from Glance
# using the Glance image_client and from Cinder via tearDownClass.
image_name = data_utils.rand_name('Image-')
resp, body = self.client.upload_volume(self.volume['id'],
image_name,
CONF.volume.disk_format)
image_id = body["image_id"]
self.addCleanup(self.image_client.delete_image, image_id)
self.assertEqual(202, resp.status)
self.image_client.wait_for_image_status(image_id, 'active')
self.client.wait_for_volume_status(self.volume['id'], 'available')
@attr(type='gate')
def test_volume_extend(self):
# Extend Volume Test.
extend_size = int(self.volume['size']) + 1
resp, body = self.client.extend_volume(self.volume['id'], extend_size)
self.assertEqual(202, resp.status)
self.client.wait_for_volume_status(self.volume['id'], 'available')
resp, volume = self.client.get_volume(self.volume['id'])
self.assertEqual(200, resp.status)
self.assertEqual(int(volume['size']), extend_size)
@attr(type='gate')
def test_reserve_unreserve_volume(self):
# Mark volume as reserved.
resp, body = self.client.reserve_volume(self.volume['id'])
self.assertEqual(202, resp.status)
# To get the volume info
resp, body = self.client.get_volume(self.volume['id'])
self.assertEqual(200, resp.status)
self.assertIn('attaching', body['status'])
# Unmark volume as reserved.
resp, body = self.client.unreserve_volume(self.volume['id'])
self.assertEqual(202, resp.status)
# To get the volume info
resp, body = self.client.get_volume(self.volume['id'])
self.assertEqual(200, resp.status)
self.assertIn('available', body['status'])
def _is_true(self, val):
return val in ['true', 'True', True]
@attr(type='gate')
def test_volume_readonly_update(self):
# Update volume readonly true
readonly = True
resp, body = self.client.update_volume_readonly(self.volume['id'],
readonly)
self.assertEqual(202, resp.status)
# Get Volume information
resp, fetched_volume = self.client.get_volume(self.volume['id'])
bool_flag = self._is_true(fetched_volume['metadata']['readonly'])
self.assertEqual(200, resp.status)
self.assertEqual(True, bool_flag)
# Update volume readonly false
readonly = False
resp, body = self.client.update_volume_readonly(self.volume['id'],
readonly)
self.assertEqual(202, resp.status)
# Get Volume information
resp, fetched_volume = self.client.get_volume(self.volume['id'])
bool_flag = self._is_true(fetched_volume['metadata']['readonly'])
self.assertEqual(200, resp.status)
self.assertEqual(False, bool_flag)
class VolumesActionsTestXML(VolumesActionsTest):
_interface = "xml"