Merge "port related volumes tests into nova v3 part1"
diff --git a/tempest/api/compute/v3/volumes/__init__.py b/tempest/api/compute/v3/volumes/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/tempest/api/compute/v3/volumes/__init__.py
diff --git a/tempest/api/compute/v3/volumes/test_attach_volume.py b/tempest/api/compute/v3/volumes/test_attach_volume.py
new file mode 100644
index 0000000..660de95
--- /dev/null
+++ b/tempest/api/compute/v3/volumes/test_attach_volume.py
@@ -0,0 +1,116 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2013 IBM Corp.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import testtools
+
+from tempest.api.compute import base
+from tempest.common.utils.linux.remote_client import RemoteClient
+import tempest.config
+from tempest.test import attr
+
+
+class AttachVolumeTestJSON(base.BaseV2ComputeTest):
+ _interface = 'json'
+ run_ssh = tempest.config.TempestConfig().compute.run_ssh
+
+ def __init__(self, *args, **kwargs):
+ super(AttachVolumeTestJSON, self).__init__(*args, **kwargs)
+ self.server = None
+ self.volume = None
+ self.attached = False
+
+ @classmethod
+ def setUpClass(cls):
+ super(AttachVolumeTestJSON, cls).setUpClass()
+ cls.device = cls.config.compute.volume_device_name
+ if not cls.config.service_available.cinder:
+ skip_msg = ("%s skipped as Cinder is not available" % cls.__name__)
+ raise cls.skipException(skip_msg)
+
+ def _detach(self, server_id, volume_id):
+ if self.attached:
+ self.servers_client.detach_volume(server_id, volume_id)
+ self.volumes_client.wait_for_volume_status(volume_id, 'available')
+
+ def _delete_volume(self):
+ if self.volume:
+ self.volumes_client.delete_volume(self.volume['id'])
+ self.volume = None
+
+ def _create_and_attach(self):
+ # Start a server and wait for it to become ready
+ admin_pass = self.image_ssh_password
+ resp, server = self.create_test_server(wait_until='ACTIVE',
+ adminPass=admin_pass)
+ self.server = server
+
+ # Record addresses so that we can ssh later
+ resp, server['addresses'] = \
+ self.servers_client.list_addresses(server['id'])
+
+ # Create a volume and wait for it to become ready
+ resp, volume = self.volumes_client.create_volume(1,
+ display_name='test')
+ self.volume = volume
+ self.addCleanup(self._delete_volume)
+ self.volumes_client.wait_for_volume_status(volume['id'], 'available')
+
+ # Attach the volume to the server
+ self.servers_client.attach_volume(server['id'], volume['id'],
+ device='/dev/%s' % self.device)
+ self.volumes_client.wait_for_volume_status(volume['id'], 'in-use')
+
+ self.attached = True
+ self.addCleanup(self._detach, server['id'], volume['id'])
+
+ @testtools.skipIf(not run_ssh, 'SSH required for this test')
+ @attr(type='gate')
+ def test_attach_detach_volume(self):
+ # Stop and Start a server with an attached volume, ensuring that
+ # the volume remains attached.
+ self._create_and_attach()
+ server = self.server
+ volume = self.volume
+
+ self.servers_client.stop(server['id'])
+ self.servers_client.wait_for_server_status(server['id'], 'SHUTOFF')
+
+ self.servers_client.start(server['id'])
+ self.servers_client.wait_for_server_status(server['id'], 'ACTIVE')
+
+ linux_client = RemoteClient(server,
+ self.image_ssh_user, server['adminPass'])
+ partitions = linux_client.get_partitions()
+ self.assertIn(self.device, partitions)
+
+ self._detach(server['id'], volume['id'])
+ self.attached = False
+
+ self.servers_client.stop(server['id'])
+ self.servers_client.wait_for_server_status(server['id'], 'SHUTOFF')
+
+ self.servers_client.start(server['id'])
+ self.servers_client.wait_for_server_status(server['id'], 'ACTIVE')
+
+ linux_client = RemoteClient(server,
+ self.image_ssh_user, server['adminPass'])
+ partitions = linux_client.get_partitions()
+ self.assertNotIn(self.device, partitions)
+
+
+class AttachVolumeTestXML(AttachVolumeTestJSON):
+ _interface = 'xml'
diff --git a/tempest/api/compute/v3/volumes/test_volumes_get.py b/tempest/api/compute/v3/volumes/test_volumes_get.py
new file mode 100644
index 0000000..ae6996d
--- /dev/null
+++ b/tempest/api/compute/v3/volumes/test_volumes_get.py
@@ -0,0 +1,104 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2012 OpenStack Foundation
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+from tempest.api.compute import base
+from tempest.common.utils import data_utils
+from tempest.test import attr
+
+
+class VolumesGetTestJSON(base.BaseV2ComputeTest):
+
+ _interface = 'json'
+
+ @classmethod
+ def setUpClass(cls):
+ super(VolumesGetTestJSON, cls).setUpClass()
+ cls.client = cls.volumes_extensions_client
+ if not cls.config.service_available.cinder:
+ skip_msg = ("%s skipped as Cinder is not available" % cls.__name__)
+ raise cls.skipException(skip_msg)
+
+ @attr(type='smoke')
+ def test_volume_create_get_delete(self):
+ # CREATE, GET, DELETE Volume
+ volume = None
+ v_name = data_utils.rand_name('Volume-%s-') % self._interface
+ metadata = {'Type': 'work'}
+ # Create volume
+ resp, volume = self.client.create_volume(size=1,
+ display_name=v_name,
+ metadata=metadata)
+ self.addCleanup(self._delete_volume, volume)
+ self.assertEqual(200, resp.status)
+ self.assertIn('id', volume)
+ self.assertIn('displayName', volume)
+ self.assertEqual(volume['displayName'], v_name,
+ "The created volume name is not equal "
+ "to the requested name")
+ self.assertTrue(volume['id'] is not None,
+ "Field volume id is empty or not found.")
+ # Wait for Volume status to become ACTIVE
+ self.client.wait_for_volume_status(volume['id'], 'available')
+ # GET Volume
+ resp, fetched_volume = self.client.get_volume(volume['id'])
+ self.assertEqual(200, resp.status)
+ # Verfication of details of fetched Volume
+ self.assertEqual(v_name,
+ fetched_volume['displayName'],
+ 'The fetched Volume is different '
+ 'from the created Volume')
+ self.assertEqual(volume['id'],
+ fetched_volume['id'],
+ 'The fetched Volume is different '
+ 'from the created Volume')
+ self.assertEqual(metadata,
+ fetched_volume['metadata'],
+ 'The fetched Volume is different '
+ 'from the created Volume')
+
+ @attr(type='gate')
+ def test_volume_get_metadata_none(self):
+ # CREATE, GET empty metadata dict
+ v_name = data_utils.rand_name('Volume-')
+ # Create volume
+ resp, volume = self.client.create_volume(size=1,
+ display_name=v_name,
+ metadata={})
+ self.addCleanup(self._delete_volume, volume)
+ self.assertEqual(200, resp.status)
+ self.assertIn('id', volume)
+ self.assertIn('displayName', volume)
+ # Wait for Volume status to become ACTIVE
+ self.client.wait_for_volume_status(volume['id'], 'available')
+ # GET Volume
+ resp, fetched_volume = self.client.get_volume(volume['id'])
+ self.assertEqual(200, resp.status)
+ self.assertEqual(fetched_volume['metadata'], {})
+
+ def _delete_volume(self, volume):
+ # Delete the Volume created in this method
+ try:
+ resp, _ = self.client.delete_volume(volume['id'])
+ self.assertEqual(202, resp.status)
+ # Checking if the deleted Volume still exists
+ self.client.wait_for_resource_deletion(volume['id'])
+ except KeyError:
+ return
+
+
+class VolumesGetTestXML(VolumesGetTestJSON):
+ _interface = "xml"
diff --git a/tempest/api/compute/v3/volumes/test_volumes_list.py b/tempest/api/compute/v3/volumes/test_volumes_list.py
new file mode 100644
index 0000000..b57dcfe
--- /dev/null
+++ b/tempest/api/compute/v3/volumes/test_volumes_list.py
@@ -0,0 +1,173 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2012 OpenStack Foundation
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+from tempest.api.compute import base
+from tempest.common.utils import data_utils
+from tempest.test import attr
+
+
+class VolumesTestJSON(base.BaseV2ComputeTest):
+
+ """
+ This test creates a number of 1G volumes. To run successfully,
+ ensure that the backing file for the volume group that Nova uses
+ has space for at least 3 1G volumes!
+ If you are running a Devstack environment, ensure that the
+ VOLUME_BACKING_FILE_SIZE is atleast 4G in your localrc
+ """
+
+ _interface = 'json'
+
+ @classmethod
+ def setUpClass(cls):
+ super(VolumesTestJSON, cls).setUpClass()
+ cls.client = cls.volumes_extensions_client
+ if not cls.config.service_available.cinder:
+ skip_msg = ("%s skipped as Cinder is not available" % cls.__name__)
+ raise cls.skipException(skip_msg)
+ # Create 3 Volumes
+ cls.volume_list = []
+ cls.volume_id_list = []
+ for i in range(3):
+ v_name = data_utils.rand_name('volume-%s' % cls._interface)
+ metadata = {'Type': 'work'}
+ try:
+ resp, volume = cls.client.create_volume(size=1,
+ display_name=v_name,
+ metadata=metadata)
+ cls.client.wait_for_volume_status(volume['id'], 'available')
+ resp, volume = cls.client.get_volume(volume['id'])
+ cls.volume_list.append(volume)
+ cls.volume_id_list.append(volume['id'])
+ except Exception:
+ if cls.volume_list:
+ # We could not create all the volumes, though we were able
+ # to create *some* of the volumes. This is typically
+ # because the backing file size of the volume group is
+ # too small. So, here, we clean up whatever we did manage
+ # to create and raise a SkipTest
+ for volume in cls.volume_list:
+ cls.client.delete_volume(volume)
+ msg = ("Failed to create ALL necessary volumes to run "
+ "test. This typically means that the backing file "
+ "size of the nova-volumes group is too small to "
+ "create the 3 volumes needed by this test case")
+ raise cls.skipException(msg)
+ raise
+
+ @classmethod
+ def tearDownClass(cls):
+ # Delete the created Volumes
+ for volume in cls.volume_list:
+ resp, _ = cls.client.delete_volume(volume['id'])
+ cls.client.wait_for_resource_deletion(volume['id'])
+ super(VolumesTestJSON, cls).tearDownClass()
+
+ @attr(type='gate')
+ def test_volume_list(self):
+ # Should return the list of Volumes
+ # Fetch all Volumes
+ resp, fetched_list = self.client.list_volumes()
+ self.assertEqual(200, resp.status)
+ # Now check if all the Volumes created in setup are in fetched list
+ missing_volumes = [
+ v for v in self.volume_list if v not in fetched_list
+ ]
+
+ self.assertFalse(missing_volumes,
+ "Failed to find volume %s in fetched list" %
+ ', '.join(m_vol['displayName']
+ for m_vol in missing_volumes))
+
+ @attr(type='gate')
+ def test_volume_list_with_details(self):
+ # Should return the list of Volumes with details
+ # Fetch all Volumes
+ resp, fetched_list = self.client.list_volumes_with_detail()
+ self.assertEqual(200, resp.status)
+ # Now check if all the Volumes created in setup are in fetched list
+ missing_volumes = [
+ v for v in self.volume_list if v not in fetched_list
+ ]
+
+ self.assertFalse(missing_volumes,
+ "Failed to find volume %s in fetched list" %
+ ', '.join(m_vol['displayName']
+ for m_vol in missing_volumes))
+
+ @attr(type='gate')
+ def test_volume_list_param_limit(self):
+ # Return the list of volumes based on limit set
+ params = {'limit': 2}
+ resp, fetched_vol_list = self.client.list_volumes(params=params)
+ self.assertEqual(200, resp.status)
+
+ self.assertEqual(len(fetched_vol_list), params['limit'],
+ "Failed to list volumes by limit set")
+
+ @attr(type='gate')
+ def test_volume_list_with_detail_param_limit(self):
+ # Return the list of volumes with details based on limit set.
+ params = {'limit': 2}
+ resp, fetched_vol_list = \
+ self.client.list_volumes_with_detail(params=params)
+ self.assertEqual(200, resp.status)
+
+ self.assertEqual(len(fetched_vol_list), params['limit'],
+ "Failed to list volume details by limit set")
+
+ @attr(type='gate')
+ def test_volume_list_param_offset_and_limit(self):
+ # Return the list of volumes based on offset and limit set.
+ # get all volumes list
+ response, all_vol_list = self.client.list_volumes()
+ params = {'offset': 1, 'limit': 1}
+ resp, fetched_vol_list = self.client.list_volumes(params=params)
+ self.assertEqual(200, resp.status)
+
+ # Validating length of the fetched volumes
+ self.assertEqual(len(fetched_vol_list), params['limit'],
+ "Failed to list volumes by offset and limit")
+ # Validating offset of fetched volume
+ for index, volume in enumerate(fetched_vol_list):
+ self.assertEqual(volume['id'],
+ all_vol_list[index + params['offset']]['id'],
+ "Failed to list volumes by offset and limit")
+
+ @attr(type='gate')
+ def test_volume_list_with_detail_param_offset_and_limit(self):
+ # Return the list of volumes details based on offset and limit set.
+ # get all volumes list
+ response, all_vol_list = self.client.list_volumes_with_detail()
+ params = {'offset': 1, 'limit': 1}
+ resp, fetched_vol_list = \
+ self.client.list_volumes_with_detail(params=params)
+ self.assertEqual(200, resp.status)
+
+ # Validating length of the fetched volumes
+ self.assertEqual(len(fetched_vol_list), params['limit'],
+ "Failed to list volume details by offset and limit")
+ # Validating offset of fetched volume
+ for index, volume in enumerate(fetched_vol_list):
+ self.assertEqual(volume['id'],
+ all_vol_list[index + params['offset']]['id'],
+ "Failed to list volume details by "
+ "offset and limit")
+
+
+class VolumesTestXML(VolumesTestJSON):
+ _interface = 'xml'
diff --git a/tempest/api/compute/v3/volumes/test_volumes_negative.py b/tempest/api/compute/v3/volumes/test_volumes_negative.py
new file mode 100644
index 0000000..785902e
--- /dev/null
+++ b/tempest/api/compute/v3/volumes/test_volumes_negative.py
@@ -0,0 +1,104 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2012 OpenStack Foundation
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import uuid
+
+from tempest.api.compute import base
+from tempest.common.utils import data_utils
+from tempest import exceptions
+from tempest.test import attr
+
+
+class VolumesNegativeTest(base.BaseV2ComputeTest):
+ _interface = 'json'
+
+ @classmethod
+ def setUpClass(cls):
+ super(VolumesNegativeTest, cls).setUpClass()
+ cls.client = cls.volumes_extensions_client
+ if not cls.config.service_available.cinder:
+ skip_msg = ("%s skipped as Cinder is not available" % cls.__name__)
+ raise cls.skipException(skip_msg)
+
+ @attr(type=['negative', 'gate'])
+ def test_volume_get_nonexistant_volume_id(self):
+ # Negative: Should not be able to get details of nonexistant volume
+ # Creating a nonexistant volume id
+ # Trying to GET a non existant volume
+ self.assertRaises(exceptions.NotFound, self.client.get_volume,
+ str(uuid.uuid4()))
+
+ @attr(type=['negative', 'gate'])
+ def test_volume_delete_nonexistant_volume_id(self):
+ # Negative: Should not be able to delete nonexistant Volume
+ # Creating nonexistant volume id
+ # Trying to DELETE a non existant volume
+ self.assertRaises(exceptions.NotFound, self.client.delete_volume,
+ str(uuid.uuid4()))
+
+ @attr(type=['negative', 'gate'])
+ def test_create_volume_with_invalid_size(self):
+ # Negative: Should not be able to create volume with invalid size
+ # in request
+ v_name = data_utils.rand_name('Volume-')
+ metadata = {'Type': 'work'}
+ self.assertRaises(exceptions.BadRequest, self.client.create_volume,
+ size='#$%', display_name=v_name, metadata=metadata)
+
+ @attr(type=['negative', 'gate'])
+ def test_create_volume_with_out_passing_size(self):
+ # Negative: Should not be able to create volume without passing size
+ # in request
+ v_name = data_utils.rand_name('Volume-')
+ metadata = {'Type': 'work'}
+ self.assertRaises(exceptions.BadRequest, self.client.create_volume,
+ size='', display_name=v_name, metadata=metadata)
+
+ @attr(type=['negative', 'gate'])
+ def test_create_volume_with_size_zero(self):
+ # Negative: Should not be able to create volume with size zero
+ v_name = data_utils.rand_name('Volume-')
+ metadata = {'Type': 'work'}
+ self.assertRaises(exceptions.BadRequest, self.client.create_volume,
+ size='0', display_name=v_name, metadata=metadata)
+
+ @attr(type=['negative', 'gate'])
+ def test_get_invalid_volume_id(self):
+ # Negative: Should not be able to get volume with invalid id
+ self.assertRaises(exceptions.NotFound,
+ self.client.get_volume, '#$%%&^&^')
+
+ @attr(type=['negative', 'gate'])
+ def test_get_volume_without_passing_volume_id(self):
+ # Negative: Should not be able to get volume when empty ID is passed
+ self.assertRaises(exceptions.NotFound, self.client.get_volume, '')
+
+ @attr(type=['negative', 'gate'])
+ def test_delete_invalid_volume_id(self):
+ # Negative: Should not be able to delete volume when invalid ID is
+ # passed
+ self.assertRaises(exceptions.NotFound,
+ self.client.delete_volume, '!@#$%^&*()')
+
+ @attr(type=['negative', 'gate'])
+ def test_delete_volume_without_passing_volume_id(self):
+ # Negative: Should not be able to delete volume when empty ID is passed
+ self.assertRaises(exceptions.NotFound, self.client.delete_volume, '')
+
+
+class VolumesNegativeTestXML(VolumesNegativeTest):
+ _interface = "xml"