Separate negative tests for Swift
Move negative tests in Swift to their own file.
Partially implements blueprint negative-test-files
Change-Id: If7c42acdbc58107b7fcd6eef979d426935304a76
diff --git a/tempest/api/object_storage/test_account_quotas.py b/tempest/api/object_storage/test_account_quotas.py
index 6312f69..cacc66e 100644
--- a/tempest/api/object_storage/test_account_quotas.py
+++ b/tempest/api/object_storage/test_account_quotas.py
@@ -17,12 +17,9 @@
from tempest.api.object_storage import base
from tempest import clients
from tempest.common.utils import data_utils
-from tempest import config
from tempest import exceptions
from tempest import test
-CONF = config.CONF
-
class AccountQuotasTest(base.BaseObjectTest):
@@ -106,15 +103,6 @@
self.assertEqual(resp["status"], "201")
self.assertHeaders(resp, 'Object', 'PUT')
- @test.attr(type=["negative", "smoke"])
- @test.requires_ext(extension='account_quotas', service='object')
- def test_upload_large_object(self):
- object_name = data_utils.rand_name(name="TestObject")
- data = data_utils.arbitrary_string(30)
- self.assertRaises(exceptions.OverLimit,
- self.object_client.create_object,
- self.container_name, object_name, data)
-
@test.attr(type=["smoke"])
@test.requires_ext(extension='account_quotas', service='object')
def test_admin_modify_quota(self):
@@ -138,20 +126,3 @@
self.assertEqual(resp["status"], "204")
self.assertHeaders(resp, 'Account', 'POST')
-
- @test.attr(type=["negative", "smoke"])
- @test.requires_ext(extension='account_quotas', service='object')
- def test_user_modify_quota(self):
- """Test that a user is not able to modify or remove a quota on
- its account.
- """
-
- # Not able to remove quota
- self.assertRaises(exceptions.Unauthorized,
- self.account_client.create_account_metadata,
- {"Quota-Bytes": ""})
-
- # Not able to modify quota
- self.assertRaises(exceptions.Unauthorized,
- self.account_client.create_account_metadata,
- {"Quota-Bytes": "100"})
diff --git a/tempest/api/object_storage/test_account_quotas_negative.py b/tempest/api/object_storage/test_account_quotas_negative.py
new file mode 100644
index 0000000..e35cd17
--- /dev/null
+++ b/tempest/api/object_storage/test_account_quotas_negative.py
@@ -0,0 +1,119 @@
+# Copyright (C) 2013 eNovance SAS <licensing@enovance.com>
+#
+# Author: Joe H. Rahme <joe.hakim.rahme@enovance.com>
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+from tempest.api.object_storage import base
+from tempest import clients
+from tempest.common.utils import data_utils
+from tempest import exceptions
+from tempest import test
+
+
+class AccountQuotasNegativeTest(base.BaseObjectTest):
+
+ @classmethod
+ def setUpClass(cls):
+ super(AccountQuotasNegativeTest, cls).setUpClass()
+ cls.container_name = data_utils.rand_name(name="TestContainer")
+ cls.container_client.create_container(cls.container_name)
+
+ cls.data.setup_test_user()
+
+ cls.os_reselleradmin = clients.Manager(
+ cls.data.test_user,
+ cls.data.test_password,
+ cls.data.test_tenant)
+
+ # Retrieve the ResellerAdmin role id
+ reseller_role_id = None
+ try:
+ _, roles = cls.os_admin.identity_client.list_roles()
+ reseller_role_id = next(r['id'] for r in roles if r['name']
+ == 'ResellerAdmin')
+ except StopIteration:
+ msg = "No ResellerAdmin role found"
+ raise exceptions.NotFound(msg)
+
+ # Retrieve the ResellerAdmin tenant id
+ _, users = cls.os_admin.identity_client.get_users()
+ reseller_user_id = next(usr['id'] for usr in users if usr['name']
+ == cls.data.test_user)
+
+ # Retrieve the ResellerAdmin tenant id
+ _, tenants = cls.os_admin.identity_client.list_tenants()
+ reseller_tenant_id = next(tnt['id'] for tnt in tenants if tnt['name']
+ == cls.data.test_tenant)
+
+ # Assign the newly created user the appropriate ResellerAdmin role
+ cls.os_admin.identity_client.assign_user_role(
+ reseller_tenant_id,
+ reseller_user_id,
+ reseller_role_id)
+
+ # Retrieve a ResellerAdmin auth token and use it to set a quota
+ # on the client's account
+ cls.reselleradmin_token = cls.token_client.get_token(
+ cls.data.test_user,
+ cls.data.test_password,
+ cls.data.test_tenant)
+
+ def setUp(self):
+ super(AccountQuotasNegativeTest, self).setUp()
+
+ # Set a quota of 20 bytes on the user's account before each test
+ headers = {"X-Auth-Token": self.reselleradmin_token,
+ "X-Account-Meta-Quota-Bytes": "20"}
+
+ self.os.custom_account_client.request("POST", "", headers, "")
+
+ def tearDown(self):
+ # remove the quota from the container
+ headers = {"X-Auth-Token": self.reselleradmin_token,
+ "X-Remove-Account-Meta-Quota-Bytes": "x"}
+
+ self.os.custom_account_client.request("POST", "", headers, "")
+ super(AccountQuotasNegativeTest, self).tearDown()
+
+ @classmethod
+ def tearDownClass(cls):
+ cls.delete_containers([cls.container_name])
+ cls.data.teardown_all()
+ super(AccountQuotasNegativeTest, cls).tearDownClass()
+
+ @test.attr(type=["negative", "smoke"])
+ @test.requires_ext(extension='account_quotas', service='object')
+ def test_user_modify_quota(self):
+ """Test that a user is not able to modify or remove a quota on
+ its account.
+ """
+
+ # Not able to remove quota
+ self.assertRaises(exceptions.Unauthorized,
+ self.account_client.create_account_metadata,
+ {"Quota-Bytes": ""})
+
+ # Not able to modify quota
+ self.assertRaises(exceptions.Unauthorized,
+ self.account_client.create_account_metadata,
+ {"Quota-Bytes": "100"})
+
+ @test.attr(type=["negative", "smoke"])
+ @test.requires_ext(extension='account_quotas', service='object')
+ def test_upload_large_object(self):
+ object_name = data_utils.rand_name(name="TestObject")
+ data = data_utils.arbitrary_string(30)
+ self.assertRaises(exceptions.OverLimit,
+ self.object_client.create_object,
+ self.container_name, object_name, data)
diff --git a/tempest/api/object_storage/test_account_services.py b/tempest/api/object_storage/test_account_services.py
index 68b9222..0300ced 100644
--- a/tempest/api/object_storage/test_account_services.py
+++ b/tempest/api/object_storage/test_account_services.py
@@ -17,7 +17,6 @@
from tempest.api.object_storage import base
from tempest.common.utils import data_utils
-from tempest import exceptions
from tempest.test import attr
from tempest.test import HTTP_SUCCESS
@@ -146,26 +145,3 @@
resp, _ = self.account_client.list_account_metadata()
self.assertHeaders(resp, 'Account', 'HEAD')
self.assertNotIn('x-account-meta-' + header, resp)
-
- @attr(type=['negative', 'gate'])
- def test_list_containers_with_non_authorized_user(self):
- # list containers using non-authorized user
-
- # create user
- self.data.setup_test_user()
- resp, body = \
- self.token_client.auth(self.data.test_user,
- self.data.test_password,
- self.data.test_tenant)
- new_token = \
- self.token_client.get_token(self.data.test_user,
- self.data.test_password,
- self.data.test_tenant)
- custom_headers = {'X-Auth-Token': new_token}
- params = {'format': 'json'}
- # list containers with non-authorized user token
- self.assertRaises(exceptions.Unauthorized,
- self.custom_account_client.list_account_containers,
- params=params, metadata=custom_headers)
- # delete the user which was created
- self.data.teardown_all()
diff --git a/tempest/api/object_storage/test_account_services_negative.py b/tempest/api/object_storage/test_account_services_negative.py
new file mode 100644
index 0000000..3b07e17
--- /dev/null
+++ b/tempest/api/object_storage/test_account_services_negative.py
@@ -0,0 +1,46 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright (C) 2013 eNovance SAS <licensing@enovance.com>
+#
+# Author: Joe H. Rahme <joe.hakim.rahme@enovance.com>
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+from tempest.api.object_storage import base
+from tempest import exceptions
+from tempest.test import attr
+
+
+class AccountNegativeTest(base.BaseObjectTest):
+
+ @attr(type=['negative', 'gate'])
+ def test_list_containers_with_non_authorized_user(self):
+ # list containers using non-authorized user
+
+ # create user
+ self.data.setup_test_user()
+ self.token_client.auth(self.data.test_user,
+ self.data.test_password,
+ self.data.test_tenant)
+ new_token = \
+ self.token_client.get_token(self.data.test_user,
+ self.data.test_password,
+ self.data.test_tenant)
+ custom_headers = {'X-Auth-Token': new_token}
+ params = {'format': 'json'}
+ # list containers with non-authorized user token
+ self.assertRaises(exceptions.Unauthorized,
+ self.custom_account_client.list_account_containers,
+ params=params, metadata=custom_headers)
+ # delete the user which was created
+ self.data.teardown_all()
diff --git a/tempest/api/object_storage/test_container_acl.py b/tempest/api/object_storage/test_container_acl.py
index a01b703..0733524 100644
--- a/tempest/api/object_storage/test_container_acl.py
+++ b/tempest/api/object_storage/test_container_acl.py
@@ -15,7 +15,6 @@
from tempest.api.object_storage import base
from tempest.common.utils import data_utils
-from tempest import exceptions
from tempest.test import attr
from tempest.test import HTTP_SUCCESS
@@ -44,108 +43,6 @@
self.delete_containers([self.container_name])
super(ObjectTestACLs, self).tearDown()
- @attr(type=['negative', 'gate'])
- def test_write_object_without_using_creds(self):
- # trying to create object with empty headers
- # X-Auth-Token is not provided
- object_name = data_utils.rand_name(name='Object')
- self.assertRaises(exceptions.Unauthorized,
- self.custom_object_client.create_object,
- self.container_name, object_name, 'data')
-
- @attr(type=['negative', 'gate'])
- def test_delete_object_without_using_creds(self):
- # create object
- object_name = data_utils.rand_name(name='Object')
- resp, _ = self.object_client.create_object(self.container_name,
- object_name, 'data')
- # trying to delete object with empty headers
- # X-Auth-Token is not provided
- self.assertRaises(exceptions.Unauthorized,
- self.custom_object_client.delete_object,
- self.container_name, object_name)
-
- @attr(type=['negative', 'gate'])
- def test_write_object_with_non_authorized_user(self):
- # attempt to upload another file using non-authorized user
- # User provided token is forbidden. ACL are not set
- object_name = data_utils.rand_name(name='Object')
- # trying to create object with non-authorized user
- self.assertRaises(exceptions.Unauthorized,
- self.custom_object_client.create_object,
- self.container_name, object_name, 'data',
- metadata=self.custom_headers)
-
- @attr(type=['negative', 'gate'])
- def test_read_object_with_non_authorized_user(self):
- # attempt to read object using non-authorized user
- # User provided token is forbidden. ACL are not set
- object_name = data_utils.rand_name(name='Object')
- resp, _ = self.object_client.create_object(
- self.container_name, object_name, 'data')
- self.assertEqual(resp['status'], '201')
- self.assertHeaders(resp, 'Object', 'PUT')
- # trying to get object with non authorized user token
- self.assertRaises(exceptions.Unauthorized,
- self.custom_object_client.get_object,
- self.container_name, object_name,
- metadata=self.custom_headers)
-
- @attr(type=['negative', 'gate'])
- def test_delete_object_with_non_authorized_user(self):
- # attempt to delete object using non-authorized user
- # User provided token is forbidden. ACL are not set
- object_name = data_utils.rand_name(name='Object')
- resp, _ = self.object_client.create_object(
- self.container_name, object_name, 'data')
- self.assertEqual(resp['status'], '201')
- self.assertHeaders(resp, 'Object', 'PUT')
- # trying to delete object with non-authorized user token
- self.assertRaises(exceptions.Unauthorized,
- self.custom_object_client.delete_object,
- self.container_name, object_name,
- metadata=self.custom_headers)
-
- @attr(type=['negative', 'smoke'])
- def test_read_object_without_rights(self):
- # attempt to read object using non-authorized user
- # update X-Container-Read metadata ACL
- cont_headers = {'X-Container-Read': 'badtenant:baduser'}
- resp_meta, body = self.container_client.update_container_metadata(
- self.container_name, metadata=cont_headers,
- metadata_prefix='')
- self.assertIn(int(resp_meta['status']), HTTP_SUCCESS)
- self.assertHeaders(resp_meta, 'Container', 'POST')
- # create object
- object_name = data_utils.rand_name(name='Object')
- resp, _ = self.object_client.create_object(self.container_name,
- object_name, 'data')
- self.assertEqual(resp['status'], '201')
- self.assertHeaders(resp, 'Object', 'PUT')
- # Trying to read the object without rights
- self.assertRaises(exceptions.Unauthorized,
- self.custom_object_client.get_object,
- self.container_name, object_name,
- metadata=self.custom_headers)
-
- @attr(type=['negative', 'smoke'])
- def test_write_object_without_rights(self):
- # attempt to write object using non-authorized user
- # update X-Container-Write metadata ACL
- cont_headers = {'X-Container-Write': 'badtenant:baduser'}
- resp_meta, body = self.container_client.update_container_metadata(
- self.container_name, metadata=cont_headers,
- metadata_prefix='')
- self.assertIn(int(resp_meta['status']), HTTP_SUCCESS)
- self.assertHeaders(resp_meta, 'Container', 'POST')
- # Trying to write the object without rights
- object_name = data_utils.rand_name(name='Object')
- self.assertRaises(exceptions.Unauthorized,
- self.custom_object_client.create_object,
- self.container_name,
- object_name, 'data',
- metadata=self.custom_headers)
-
@attr(type='smoke')
def test_read_object_with_rights(self):
# attempt to read object using authorized user
@@ -189,48 +86,3 @@
metadata=self.custom_headers)
self.assertIn(int(resp['status']), HTTP_SUCCESS)
self.assertHeaders(resp, 'Object', 'PUT')
-
- @attr(type=['negative', 'smoke'])
- def test_write_object_without_write_rights(self):
- # attempt to write object using non-authorized user
- # update X-Container-Read and X-Container-Write metadata ACL
- cont_headers = {'X-Container-Read':
- self.data.test_tenant + ':' + self.data.test_user,
- 'X-Container-Write': ''}
- resp_meta, body = self.container_client.update_container_metadata(
- self.container_name, metadata=cont_headers,
- metadata_prefix='')
- self.assertIn(int(resp_meta['status']), HTTP_SUCCESS)
- self.assertHeaders(resp_meta, 'Container', 'POST')
- # Trying to write the object without write rights
- object_name = data_utils.rand_name(name='Object')
- self.assertRaises(exceptions.Unauthorized,
- self.custom_object_client.create_object,
- self.container_name,
- object_name, 'data',
- metadata=self.custom_headers)
-
- @attr(type=['negative', 'smoke'])
- def test_delete_object_without_write_rights(self):
- # attempt to delete object using non-authorized user
- # update X-Container-Read and X-Container-Write metadata ACL
- cont_headers = {'X-Container-Read':
- self.data.test_tenant + ':' + self.data.test_user,
- 'X-Container-Write': ''}
- resp_meta, body = self.container_client.update_container_metadata(
- self.container_name, metadata=cont_headers,
- metadata_prefix='')
- self.assertIn(int(resp_meta['status']), HTTP_SUCCESS)
- self.assertHeaders(resp_meta, 'Container', 'POST')
- # create object
- object_name = data_utils.rand_name(name='Object')
- resp, _ = self.object_client.create_object(self.container_name,
- object_name, 'data')
- self.assertEqual(resp['status'], '201')
- self.assertHeaders(resp, 'Object', 'PUT')
- # Trying to delete the object without write rights
- self.assertRaises(exceptions.Unauthorized,
- self.custom_object_client.delete_object,
- self.container_name,
- object_name,
- metadata=self.custom_headers)
diff --git a/tempest/api/object_storage/test_container_acl_negative.py b/tempest/api/object_storage/test_container_acl_negative.py
new file mode 100644
index 0000000..e75f21d
--- /dev/null
+++ b/tempest/api/object_storage/test_container_acl_negative.py
@@ -0,0 +1,195 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright (C) 2013 eNovance SAS <licensing@enovance.com>
+#
+# Author: Joe H. Rahme <joe.hakim.rahme@enovance.com>
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+from tempest.api.object_storage import base
+from tempest.common.utils import data_utils
+from tempest import exceptions
+from tempest.test import attr
+from tempest.test import HTTP_SUCCESS
+
+
+class ObjectACLsNegativeTest(base.BaseObjectTest):
+ @classmethod
+ def setUpClass(cls):
+ super(ObjectACLsNegativeTest, cls).setUpClass()
+ cls.data.setup_test_user()
+ cls.new_token = cls.token_client.get_token(cls.data.test_user,
+ cls.data.test_password,
+ cls.data.test_tenant)
+ cls.custom_headers = {'X-Auth-Token': cls.new_token}
+
+ @classmethod
+ def tearDownClass(cls):
+ cls.data.teardown_all()
+ super(ObjectACLsNegativeTest, cls).tearDownClass()
+
+ def setUp(self):
+ super(ObjectACLsNegativeTest, self).setUp()
+ self.container_name = data_utils.rand_name(name='TestContainer')
+ self.container_client.create_container(self.container_name)
+
+ def tearDown(self):
+ self.delete_containers([self.container_name])
+ super(ObjectACLsNegativeTest, self).tearDown()
+
+ @attr(type=['negative', 'gate'])
+ def test_write_object_without_using_creds(self):
+ # trying to create object with empty headers
+ # X-Auth-Token is not provided
+ object_name = data_utils.rand_name(name='Object')
+ self.assertRaises(exceptions.Unauthorized,
+ self.custom_object_client.create_object,
+ self.container_name, object_name, 'data')
+
+ @attr(type=['negative', 'gate'])
+ def test_delete_object_without_using_creds(self):
+ # create object
+ object_name = data_utils.rand_name(name='Object')
+ resp, _ = self.object_client.create_object(self.container_name,
+ object_name, 'data')
+ # trying to delete object with empty headers
+ # X-Auth-Token is not provided
+ self.assertRaises(exceptions.Unauthorized,
+ self.custom_object_client.delete_object,
+ self.container_name, object_name)
+
+ @attr(type=['negative', 'gate'])
+ def test_write_object_with_non_authorized_user(self):
+ # attempt to upload another file using non-authorized user
+ # User provided token is forbidden. ACL are not set
+ object_name = data_utils.rand_name(name='Object')
+ # trying to create object with non-authorized user
+ self.assertRaises(exceptions.Unauthorized,
+ self.custom_object_client.create_object,
+ self.container_name, object_name, 'data',
+ metadata=self.custom_headers)
+
+ @attr(type=['negative', 'gate'])
+ def test_read_object_with_non_authorized_user(self):
+ # attempt to read object using non-authorized user
+ # User provided token is forbidden. ACL are not set
+ object_name = data_utils.rand_name(name='Object')
+ resp, _ = self.object_client.create_object(
+ self.container_name, object_name, 'data')
+ self.assertEqual(resp['status'], '201')
+ self.assertHeaders(resp, 'Object', 'PUT')
+ # trying to get object with non authorized user token
+ self.assertRaises(exceptions.Unauthorized,
+ self.custom_object_client.get_object,
+ self.container_name, object_name,
+ metadata=self.custom_headers)
+
+ @attr(type=['negative', 'gate'])
+ def test_delete_object_with_non_authorized_user(self):
+ # attempt to delete object using non-authorized user
+ # User provided token is forbidden. ACL are not set
+ object_name = data_utils.rand_name(name='Object')
+ resp, _ = self.object_client.create_object(
+ self.container_name, object_name, 'data')
+ self.assertEqual(resp['status'], '201')
+ self.assertHeaders(resp, 'Object', 'PUT')
+ # trying to delete object with non-authorized user token
+ self.assertRaises(exceptions.Unauthorized,
+ self.custom_object_client.delete_object,
+ self.container_name, object_name,
+ metadata=self.custom_headers)
+
+ @attr(type=['negative', 'smoke'])
+ def test_read_object_without_rights(self):
+ # attempt to read object using non-authorized user
+ # update X-Container-Read metadata ACL
+ cont_headers = {'X-Container-Read': 'badtenant:baduser'}
+ resp_meta, body = self.container_client.update_container_metadata(
+ self.container_name, metadata=cont_headers,
+ metadata_prefix='')
+ self.assertIn(int(resp_meta['status']), HTTP_SUCCESS)
+ self.assertHeaders(resp_meta, 'Container', 'POST')
+ # create object
+ object_name = data_utils.rand_name(name='Object')
+ resp, _ = self.object_client.create_object(self.container_name,
+ object_name, 'data')
+ self.assertEqual(resp['status'], '201')
+ self.assertHeaders(resp, 'Object', 'PUT')
+ # Trying to read the object without rights
+ self.assertRaises(exceptions.Unauthorized,
+ self.custom_object_client.get_object,
+ self.container_name, object_name,
+ metadata=self.custom_headers)
+
+ @attr(type=['negative', 'smoke'])
+ def test_write_object_without_rights(self):
+ # attempt to write object using non-authorized user
+ # update X-Container-Write metadata ACL
+ cont_headers = {'X-Container-Write': 'badtenant:baduser'}
+ resp_meta, body = self.container_client.update_container_metadata(
+ self.container_name, metadata=cont_headers,
+ metadata_prefix='')
+ self.assertIn(int(resp_meta['status']), HTTP_SUCCESS)
+ self.assertHeaders(resp_meta, 'Container', 'POST')
+ # Trying to write the object without rights
+ object_name = data_utils.rand_name(name='Object')
+ self.assertRaises(exceptions.Unauthorized,
+ self.custom_object_client.create_object,
+ self.container_name,
+ object_name, 'data',
+ metadata=self.custom_headers)
+
+ @attr(type=['negative', 'smoke'])
+ def test_write_object_without_write_rights(self):
+ # attempt to write object using non-authorized user
+ # update X-Container-Read and X-Container-Write metadata ACL
+ cont_headers = {'X-Container-Read':
+ self.data.test_tenant + ':' + self.data.test_user,
+ 'X-Container-Write': ''}
+ resp_meta, body = self.container_client.update_container_metadata(
+ self.container_name, metadata=cont_headers,
+ metadata_prefix='')
+ self.assertIn(int(resp_meta['status']), HTTP_SUCCESS)
+ self.assertHeaders(resp_meta, 'Container', 'POST')
+ # Trying to write the object without write rights
+ object_name = data_utils.rand_name(name='Object')
+ self.assertRaises(exceptions.Unauthorized,
+ self.custom_object_client.create_object,
+ self.container_name,
+ object_name, 'data',
+ metadata=self.custom_headers)
+
+ @attr(type=['negative', 'smoke'])
+ def test_delete_object_without_write_rights(self):
+ # attempt to delete object using non-authorized user
+ # update X-Container-Read and X-Container-Write metadata ACL
+ cont_headers = {'X-Container-Read':
+ self.data.test_tenant + ':' + self.data.test_user,
+ 'X-Container-Write': ''}
+ resp_meta, body = self.container_client.update_container_metadata(
+ self.container_name, metadata=cont_headers,
+ metadata_prefix='')
+ self.assertIn(int(resp_meta['status']), HTTP_SUCCESS)
+ self.assertHeaders(resp_meta, 'Container', 'POST')
+ # create object
+ object_name = data_utils.rand_name(name='Object')
+ resp, _ = self.object_client.create_object(self.container_name,
+ object_name, 'data')
+ self.assertEqual(resp['status'], '201')
+ self.assertHeaders(resp, 'Object', 'PUT')
+ # Trying to delete the object without write rights
+ self.assertRaises(exceptions.Unauthorized,
+ self.custom_object_client.delete_object,
+ self.container_name,
+ object_name,
+ metadata=self.custom_headers)
diff --git a/tempest/api/object_storage/test_object_formpost.py b/tempest/api/object_storage/test_object_formpost.py
index 621a693..6f46ec9 100644
--- a/tempest/api/object_storage/test_object_formpost.py
+++ b/tempest/api/object_storage/test_object_formpost.py
@@ -116,21 +116,3 @@
self.assertIn(int(resp['status']), HTTP_SUCCESS)
self.assertHeaders(resp, "Object", "GET")
self.assertEqual(body, "hello world")
-
- @attr(type=['gate', 'negative'])
- def test_post_object_using_form_expired(self):
- body, content_type = self.get_multipart_form(expires=1)
- time.sleep(2)
-
- headers = {'Content-Type': content_type,
- 'Content-Length': str(len(body))}
-
- url = "%s/%s/%s" % (self.container_client.base_url,
- self.container_name,
- self.object_name)
-
- # Use a raw request, otherwise authentication headers are used
- resp, body = self.object_client.http_obj.request(url, "POST",
- body, headers=headers)
- self.assertEqual(int(resp['status']), 401)
- self.assertIn('FormPost: Form Expired', body)
diff --git a/tempest/api/object_storage/test_object_formpost_negative.py b/tempest/api/object_storage/test_object_formpost_negative.py
new file mode 100644
index 0000000..a07e277
--- /dev/null
+++ b/tempest/api/object_storage/test_object_formpost_negative.py
@@ -0,0 +1,112 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright (C) 2013 eNovance SAS <licensing@enovance.com>
+# Author: Joe H. Rahme <joe.hakim.rahme@enovance.com>
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import hashlib
+import hmac
+import time
+import urlparse
+
+from tempest.api.object_storage import base
+from tempest.common.utils import data_utils
+from tempest.test import attr
+
+
+class ObjectFormPostNegativeTest(base.BaseObjectTest):
+
+ @classmethod
+ def setUpClass(cls):
+ super(ObjectFormPostNegativeTest, cls).setUpClass()
+ cls.container_name = data_utils.rand_name(name='TestContainer')
+ cls.object_name = data_utils.rand_name(name='ObjectTemp')
+
+ cls.container_client.create_container(cls.container_name)
+ cls.containers = [cls.container_name]
+
+ cls.key = 'Meta'
+ cls.metadata = {'Temp-URL-Key': cls.key}
+ cls.account_client.create_account_metadata(metadata=cls.metadata)
+
+ @classmethod
+ def tearDownClass(cls):
+ cls.account_client.delete_account_metadata(metadata=cls.metadata)
+ cls.delete_containers(cls.containers)
+ cls.data.teardown_all()
+ super(ObjectFormPostNegativeTest, cls).tearDownClass()
+
+ def get_multipart_form(self, expires=600):
+ path = "%s/%s/%s" % (
+ urlparse.urlparse(self.container_client.base_url).path,
+ self.container_name,
+ self.object_name)
+
+ redirect = ''
+ max_file_size = 104857600
+ max_file_count = 10
+ expires += int(time.time())
+ hmac_body = '%s\n%s\n%s\n%s\n%s' % (path,
+ redirect,
+ max_file_size,
+ max_file_count,
+ expires)
+
+ signature = hmac.new(self.key, hmac_body, hashlib.sha1).hexdigest()
+
+ fields = {'redirect': redirect,
+ 'max_file_size': str(max_file_size),
+ 'max_file_count': str(max_file_count),
+ 'expires': str(expires),
+ 'signature': signature}
+
+ boundary = '--boundary--'
+ data = []
+ for (key, value) in fields.items():
+ data.append('--' + boundary)
+ data.append('Content-Disposition: form-data; name="%s"' % key)
+ data.append('')
+ data.append(value)
+
+ data.append('--' + boundary)
+ data.append('Content-Disposition: form-data; '
+ 'name="file1"; filename="testfile"')
+ data.append('Content-Type: application/octet-stream')
+ data.append('')
+ data.append('hello world')
+
+ data.append('--' + boundary + '--')
+ data.append('')
+
+ body = '\r\n'.join(data)
+ content_type = 'multipart/form-data; boundary=%s' % boundary
+ return body, content_type
+
+ @attr(type=['gate', 'negative'])
+ def test_post_object_using_form_expired(self):
+ body, content_type = self.get_multipart_form(expires=1)
+ time.sleep(2)
+
+ headers = {'Content-Type': content_type,
+ 'Content-Length': str(len(body))}
+
+ url = "%s/%s/%s" % (self.container_client.base_url,
+ self.container_name,
+ self.object_name)
+
+ # Use a raw request, otherwise authentication headers are used
+ resp, body = self.object_client.http_obj.request(url, "POST",
+ body, headers=headers)
+ self.assertEqual(int(resp['status']), 401)
+ self.assertIn('FormPost: Form Expired', body)
diff --git a/tempest/api/object_storage/test_object_temp_url.py b/tempest/api/object_storage/test_object_temp_url.py
index 0523c68..47c270e 100644
--- a/tempest/api/object_storage/test_object_temp_url.py
+++ b/tempest/api/object_storage/test_object_temp_url.py
@@ -22,7 +22,6 @@
from tempest.api.object_storage import base
from tempest.common.utils import data_utils
from tempest import config
-from tempest import exceptions
from tempest import test
CONF = config.CONF
@@ -186,19 +185,3 @@
resp, body = self.object_client.head(url)
self.assertIn(int(resp['status']), test.HTTP_SUCCESS)
self.assertHeaders(resp, 'Object', 'HEAD')
-
- @test.attr(type=['gate', 'negative'])
- @test.requires_ext(extension='tempurl', service='object')
- def test_get_object_after_expiration_time(self):
-
- expires = self._get_expiry_date(1)
- # get a temp URL for the created object
- url = self._get_temp_url(self.container_name,
- self.object_name, "GET",
- expires, self.key)
-
- # temp URL is valid for 1 seconds, let's wait 2
- time.sleep(2)
-
- self.assertRaises(exceptions.Unauthorized,
- self.object_client.get, url)
diff --git a/tempest/api/object_storage/test_object_temp_url_negative.py b/tempest/api/object_storage/test_object_temp_url_negative.py
new file mode 100644
index 0000000..cc507c5
--- /dev/null
+++ b/tempest/api/object_storage/test_object_temp_url_negative.py
@@ -0,0 +1,109 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright (C) 2013 eNovance SAS <licensing@enovance.com>
+#
+# Author: Joe H. Rahme <joe.hakim.rahme@enovance.com>
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import hashlib
+import hmac
+import time
+import urlparse
+
+from tempest.api.object_storage import base
+from tempest.common.utils import data_utils
+from tempest import exceptions
+from tempest import test
+
+
+class ObjectTempUrlNegativeTest(base.BaseObjectTest):
+
+ @classmethod
+ def setUpClass(cls):
+ super(ObjectTempUrlNegativeTest, cls).setUpClass()
+
+ cls.container_name = data_utils.rand_name(name='TestContainer')
+ cls.container_client.create_container(cls.container_name)
+ cls.containers = [cls.container_name]
+
+ # update account metadata
+ cls.key = 'Meta'
+ cls.metadata = {'Temp-URL-Key': cls.key}
+ cls.account_client.create_account_metadata(metadata=cls.metadata)
+ cls.account_client_metadata, _ = \
+ cls.account_client.list_account_metadata()
+
+ @classmethod
+ def tearDownClass(cls):
+ resp, _ = cls.account_client.delete_account_metadata(
+ metadata=cls.metadata)
+
+ cls.delete_containers(cls.containers)
+
+ # delete the user setup created
+ cls.data.teardown_all()
+ super(ObjectTempUrlNegativeTest, cls).tearDownClass()
+
+ def setUp(self):
+ super(ObjectTempUrlNegativeTest, self).setUp()
+ # make sure the metadata has been set
+ self.assertIn('x-account-meta-temp-url-key',
+ self.account_client_metadata)
+
+ self.assertEqual(
+ self.account_client_metadata['x-account-meta-temp-url-key'],
+ self.key)
+
+ # create object
+ self.object_name = data_utils.rand_name(name='ObjectTemp')
+ self.data = data_utils.arbitrary_string(size=len(self.object_name),
+ base_text=self.object_name)
+ self.object_client.create_object(self.container_name,
+ self.object_name, self.data)
+
+ def _get_expiry_date(self, expiration_time=1000):
+ return int(time.time() + expiration_time)
+
+ def _get_temp_url(self, container, object_name, method, expires,
+ key):
+ """Create the temporary URL."""
+
+ path = "%s/%s/%s" % (
+ urlparse.urlparse(self.object_client.base_url).path,
+ container, object_name)
+
+ hmac_body = '%s\n%s\n%s' % (method, expires, path)
+ sig = hmac.new(key, hmac_body, hashlib.sha1).hexdigest()
+
+ url = "%s/%s?temp_url_sig=%s&temp_url_expires=%s" % (container,
+ object_name,
+ sig, expires)
+
+ return url
+
+ @test.attr(type=['gate', 'negative'])
+ @test.requires_ext(extension='tempurl', service='object')
+ def test_get_object_after_expiration_time(self):
+
+ expires = self._get_expiry_date(1)
+ # get a temp URL for the created object
+ url = self._get_temp_url(self.container_name,
+ self.object_name, "GET",
+ expires, self.key)
+
+ # temp URL is valid for 1 seconds, let's wait 2
+ time.sleep(2)
+
+ self.assertRaises(exceptions.Unauthorized,
+ self.object_client.get, url)