Separate negative tests for Swift

Move negative tests in Swift to their own file.

Partially implements blueprint negative-test-files

Change-Id: If7c42acdbc58107b7fcd6eef979d426935304a76
diff --git a/tempest/api/object_storage/ b/tempest/api/object_storage/
index 6312f69..cacc66e 100644
--- a/tempest/api/object_storage/
+++ b/tempest/api/object_storage/
@@ -17,12 +17,9 @@
 from tempest.api.object_storage import base
 from tempest import clients
 from tempest.common.utils import data_utils
-from tempest import config
 from tempest import exceptions
 from tempest import test
-CONF = config.CONF
 class AccountQuotasTest(base.BaseObjectTest):
@@ -106,15 +103,6 @@
         self.assertEqual(resp["status"], "201")
         self.assertHeaders(resp, 'Object', 'PUT')
-    @test.attr(type=["negative", "smoke"])
-    @test.requires_ext(extension='account_quotas', service='object')
-    def test_upload_large_object(self):
-        object_name = data_utils.rand_name(name="TestObject")
-        data = data_utils.arbitrary_string(30)
-        self.assertRaises(exceptions.OverLimit,
-                          self.object_client.create_object,
-                          self.container_name, object_name, data)
     @test.requires_ext(extension='account_quotas', service='object')
     def test_admin_modify_quota(self):
@@ -138,20 +126,3 @@
             self.assertEqual(resp["status"], "204")
             self.assertHeaders(resp, 'Account', 'POST')
-    @test.attr(type=["negative", "smoke"])
-    @test.requires_ext(extension='account_quotas', service='object')
-    def test_user_modify_quota(self):
-        """Test that a user is not able to modify or remove a quota on
-        its account.
-        """
-        # Not able to remove quota
-        self.assertRaises(exceptions.Unauthorized,
-                          self.account_client.create_account_metadata,
-                          {"Quota-Bytes": ""})
-        # Not able to modify quota
-        self.assertRaises(exceptions.Unauthorized,
-                          self.account_client.create_account_metadata,
-                          {"Quota-Bytes": "100"})
diff --git a/tempest/api/object_storage/ b/tempest/api/object_storage/
new file mode 100644
index 0000000..e35cd17
--- /dev/null
+++ b/tempest/api/object_storage/
@@ -0,0 +1,119 @@
+# Copyright (C) 2013 eNovance SAS <>
+# Author: Joe H. Rahme <>
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+from tempest.api.object_storage import base
+from tempest import clients
+from tempest.common.utils import data_utils
+from tempest import exceptions
+from tempest import test
+class AccountQuotasNegativeTest(base.BaseObjectTest):
+    @classmethod
+    def setUpClass(cls):
+        super(AccountQuotasNegativeTest, cls).setUpClass()
+        cls.container_name = data_utils.rand_name(name="TestContainer")
+        cls.container_client.create_container(cls.container_name)
+        cls.os_reselleradmin = clients.Manager(
+  ,
+  ,
+        # Retrieve the ResellerAdmin role id
+        reseller_role_id = None
+        try:
+            _, roles = cls.os_admin.identity_client.list_roles()
+            reseller_role_id = next(r['id'] for r in roles if r['name']
+                                    == 'ResellerAdmin')
+        except StopIteration:
+            msg = "No ResellerAdmin role found"
+            raise exceptions.NotFound(msg)
+        # Retrieve the ResellerAdmin tenant id
+        _, users = cls.os_admin.identity_client.get_users()
+        reseller_user_id = next(usr['id'] for usr in users if usr['name']
+                                ==
+        # Retrieve the ResellerAdmin tenant id
+        _, tenants = cls.os_admin.identity_client.list_tenants()
+        reseller_tenant_id = next(tnt['id'] for tnt in tenants if tnt['name']
+                                  ==
+        # Assign the newly created user the appropriate ResellerAdmin role
+        cls.os_admin.identity_client.assign_user_role(
+            reseller_tenant_id,
+            reseller_user_id,
+            reseller_role_id)
+        # Retrieve a ResellerAdmin auth token and use it to set a quota
+        # on the client's account
+        cls.reselleradmin_token = cls.token_client.get_token(
+  ,
+  ,
+    def setUp(self):
+        super(AccountQuotasNegativeTest, self).setUp()
+        # Set a quota of 20 bytes on the user's account before each test
+        headers = {"X-Auth-Token": self.reselleradmin_token,
+                   "X-Account-Meta-Quota-Bytes": "20"}
+        self.os.custom_account_client.request("POST", "", headers, "")
+    def tearDown(self):
+        # remove the quota from the container
+        headers = {"X-Auth-Token": self.reselleradmin_token,
+                   "X-Remove-Account-Meta-Quota-Bytes": "x"}
+        self.os.custom_account_client.request("POST", "", headers, "")
+        super(AccountQuotasNegativeTest, self).tearDown()
+    @classmethod
+    def tearDownClass(cls):
+        cls.delete_containers([cls.container_name])
+        super(AccountQuotasNegativeTest, cls).tearDownClass()
+    @test.attr(type=["negative", "smoke"])
+    @test.requires_ext(extension='account_quotas', service='object')
+    def test_user_modify_quota(self):
+        """Test that a user is not able to modify or remove a quota on
+        its account.
+        """
+        # Not able to remove quota
+        self.assertRaises(exceptions.Unauthorized,
+                          self.account_client.create_account_metadata,
+                          {"Quota-Bytes": ""})
+        # Not able to modify quota
+        self.assertRaises(exceptions.Unauthorized,
+                          self.account_client.create_account_metadata,
+                          {"Quota-Bytes": "100"})
+    @test.attr(type=["negative", "smoke"])
+    @test.requires_ext(extension='account_quotas', service='object')
+    def test_upload_large_object(self):
+        object_name = data_utils.rand_name(name="TestObject")
+        data = data_utils.arbitrary_string(30)
+        self.assertRaises(exceptions.OverLimit,
+                          self.object_client.create_object,
+                          self.container_name, object_name, data)
diff --git a/tempest/api/object_storage/ b/tempest/api/object_storage/
index 68b9222..0300ced 100644
--- a/tempest/api/object_storage/
+++ b/tempest/api/object_storage/
@@ -17,7 +17,6 @@
 from tempest.api.object_storage import base
 from tempest.common.utils import data_utils
-from tempest import exceptions
 from tempest.test import attr
 from tempest.test import HTTP_SUCCESS
@@ -146,26 +145,3 @@
         resp, _ = self.account_client.list_account_metadata()
         self.assertHeaders(resp, 'Account', 'HEAD')
         self.assertNotIn('x-account-meta-' + header, resp)
-    @attr(type=['negative', 'gate'])
-    def test_list_containers_with_non_authorized_user(self):
-        # list containers using non-authorized user
-        # create user
-        resp, body = \
-            self.token_client.auth(,
-                         ,
-        new_token = \
-            self.token_client.get_token(,
-                              ,
-        custom_headers = {'X-Auth-Token': new_token}
-        params = {'format': 'json'}
-        # list containers with non-authorized user token
-        self.assertRaises(exceptions.Unauthorized,
-                          self.custom_account_client.list_account_containers,
-                          params=params, metadata=custom_headers)
-        # delete the user which was created
diff --git a/tempest/api/object_storage/ b/tempest/api/object_storage/
new file mode 100644
index 0000000..3b07e17
--- /dev/null
+++ b/tempest/api/object_storage/
@@ -0,0 +1,46 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+# Copyright (C) 2013 eNovance SAS <>
+# Author: Joe H. Rahme <>
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+from tempest.api.object_storage import base
+from tempest import exceptions
+from tempest.test import attr
+class AccountNegativeTest(base.BaseObjectTest):
+    @attr(type=['negative', 'gate'])
+    def test_list_containers_with_non_authorized_user(self):
+        # list containers using non-authorized user
+        # create user
+        self.token_client.auth(,
+                     ,
+        new_token = \
+            self.token_client.get_token(,
+                              ,
+        custom_headers = {'X-Auth-Token': new_token}
+        params = {'format': 'json'}
+        # list containers with non-authorized user token
+        self.assertRaises(exceptions.Unauthorized,
+                          self.custom_account_client.list_account_containers,
+                          params=params, metadata=custom_headers)
+        # delete the user which was created
diff --git a/tempest/api/object_storage/ b/tempest/api/object_storage/
index a01b703..0733524 100644
--- a/tempest/api/object_storage/
+++ b/tempest/api/object_storage/
@@ -15,7 +15,6 @@
 from tempest.api.object_storage import base
 from tempest.common.utils import data_utils
-from tempest import exceptions
 from tempest.test import attr
 from tempest.test import HTTP_SUCCESS
@@ -44,108 +43,6 @@
         super(ObjectTestACLs, self).tearDown()
-    @attr(type=['negative', 'gate'])
-    def test_write_object_without_using_creds(self):
-        # trying to create object with empty headers
-        # X-Auth-Token is not provided
-        object_name = data_utils.rand_name(name='Object')
-        self.assertRaises(exceptions.Unauthorized,
-                          self.custom_object_client.create_object,
-                          self.container_name, object_name, 'data')
-    @attr(type=['negative', 'gate'])
-    def test_delete_object_without_using_creds(self):
-        # create object
-        object_name = data_utils.rand_name(name='Object')
-        resp, _ = self.object_client.create_object(self.container_name,
-                                                   object_name, 'data')
-        # trying to delete object with empty headers
-        # X-Auth-Token is not provided
-        self.assertRaises(exceptions.Unauthorized,
-                          self.custom_object_client.delete_object,
-                          self.container_name, object_name)
-    @attr(type=['negative', 'gate'])
-    def test_write_object_with_non_authorized_user(self):
-        # attempt to upload another file using non-authorized user
-        # User provided token is forbidden. ACL are not set
-        object_name = data_utils.rand_name(name='Object')
-        # trying to create object with non-authorized user
-        self.assertRaises(exceptions.Unauthorized,
-                          self.custom_object_client.create_object,
-                          self.container_name, object_name, 'data',
-                          metadata=self.custom_headers)
-    @attr(type=['negative', 'gate'])
-    def test_read_object_with_non_authorized_user(self):
-        # attempt to read object using non-authorized user
-        # User provided token is forbidden. ACL are not set
-        object_name = data_utils.rand_name(name='Object')
-        resp, _ = self.object_client.create_object(
-            self.container_name, object_name, 'data')
-        self.assertEqual(resp['status'], '201')
-        self.assertHeaders(resp, 'Object', 'PUT')
-        # trying to get object with non authorized user token
-        self.assertRaises(exceptions.Unauthorized,
-                          self.custom_object_client.get_object,
-                          self.container_name, object_name,
-                          metadata=self.custom_headers)
-    @attr(type=['negative', 'gate'])
-    def test_delete_object_with_non_authorized_user(self):
-        # attempt to delete object using non-authorized user
-        # User provided token is forbidden. ACL are not set
-        object_name = data_utils.rand_name(name='Object')
-        resp, _ = self.object_client.create_object(
-            self.container_name, object_name, 'data')
-        self.assertEqual(resp['status'], '201')
-        self.assertHeaders(resp, 'Object', 'PUT')
-        # trying to delete object with non-authorized user token
-        self.assertRaises(exceptions.Unauthorized,
-                          self.custom_object_client.delete_object,
-                          self.container_name, object_name,
-                          metadata=self.custom_headers)
-    @attr(type=['negative', 'smoke'])
-    def test_read_object_without_rights(self):
-        # attempt to read object using non-authorized user
-        # update X-Container-Read metadata ACL
-        cont_headers = {'X-Container-Read': 'badtenant:baduser'}
-        resp_meta, body = self.container_client.update_container_metadata(
-            self.container_name, metadata=cont_headers,
-            metadata_prefix='')
-        self.assertIn(int(resp_meta['status']), HTTP_SUCCESS)
-        self.assertHeaders(resp_meta, 'Container', 'POST')
-        # create object
-        object_name = data_utils.rand_name(name='Object')
-        resp, _ = self.object_client.create_object(self.container_name,
-                                                   object_name, 'data')
-        self.assertEqual(resp['status'], '201')
-        self.assertHeaders(resp, 'Object', 'PUT')
-        # Trying to read the object without rights
-        self.assertRaises(exceptions.Unauthorized,
-                          self.custom_object_client.get_object,
-                          self.container_name, object_name,
-                          metadata=self.custom_headers)
-    @attr(type=['negative', 'smoke'])
-    def test_write_object_without_rights(self):
-        # attempt to write object using non-authorized user
-        # update X-Container-Write metadata ACL
-        cont_headers = {'X-Container-Write': 'badtenant:baduser'}
-        resp_meta, body = self.container_client.update_container_metadata(
-            self.container_name, metadata=cont_headers,
-            metadata_prefix='')
-        self.assertIn(int(resp_meta['status']), HTTP_SUCCESS)
-        self.assertHeaders(resp_meta, 'Container', 'POST')
-        # Trying to write the object without rights
-        object_name = data_utils.rand_name(name='Object')
-        self.assertRaises(exceptions.Unauthorized,
-                          self.custom_object_client.create_object,
-                          self.container_name,
-                          object_name, 'data',
-                          metadata=self.custom_headers)
     def test_read_object_with_rights(self):
         # attempt to read object using authorized user
@@ -189,48 +86,3 @@
         self.assertIn(int(resp['status']), HTTP_SUCCESS)
         self.assertHeaders(resp, 'Object', 'PUT')
-    @attr(type=['negative', 'smoke'])
-    def test_write_object_without_write_rights(self):
-        # attempt to write object using non-authorized user
-        # update X-Container-Read and X-Container-Write metadata ACL
-        cont_headers = {'X-Container-Read':
-               + ':' +,
-                        'X-Container-Write': ''}
-        resp_meta, body = self.container_client.update_container_metadata(
-            self.container_name, metadata=cont_headers,
-            metadata_prefix='')
-        self.assertIn(int(resp_meta['status']), HTTP_SUCCESS)
-        self.assertHeaders(resp_meta, 'Container', 'POST')
-        # Trying to write the object without write rights
-        object_name = data_utils.rand_name(name='Object')
-        self.assertRaises(exceptions.Unauthorized,
-                          self.custom_object_client.create_object,
-                          self.container_name,
-                          object_name, 'data',
-                          metadata=self.custom_headers)
-    @attr(type=['negative', 'smoke'])
-    def test_delete_object_without_write_rights(self):
-        # attempt to delete object using non-authorized user
-        # update X-Container-Read and X-Container-Write metadata ACL
-        cont_headers = {'X-Container-Read':
-               + ':' +,
-                        'X-Container-Write': ''}
-        resp_meta, body = self.container_client.update_container_metadata(
-            self.container_name, metadata=cont_headers,
-            metadata_prefix='')
-        self.assertIn(int(resp_meta['status']), HTTP_SUCCESS)
-        self.assertHeaders(resp_meta, 'Container', 'POST')
-        # create object
-        object_name = data_utils.rand_name(name='Object')
-        resp, _ = self.object_client.create_object(self.container_name,
-                                                   object_name, 'data')
-        self.assertEqual(resp['status'], '201')
-        self.assertHeaders(resp, 'Object', 'PUT')
-        # Trying to delete the object without write rights
-        self.assertRaises(exceptions.Unauthorized,
-                          self.custom_object_client.delete_object,
-                          self.container_name,
-                          object_name,
-                          metadata=self.custom_headers)
diff --git a/tempest/api/object_storage/ b/tempest/api/object_storage/
new file mode 100644
index 0000000..e75f21d
--- /dev/null
+++ b/tempest/api/object_storage/
@@ -0,0 +1,195 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+# Copyright (C) 2013 eNovance SAS <>
+# Author: Joe H. Rahme <>
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+from tempest.api.object_storage import base
+from tempest.common.utils import data_utils
+from tempest import exceptions
+from tempest.test import attr
+from tempest.test import HTTP_SUCCESS
+class ObjectACLsNegativeTest(base.BaseObjectTest):
+    @classmethod
+    def setUpClass(cls):
+        super(ObjectACLsNegativeTest, cls).setUpClass()
+        cls.new_token = cls.token_client.get_token(,
+                                         ,
+        cls.custom_headers = {'X-Auth-Token': cls.new_token}
+    @classmethod
+    def tearDownClass(cls):
+        super(ObjectACLsNegativeTest, cls).tearDownClass()
+    def setUp(self):
+        super(ObjectACLsNegativeTest, self).setUp()
+        self.container_name = data_utils.rand_name(name='TestContainer')
+        self.container_client.create_container(self.container_name)
+    def tearDown(self):
+        self.delete_containers([self.container_name])
+        super(ObjectACLsNegativeTest, self).tearDown()
+    @attr(type=['negative', 'gate'])
+    def test_write_object_without_using_creds(self):
+        # trying to create object with empty headers
+        # X-Auth-Token is not provided
+        object_name = data_utils.rand_name(name='Object')
+        self.assertRaises(exceptions.Unauthorized,
+                          self.custom_object_client.create_object,
+                          self.container_name, object_name, 'data')
+    @attr(type=['negative', 'gate'])
+    def test_delete_object_without_using_creds(self):
+        # create object
+        object_name = data_utils.rand_name(name='Object')
+        resp, _ = self.object_client.create_object(self.container_name,
+                                                   object_name, 'data')
+        # trying to delete object with empty headers
+        # X-Auth-Token is not provided
+        self.assertRaises(exceptions.Unauthorized,
+                          self.custom_object_client.delete_object,
+                          self.container_name, object_name)
+    @attr(type=['negative', 'gate'])
+    def test_write_object_with_non_authorized_user(self):
+        # attempt to upload another file using non-authorized user
+        # User provided token is forbidden. ACL are not set
+        object_name = data_utils.rand_name(name='Object')
+        # trying to create object with non-authorized user
+        self.assertRaises(exceptions.Unauthorized,
+                          self.custom_object_client.create_object,
+                          self.container_name, object_name, 'data',
+                          metadata=self.custom_headers)
+    @attr(type=['negative', 'gate'])
+    def test_read_object_with_non_authorized_user(self):
+        # attempt to read object using non-authorized user
+        # User provided token is forbidden. ACL are not set
+        object_name = data_utils.rand_name(name='Object')
+        resp, _ = self.object_client.create_object(
+            self.container_name, object_name, 'data')
+        self.assertEqual(resp['status'], '201')
+        self.assertHeaders(resp, 'Object', 'PUT')
+        # trying to get object with non authorized user token
+        self.assertRaises(exceptions.Unauthorized,
+                          self.custom_object_client.get_object,
+                          self.container_name, object_name,
+                          metadata=self.custom_headers)
+    @attr(type=['negative', 'gate'])
+    def test_delete_object_with_non_authorized_user(self):
+        # attempt to delete object using non-authorized user
+        # User provided token is forbidden. ACL are not set
+        object_name = data_utils.rand_name(name='Object')
+        resp, _ = self.object_client.create_object(
+            self.container_name, object_name, 'data')
+        self.assertEqual(resp['status'], '201')
+        self.assertHeaders(resp, 'Object', 'PUT')
+        # trying to delete object with non-authorized user token
+        self.assertRaises(exceptions.Unauthorized,
+                          self.custom_object_client.delete_object,
+                          self.container_name, object_name,
+                          metadata=self.custom_headers)
+    @attr(type=['negative', 'smoke'])
+    def test_read_object_without_rights(self):
+        # attempt to read object using non-authorized user
+        # update X-Container-Read metadata ACL
+        cont_headers = {'X-Container-Read': 'badtenant:baduser'}
+        resp_meta, body = self.container_client.update_container_metadata(
+            self.container_name, metadata=cont_headers,
+            metadata_prefix='')
+        self.assertIn(int(resp_meta['status']), HTTP_SUCCESS)
+        self.assertHeaders(resp_meta, 'Container', 'POST')
+        # create object
+        object_name = data_utils.rand_name(name='Object')
+        resp, _ = self.object_client.create_object(self.container_name,
+                                                   object_name, 'data')
+        self.assertEqual(resp['status'], '201')
+        self.assertHeaders(resp, 'Object', 'PUT')
+        # Trying to read the object without rights
+        self.assertRaises(exceptions.Unauthorized,
+                          self.custom_object_client.get_object,
+                          self.container_name, object_name,
+                          metadata=self.custom_headers)
+    @attr(type=['negative', 'smoke'])
+    def test_write_object_without_rights(self):
+        # attempt to write object using non-authorized user
+        # update X-Container-Write metadata ACL
+        cont_headers = {'X-Container-Write': 'badtenant:baduser'}
+        resp_meta, body = self.container_client.update_container_metadata(
+            self.container_name, metadata=cont_headers,
+            metadata_prefix='')
+        self.assertIn(int(resp_meta['status']), HTTP_SUCCESS)
+        self.assertHeaders(resp_meta, 'Container', 'POST')
+        # Trying to write the object without rights
+        object_name = data_utils.rand_name(name='Object')
+        self.assertRaises(exceptions.Unauthorized,
+                          self.custom_object_client.create_object,
+                          self.container_name,
+                          object_name, 'data',
+                          metadata=self.custom_headers)
+    @attr(type=['negative', 'smoke'])
+    def test_write_object_without_write_rights(self):
+        # attempt to write object using non-authorized user
+        # update X-Container-Read and X-Container-Write metadata ACL
+        cont_headers = {'X-Container-Read':
+               + ':' +,
+                        'X-Container-Write': ''}
+        resp_meta, body = self.container_client.update_container_metadata(
+            self.container_name, metadata=cont_headers,
+            metadata_prefix='')
+        self.assertIn(int(resp_meta['status']), HTTP_SUCCESS)
+        self.assertHeaders(resp_meta, 'Container', 'POST')
+        # Trying to write the object without write rights
+        object_name = data_utils.rand_name(name='Object')
+        self.assertRaises(exceptions.Unauthorized,
+                          self.custom_object_client.create_object,
+                          self.container_name,
+                          object_name, 'data',
+                          metadata=self.custom_headers)
+    @attr(type=['negative', 'smoke'])
+    def test_delete_object_without_write_rights(self):
+        # attempt to delete object using non-authorized user
+        # update X-Container-Read and X-Container-Write metadata ACL
+        cont_headers = {'X-Container-Read':
+               + ':' +,
+                        'X-Container-Write': ''}
+        resp_meta, body = self.container_client.update_container_metadata(
+            self.container_name, metadata=cont_headers,
+            metadata_prefix='')
+        self.assertIn(int(resp_meta['status']), HTTP_SUCCESS)
+        self.assertHeaders(resp_meta, 'Container', 'POST')
+        # create object
+        object_name = data_utils.rand_name(name='Object')
+        resp, _ = self.object_client.create_object(self.container_name,
+                                                   object_name, 'data')
+        self.assertEqual(resp['status'], '201')
+        self.assertHeaders(resp, 'Object', 'PUT')
+        # Trying to delete the object without write rights
+        self.assertRaises(exceptions.Unauthorized,
+                          self.custom_object_client.delete_object,
+                          self.container_name,
+                          object_name,
+                          metadata=self.custom_headers)
diff --git a/tempest/api/object_storage/ b/tempest/api/object_storage/
index 621a693..6f46ec9 100644
--- a/tempest/api/object_storage/
+++ b/tempest/api/object_storage/
@@ -116,21 +116,3 @@
         self.assertIn(int(resp['status']), HTTP_SUCCESS)
         self.assertHeaders(resp, "Object", "GET")
         self.assertEqual(body, "hello world")
-    @attr(type=['gate', 'negative'])
-    def test_post_object_using_form_expired(self):
-        body, content_type = self.get_multipart_form(expires=1)
-        time.sleep(2)
-        headers = {'Content-Type': content_type,
-                   'Content-Length': str(len(body))}
-        url = "%s/%s/%s" % (self.container_client.base_url,
-                            self.container_name,
-                            self.object_name)
-        # Use a raw request, otherwise authentication headers are used
-        resp, body = self.object_client.http_obj.request(url, "POST",
-                                                         body, headers=headers)
-        self.assertEqual(int(resp['status']), 401)
-        self.assertIn('FormPost: Form Expired', body)
diff --git a/tempest/api/object_storage/ b/tempest/api/object_storage/
new file mode 100644
index 0000000..a07e277
--- /dev/null
+++ b/tempest/api/object_storage/
@@ -0,0 +1,112 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+# Copyright (C) 2013 eNovance SAS <>
+# Author: Joe H. Rahme <>
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+import hashlib
+import hmac
+import time
+import urlparse
+from tempest.api.object_storage import base
+from tempest.common.utils import data_utils
+from tempest.test import attr
+class ObjectFormPostNegativeTest(base.BaseObjectTest):
+    @classmethod
+    def setUpClass(cls):
+        super(ObjectFormPostNegativeTest, cls).setUpClass()
+        cls.container_name = data_utils.rand_name(name='TestContainer')
+        cls.object_name = data_utils.rand_name(name='ObjectTemp')
+        cls.container_client.create_container(cls.container_name)
+        cls.containers = [cls.container_name]
+        cls.key = 'Meta'
+        cls.metadata = {'Temp-URL-Key': cls.key}
+        cls.account_client.create_account_metadata(metadata=cls.metadata)
+    @classmethod
+    def tearDownClass(cls):
+        cls.account_client.delete_account_metadata(metadata=cls.metadata)
+        cls.delete_containers(cls.containers)
+        super(ObjectFormPostNegativeTest, cls).tearDownClass()
+    def get_multipart_form(self, expires=600):
+        path = "%s/%s/%s" % (
+            urlparse.urlparse(self.container_client.base_url).path,
+            self.container_name,
+            self.object_name)
+        redirect = ''
+        max_file_size = 104857600
+        max_file_count = 10
+        expires += int(time.time())
+        hmac_body = '%s\n%s\n%s\n%s\n%s' % (path,
+                                            redirect,
+                                            max_file_size,
+                                            max_file_count,
+                                            expires)
+        signature =, hmac_body, hashlib.sha1).hexdigest()
+        fields = {'redirect': redirect,
+                  'max_file_size': str(max_file_size),
+                  'max_file_count': str(max_file_count),
+                  'expires': str(expires),
+                  'signature': signature}
+        boundary = '--boundary--'
+        data = []
+        for (key, value) in fields.items():
+            data.append('--' + boundary)
+            data.append('Content-Disposition: form-data; name="%s"' % key)
+            data.append('')
+            data.append(value)
+        data.append('--' + boundary)
+        data.append('Content-Disposition: form-data; '
+                    'name="file1"; filename="testfile"')
+        data.append('Content-Type: application/octet-stream')
+        data.append('')
+        data.append('hello world')
+        data.append('--' + boundary + '--')
+        data.append('')
+        body = '\r\n'.join(data)
+        content_type = 'multipart/form-data; boundary=%s' % boundary
+        return body, content_type
+    @attr(type=['gate', 'negative'])
+    def test_post_object_using_form_expired(self):
+        body, content_type = self.get_multipart_form(expires=1)
+        time.sleep(2)
+        headers = {'Content-Type': content_type,
+                   'Content-Length': str(len(body))}
+        url = "%s/%s/%s" % (self.container_client.base_url,
+                            self.container_name,
+                            self.object_name)
+        # Use a raw request, otherwise authentication headers are used
+        resp, body = self.object_client.http_obj.request(url, "POST",
+                                                         body, headers=headers)
+        self.assertEqual(int(resp['status']), 401)
+        self.assertIn('FormPost: Form Expired', body)
diff --git a/tempest/api/object_storage/ b/tempest/api/object_storage/
index 0523c68..47c270e 100644
--- a/tempest/api/object_storage/
+++ b/tempest/api/object_storage/
@@ -22,7 +22,6 @@
 from tempest.api.object_storage import base
 from tempest.common.utils import data_utils
 from tempest import config
-from tempest import exceptions
 from tempest import test
 CONF = config.CONF
@@ -186,19 +185,3 @@
         resp, body = self.object_client.head(url)
         self.assertIn(int(resp['status']), test.HTTP_SUCCESS)
         self.assertHeaders(resp, 'Object', 'HEAD')
-    @test.attr(type=['gate', 'negative'])
-    @test.requires_ext(extension='tempurl', service='object')
-    def test_get_object_after_expiration_time(self):
-        expires = self._get_expiry_date(1)
-        # get a temp URL for the created object
-        url = self._get_temp_url(self.container_name,
-                                 self.object_name, "GET",
-                                 expires, self.key)
-        # temp URL is valid for 1 seconds, let's wait 2
-        time.sleep(2)
-        self.assertRaises(exceptions.Unauthorized,
-                          self.object_client.get, url)
diff --git a/tempest/api/object_storage/ b/tempest/api/object_storage/
new file mode 100644
index 0000000..cc507c5
--- /dev/null
+++ b/tempest/api/object_storage/
@@ -0,0 +1,109 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+# Copyright (C) 2013 eNovance SAS <>
+# Author: Joe H. Rahme <>
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+import hashlib
+import hmac
+import time
+import urlparse
+from tempest.api.object_storage import base
+from tempest.common.utils import data_utils
+from tempest import exceptions
+from tempest import test
+class ObjectTempUrlNegativeTest(base.BaseObjectTest):
+    @classmethod
+    def setUpClass(cls):
+        super(ObjectTempUrlNegativeTest, cls).setUpClass()
+        cls.container_name = data_utils.rand_name(name='TestContainer')
+        cls.container_client.create_container(cls.container_name)
+        cls.containers = [cls.container_name]
+        # update account metadata
+        cls.key = 'Meta'
+        cls.metadata = {'Temp-URL-Key': cls.key}
+        cls.account_client.create_account_metadata(metadata=cls.metadata)
+        cls.account_client_metadata, _ = \
+            cls.account_client.list_account_metadata()
+    @classmethod
+    def tearDownClass(cls):
+        resp, _ = cls.account_client.delete_account_metadata(
+            metadata=cls.metadata)
+        cls.delete_containers(cls.containers)
+        # delete the user setup created
+        super(ObjectTempUrlNegativeTest, cls).tearDownClass()
+    def setUp(self):
+        super(ObjectTempUrlNegativeTest, self).setUp()
+        # make sure the metadata has been set
+        self.assertIn('x-account-meta-temp-url-key',
+                      self.account_client_metadata)
+        self.assertEqual(
+            self.account_client_metadata['x-account-meta-temp-url-key'],
+            self.key)
+        # create object
+        self.object_name = data_utils.rand_name(name='ObjectTemp')
+ = data_utils.arbitrary_string(size=len(self.object_name),
+                                                base_text=self.object_name)
+        self.object_client.create_object(self.container_name,
+                                         self.object_name,
+    def _get_expiry_date(self, expiration_time=1000):
+        return int(time.time() + expiration_time)
+    def _get_temp_url(self, container, object_name, method, expires,
+                      key):
+        """Create the temporary URL."""
+        path = "%s/%s/%s" % (
+            urlparse.urlparse(self.object_client.base_url).path,
+            container, object_name)
+        hmac_body = '%s\n%s\n%s' % (method, expires, path)
+        sig =, hmac_body, hashlib.sha1).hexdigest()
+        url = "%s/%s?temp_url_sig=%s&temp_url_expires=%s" % (container,
+                                                             object_name,
+                                                             sig, expires)
+        return url
+    @test.attr(type=['gate', 'negative'])
+    @test.requires_ext(extension='tempurl', service='object')
+    def test_get_object_after_expiration_time(self):
+        expires = self._get_expiry_date(1)
+        # get a temp URL for the created object
+        url = self._get_temp_url(self.container_name,
+                                 self.object_name, "GET",
+                                 expires, self.key)
+        # temp URL is valid for 1 seconds, let's wait 2
+        time.sleep(2)
+        self.assertRaises(exceptions.Unauthorized,
+                          self.object_client.get, url)