port related volumes tests into nova v3 part1
This changeset only copies the v2 files into the appropriate v3
directories unchanged. This is being tried in order to make
reviewing of the porting easier as gerrit will display only what
is actually changed for v3 rather than entirely new files.
Partially implements blueprint nova-v3-api-tests
Change-Id: I8799257fa161ebf4c8b4bc437a94c7af746b1112
diff --git a/tempest/api/compute/v3/volumes/__init__.py b/tempest/api/compute/v3/volumes/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/tempest/api/compute/v3/volumes/__init__.py
diff --git a/tempest/api/compute/v3/volumes/test_attach_volume.py b/tempest/api/compute/v3/volumes/test_attach_volume.py
new file mode 100644
index 0000000..660de95
--- /dev/null
+++ b/tempest/api/compute/v3/volumes/test_attach_volume.py
@@ -0,0 +1,116 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2013 IBM Corp.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import testtools
+
+from tempest.api.compute import base
+from tempest.common.utils.linux.remote_client import RemoteClient
+import tempest.config
+from tempest.test import attr
+
+
+class AttachVolumeTestJSON(base.BaseV2ComputeTest):
+ _interface = 'json'
+ run_ssh = tempest.config.TempestConfig().compute.run_ssh
+
+ def __init__(self, *args, **kwargs):
+ super(AttachVolumeTestJSON, self).__init__(*args, **kwargs)
+ self.server = None
+ self.volume = None
+ self.attached = False
+
+ @classmethod
+ def setUpClass(cls):
+ super(AttachVolumeTestJSON, cls).setUpClass()
+ cls.device = cls.config.compute.volume_device_name
+ if not cls.config.service_available.cinder:
+ skip_msg = ("%s skipped as Cinder is not available" % cls.__name__)
+ raise cls.skipException(skip_msg)
+
+ def _detach(self, server_id, volume_id):
+ if self.attached:
+ self.servers_client.detach_volume(server_id, volume_id)
+ self.volumes_client.wait_for_volume_status(volume_id, 'available')
+
+ def _delete_volume(self):
+ if self.volume:
+ self.volumes_client.delete_volume(self.volume['id'])
+ self.volume = None
+
+ def _create_and_attach(self):
+ # Start a server and wait for it to become ready
+ admin_pass = self.image_ssh_password
+ resp, server = self.create_test_server(wait_until='ACTIVE',
+ adminPass=admin_pass)
+ self.server = server
+
+ # Record addresses so that we can ssh later
+ resp, server['addresses'] = \
+ self.servers_client.list_addresses(server['id'])
+
+ # Create a volume and wait for it to become ready
+ resp, volume = self.volumes_client.create_volume(1,
+ display_name='test')
+ self.volume = volume
+ self.addCleanup(self._delete_volume)
+ self.volumes_client.wait_for_volume_status(volume['id'], 'available')
+
+ # Attach the volume to the server
+ self.servers_client.attach_volume(server['id'], volume['id'],
+ device='/dev/%s' % self.device)
+ self.volumes_client.wait_for_volume_status(volume['id'], 'in-use')
+
+ self.attached = True
+ self.addCleanup(self._detach, server['id'], volume['id'])
+
+ @testtools.skipIf(not run_ssh, 'SSH required for this test')
+ @attr(type='gate')
+ def test_attach_detach_volume(self):
+ # Stop and Start a server with an attached volume, ensuring that
+ # the volume remains attached.
+ self._create_and_attach()
+ server = self.server
+ volume = self.volume
+
+ self.servers_client.stop(server['id'])
+ self.servers_client.wait_for_server_status(server['id'], 'SHUTOFF')
+
+ self.servers_client.start(server['id'])
+ self.servers_client.wait_for_server_status(server['id'], 'ACTIVE')
+
+ linux_client = RemoteClient(server,
+ self.image_ssh_user, server['adminPass'])
+ partitions = linux_client.get_partitions()
+ self.assertIn(self.device, partitions)
+
+ self._detach(server['id'], volume['id'])
+ self.attached = False
+
+ self.servers_client.stop(server['id'])
+ self.servers_client.wait_for_server_status(server['id'], 'SHUTOFF')
+
+ self.servers_client.start(server['id'])
+ self.servers_client.wait_for_server_status(server['id'], 'ACTIVE')
+
+ linux_client = RemoteClient(server,
+ self.image_ssh_user, server['adminPass'])
+ partitions = linux_client.get_partitions()
+ self.assertNotIn(self.device, partitions)
+
+
+class AttachVolumeTestXML(AttachVolumeTestJSON):
+ _interface = 'xml'
diff --git a/tempest/api/compute/v3/volumes/test_volumes_get.py b/tempest/api/compute/v3/volumes/test_volumes_get.py
new file mode 100644
index 0000000..ae6996d
--- /dev/null
+++ b/tempest/api/compute/v3/volumes/test_volumes_get.py
@@ -0,0 +1,104 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2012 OpenStack Foundation
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+from tempest.api.compute import base
+from tempest.common.utils import data_utils
+from tempest.test import attr
+
+
+class VolumesGetTestJSON(base.BaseV2ComputeTest):
+
+ _interface = 'json'
+
+ @classmethod
+ def setUpClass(cls):
+ super(VolumesGetTestJSON, cls).setUpClass()
+ cls.client = cls.volumes_extensions_client
+ if not cls.config.service_available.cinder:
+ skip_msg = ("%s skipped as Cinder is not available" % cls.__name__)
+ raise cls.skipException(skip_msg)
+
+ @attr(type='smoke')
+ def test_volume_create_get_delete(self):
+ # CREATE, GET, DELETE Volume
+ volume = None
+ v_name = data_utils.rand_name('Volume-%s-') % self._interface
+ metadata = {'Type': 'work'}
+ # Create volume
+ resp, volume = self.client.create_volume(size=1,
+ display_name=v_name,
+ metadata=metadata)
+ self.addCleanup(self._delete_volume, volume)
+ self.assertEqual(200, resp.status)
+ self.assertIn('id', volume)
+ self.assertIn('displayName', volume)
+ self.assertEqual(volume['displayName'], v_name,
+ "The created volume name is not equal "
+ "to the requested name")
+ self.assertTrue(volume['id'] is not None,
+ "Field volume id is empty or not found.")
+ # Wait for Volume status to become ACTIVE
+ self.client.wait_for_volume_status(volume['id'], 'available')
+ # GET Volume
+ resp, fetched_volume = self.client.get_volume(volume['id'])
+ self.assertEqual(200, resp.status)
+ # Verfication of details of fetched Volume
+ self.assertEqual(v_name,
+ fetched_volume['displayName'],
+ 'The fetched Volume is different '
+ 'from the created Volume')
+ self.assertEqual(volume['id'],
+ fetched_volume['id'],
+ 'The fetched Volume is different '
+ 'from the created Volume')
+ self.assertEqual(metadata,
+ fetched_volume['metadata'],
+ 'The fetched Volume is different '
+ 'from the created Volume')
+
+ @attr(type='gate')
+ def test_volume_get_metadata_none(self):
+ # CREATE, GET empty metadata dict
+ v_name = data_utils.rand_name('Volume-')
+ # Create volume
+ resp, volume = self.client.create_volume(size=1,
+ display_name=v_name,
+ metadata={})
+ self.addCleanup(self._delete_volume, volume)
+ self.assertEqual(200, resp.status)
+ self.assertIn('id', volume)
+ self.assertIn('displayName', volume)
+ # Wait for Volume status to become ACTIVE
+ self.client.wait_for_volume_status(volume['id'], 'available')
+ # GET Volume
+ resp, fetched_volume = self.client.get_volume(volume['id'])
+ self.assertEqual(200, resp.status)
+ self.assertEqual(fetched_volume['metadata'], {})
+
+ def _delete_volume(self, volume):
+ # Delete the Volume created in this method
+ try:
+ resp, _ = self.client.delete_volume(volume['id'])
+ self.assertEqual(202, resp.status)
+ # Checking if the deleted Volume still exists
+ self.client.wait_for_resource_deletion(volume['id'])
+ except KeyError:
+ return
+
+
+class VolumesGetTestXML(VolumesGetTestJSON):
+ _interface = "xml"
diff --git a/tempest/api/compute/v3/volumes/test_volumes_list.py b/tempest/api/compute/v3/volumes/test_volumes_list.py
new file mode 100644
index 0000000..b57dcfe
--- /dev/null
+++ b/tempest/api/compute/v3/volumes/test_volumes_list.py
@@ -0,0 +1,173 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2012 OpenStack Foundation
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+from tempest.api.compute import base
+from tempest.common.utils import data_utils
+from tempest.test import attr
+
+
+class VolumesTestJSON(base.BaseV2ComputeTest):
+
+ """
+ This test creates a number of 1G volumes. To run successfully,
+ ensure that the backing file for the volume group that Nova uses
+ has space for at least 3 1G volumes!
+ If you are running a Devstack environment, ensure that the
+ VOLUME_BACKING_FILE_SIZE is atleast 4G in your localrc
+ """
+
+ _interface = 'json'
+
+ @classmethod
+ def setUpClass(cls):
+ super(VolumesTestJSON, cls).setUpClass()
+ cls.client = cls.volumes_extensions_client
+ if not cls.config.service_available.cinder:
+ skip_msg = ("%s skipped as Cinder is not available" % cls.__name__)
+ raise cls.skipException(skip_msg)
+ # Create 3 Volumes
+ cls.volume_list = []
+ cls.volume_id_list = []
+ for i in range(3):
+ v_name = data_utils.rand_name('volume-%s' % cls._interface)
+ metadata = {'Type': 'work'}
+ try:
+ resp, volume = cls.client.create_volume(size=1,
+ display_name=v_name,
+ metadata=metadata)
+ cls.client.wait_for_volume_status(volume['id'], 'available')
+ resp, volume = cls.client.get_volume(volume['id'])
+ cls.volume_list.append(volume)
+ cls.volume_id_list.append(volume['id'])
+ except Exception:
+ if cls.volume_list:
+ # We could not create all the volumes, though we were able
+ # to create *some* of the volumes. This is typically
+ # because the backing file size of the volume group is
+ # too small. So, here, we clean up whatever we did manage
+ # to create and raise a SkipTest
+ for volume in cls.volume_list:
+ cls.client.delete_volume(volume)
+ msg = ("Failed to create ALL necessary volumes to run "
+ "test. This typically means that the backing file "
+ "size of the nova-volumes group is too small to "
+ "create the 3 volumes needed by this test case")
+ raise cls.skipException(msg)
+ raise
+
+ @classmethod
+ def tearDownClass(cls):
+ # Delete the created Volumes
+ for volume in cls.volume_list:
+ resp, _ = cls.client.delete_volume(volume['id'])
+ cls.client.wait_for_resource_deletion(volume['id'])
+ super(VolumesTestJSON, cls).tearDownClass()
+
+ @attr(type='gate')
+ def test_volume_list(self):
+ # Should return the list of Volumes
+ # Fetch all Volumes
+ resp, fetched_list = self.client.list_volumes()
+ self.assertEqual(200, resp.status)
+ # Now check if all the Volumes created in setup are in fetched list
+ missing_volumes = [
+ v for v in self.volume_list if v not in fetched_list
+ ]
+
+ self.assertFalse(missing_volumes,
+ "Failed to find volume %s in fetched list" %
+ ', '.join(m_vol['displayName']
+ for m_vol in missing_volumes))
+
+ @attr(type='gate')
+ def test_volume_list_with_details(self):
+ # Should return the list of Volumes with details
+ # Fetch all Volumes
+ resp, fetched_list = self.client.list_volumes_with_detail()
+ self.assertEqual(200, resp.status)
+ # Now check if all the Volumes created in setup are in fetched list
+ missing_volumes = [
+ v for v in self.volume_list if v not in fetched_list
+ ]
+
+ self.assertFalse(missing_volumes,
+ "Failed to find volume %s in fetched list" %
+ ', '.join(m_vol['displayName']
+ for m_vol in missing_volumes))
+
+ @attr(type='gate')
+ def test_volume_list_param_limit(self):
+ # Return the list of volumes based on limit set
+ params = {'limit': 2}
+ resp, fetched_vol_list = self.client.list_volumes(params=params)
+ self.assertEqual(200, resp.status)
+
+ self.assertEqual(len(fetched_vol_list), params['limit'],
+ "Failed to list volumes by limit set")
+
+ @attr(type='gate')
+ def test_volume_list_with_detail_param_limit(self):
+ # Return the list of volumes with details based on limit set.
+ params = {'limit': 2}
+ resp, fetched_vol_list = \
+ self.client.list_volumes_with_detail(params=params)
+ self.assertEqual(200, resp.status)
+
+ self.assertEqual(len(fetched_vol_list), params['limit'],
+ "Failed to list volume details by limit set")
+
+ @attr(type='gate')
+ def test_volume_list_param_offset_and_limit(self):
+ # Return the list of volumes based on offset and limit set.
+ # get all volumes list
+ response, all_vol_list = self.client.list_volumes()
+ params = {'offset': 1, 'limit': 1}
+ resp, fetched_vol_list = self.client.list_volumes(params=params)
+ self.assertEqual(200, resp.status)
+
+ # Validating length of the fetched volumes
+ self.assertEqual(len(fetched_vol_list), params['limit'],
+ "Failed to list volumes by offset and limit")
+ # Validating offset of fetched volume
+ for index, volume in enumerate(fetched_vol_list):
+ self.assertEqual(volume['id'],
+ all_vol_list[index + params['offset']]['id'],
+ "Failed to list volumes by offset and limit")
+
+ @attr(type='gate')
+ def test_volume_list_with_detail_param_offset_and_limit(self):
+ # Return the list of volumes details based on offset and limit set.
+ # get all volumes list
+ response, all_vol_list = self.client.list_volumes_with_detail()
+ params = {'offset': 1, 'limit': 1}
+ resp, fetched_vol_list = \
+ self.client.list_volumes_with_detail(params=params)
+ self.assertEqual(200, resp.status)
+
+ # Validating length of the fetched volumes
+ self.assertEqual(len(fetched_vol_list), params['limit'],
+ "Failed to list volume details by offset and limit")
+ # Validating offset of fetched volume
+ for index, volume in enumerate(fetched_vol_list):
+ self.assertEqual(volume['id'],
+ all_vol_list[index + params['offset']]['id'],
+ "Failed to list volume details by "
+ "offset and limit")
+
+
+class VolumesTestXML(VolumesTestJSON):
+ _interface = 'xml'
diff --git a/tempest/api/compute/v3/volumes/test_volumes_negative.py b/tempest/api/compute/v3/volumes/test_volumes_negative.py
new file mode 100644
index 0000000..785902e
--- /dev/null
+++ b/tempest/api/compute/v3/volumes/test_volumes_negative.py
@@ -0,0 +1,104 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2012 OpenStack Foundation
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import uuid
+
+from tempest.api.compute import base
+from tempest.common.utils import data_utils
+from tempest import exceptions
+from tempest.test import attr
+
+
+class VolumesNegativeTest(base.BaseV2ComputeTest):
+ _interface = 'json'
+
+ @classmethod
+ def setUpClass(cls):
+ super(VolumesNegativeTest, cls).setUpClass()
+ cls.client = cls.volumes_extensions_client
+ if not cls.config.service_available.cinder:
+ skip_msg = ("%s skipped as Cinder is not available" % cls.__name__)
+ raise cls.skipException(skip_msg)
+
+ @attr(type=['negative', 'gate'])
+ def test_volume_get_nonexistant_volume_id(self):
+ # Negative: Should not be able to get details of nonexistant volume
+ # Creating a nonexistant volume id
+ # Trying to GET a non existant volume
+ self.assertRaises(exceptions.NotFound, self.client.get_volume,
+ str(uuid.uuid4()))
+
+ @attr(type=['negative', 'gate'])
+ def test_volume_delete_nonexistant_volume_id(self):
+ # Negative: Should not be able to delete nonexistant Volume
+ # Creating nonexistant volume id
+ # Trying to DELETE a non existant volume
+ self.assertRaises(exceptions.NotFound, self.client.delete_volume,
+ str(uuid.uuid4()))
+
+ @attr(type=['negative', 'gate'])
+ def test_create_volume_with_invalid_size(self):
+ # Negative: Should not be able to create volume with invalid size
+ # in request
+ v_name = data_utils.rand_name('Volume-')
+ metadata = {'Type': 'work'}
+ self.assertRaises(exceptions.BadRequest, self.client.create_volume,
+ size='#$%', display_name=v_name, metadata=metadata)
+
+ @attr(type=['negative', 'gate'])
+ def test_create_volume_with_out_passing_size(self):
+ # Negative: Should not be able to create volume without passing size
+ # in request
+ v_name = data_utils.rand_name('Volume-')
+ metadata = {'Type': 'work'}
+ self.assertRaises(exceptions.BadRequest, self.client.create_volume,
+ size='', display_name=v_name, metadata=metadata)
+
+ @attr(type=['negative', 'gate'])
+ def test_create_volume_with_size_zero(self):
+ # Negative: Should not be able to create volume with size zero
+ v_name = data_utils.rand_name('Volume-')
+ metadata = {'Type': 'work'}
+ self.assertRaises(exceptions.BadRequest, self.client.create_volume,
+ size='0', display_name=v_name, metadata=metadata)
+
+ @attr(type=['negative', 'gate'])
+ def test_get_invalid_volume_id(self):
+ # Negative: Should not be able to get volume with invalid id
+ self.assertRaises(exceptions.NotFound,
+ self.client.get_volume, '#$%%&^&^')
+
+ @attr(type=['negative', 'gate'])
+ def test_get_volume_without_passing_volume_id(self):
+ # Negative: Should not be able to get volume when empty ID is passed
+ self.assertRaises(exceptions.NotFound, self.client.get_volume, '')
+
+ @attr(type=['negative', 'gate'])
+ def test_delete_invalid_volume_id(self):
+ # Negative: Should not be able to delete volume when invalid ID is
+ # passed
+ self.assertRaises(exceptions.NotFound,
+ self.client.delete_volume, '!@#$%^&*()')
+
+ @attr(type=['negative', 'gate'])
+ def test_delete_volume_without_passing_volume_id(self):
+ # Negative: Should not be able to delete volume when empty ID is passed
+ self.assertRaises(exceptions.NotFound, self.client.delete_volume, '')
+
+
+class VolumesNegativeTestXML(VolumesNegativeTest):
+ _interface = "xml"