blob: 123c61000cdce1fd4669ff8370d2d524cab7f203 [file] [log] [blame]
Justin Shepherd0d9bbd12011-08-11 12:57:44 -05001# vim: tabstop=4 shiftwidth=4 softtabstop=4
2
3# Copyright 2011 OpenStack, LLC
4# All Rights Reserved.
5#
6# Licensed under the Apache License, Version 2.0 (the "License"); you may
7# not use this file except in compliance with the License. You may obtain
8# a copy of the License at
9#
10# http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing, software
13# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15# License for the specific language governing permissions and limitations
16# under the License.
17
18"""Functional test case for OpenStack Swift """
19
20import hashlib
21import httplib2
22import json
23import os
24import tempfile
25import time
26import unittest
27import urllib
28
29from pprint import pprint
30
31import tests
32
33SMALL_OBJ = "include/swift_objects/swift_small"
34MED_OBJ = "include/swift_objects/swift_medium"
35LRG_OBJ = "include/swift_objects/swift_large"
36
37
38class TestSwift(tests.FunctionalTest):
39 def test_000_auth(self):
40 if self.swift['auth_ssl'] == "False":
41 prot = "http://"
42 else:
43 prot = "https://"
44
45 path = "%s%s:%s%s%s" % (prot, self.swift['auth_host'],
46 self.swift['auth_port'],
47 self.swift['auth_prefix'],
48 self.swift['ver'])
49
50 # Uncomment for debugging
51 # pprint(path)
52
53 http = httplib2.Http(disable_ssl_certificate_validation=True)
54 self.swift['auth_user'] = '%s:%s' % (self.swift['account'],
55 self.swift['username'])
56 headers = {'X-Auth-User': '%s' % (self.swift['auth_user']),
57 'X-Auth-Key': '%s' % (self.swift['password'])}
58 response, content = http.request(path, 'GET', headers=headers)
59 self.assertEqual(200, response.status)
60 self.assertIsNotNone(response['x-auth-token'])
61 self.assertIsNotNone(response['x-storage-token'])
62 self.assertIsNotNone(response['x-storage-url'])
63
64 for k, v in response.items():
65 if (k == 'x-auth-token'):
66 self.swift['x-auth-token'] = v
67 if (k == 'x-storage-token'):
68 self.swift['x-storage-token'] = v
69
70 # Since we don't have DNS this is a bit of a hack, but works
71 url = response['x-storage-url'].split('/')
72 self.swift['storage_url'] = "%s//%s:%s/%s/%s" % (url[0],
73 self.swift['auth_host'],
74 self.swift['auth_port'],
75 url[3],
76 url[4])
77 test_000_auth.tags = ['swift']
78
79 def test_001_create_container(self):
80 path = "%s/%s/" % (self.swift['storage_url'], "test_container")
81 http = httplib2.Http(disable_ssl_certificate_validation=True)
82 headers = { 'X-Storage-Token': '%s' % (self.swift['x-storage-token'])}
83 response, content = http.request(path, 'PUT', headers=headers)
84 self.assertEqual(201, response.status)
85 test_001_create_container.tags = ['swift']
86
87 def test_002_list_containers(self):
88 http = httplib2.Http(disable_ssl_certificate_validation=True)
89 headers = {'X-Auth-Token': '%s' % (self.swift['x-auth-token'])}
90 response, content = http.request(self.swift['storage_url'], 'GET',
91 headers=headers)
92 self.assertEqual(200, response.status)
93 self.assertLessEqual('1', response['x-account-container-count'])
94 test_002_list_containers.tags = ['swift']
95
96 def test_010_create_small_object(self):
97 md5 = self._md5sum_file(SMALL_OBJ)
98 path = "%s/%s/%s" % (self.swift['storage_url'],
99 "test_container",
100 "swift_small")
101 http = httplib2.Http(disable_ssl_certificate_validation=True)
102 headers = {'X-Auth-User': '%s:%s' % (self.swift['account'],
103 self.swift['username']),
104 'X-Storage-Token': '%s' % (self.swift['x-storage-token']),
105 'ETag': '%s' % (md5),
106 'Content-Length': '%d' % os.path.getsize(SMALL_OBJ),
107 'Content-Type': 'application/octet-stream'}
108 upload = open(SMALL_OBJ, "rb")
109 response, content = http.request(path, 'PUT',
110 headers=headers,
111 body=upload)
112 self.assertEqual(201, response.status)
113 self.assertIn('201', content)
114 test_010_create_small_object.tags = ['swift']
115
116 def test_011_create_medium_object(self):
117 md5 = self._md5sum_file(MED_OBJ)
118 path = "%s/%s/%s" % (self.swift['storage_url'],
119 "test_container",
120 "swift_medium")
121 http = httplib2.Http(disable_ssl_certificate_validation=True)
122 headers = {'X-Auth-User': '%s:%s' % (self.swift['account'],
123 self.swift['username']),
124 'X-Storage-Token': '%s' % (self.swift['x-storage-token']),
125 'ETag': '%s' % (md5),
126 'Content-Length': '%d' % (os.path.getsize(MED_OBJ)),
127 'Content-Type': 'application/octet-stream',
128 'Content-Encoding': 'gzip'}
129 upload = ""
130 for chunk in self._read_in_chunks(MED_OBJ):
131 upload += chunk
132 response, content = http.request(path, 'PUT',
133 headers=headers,
134 body=upload)
135 self.assertEqual(201, response.status)
136 test_011_create_medium_object.tags = ['swift']
137
138 def test_013_get_small_object(self):
139 path = "%s/%s/%s" % (self.swift['storage_url'],
140 "test_container",
141 "swift_small")
142 http = httplib2.Http(disable_ssl_certificate_validation=True)
143 headers = {'X-Auth-User': '%s:%s' % (self.swift['account'],
144 self.swift['username']),
145 'X-Storage-Token': '%s' % (self.swift['x-storage-token'])}
146 response, content = http.request(path, 'GET',
147 headers=headers)
148 self.assertEqual(200, response.status)
149 self.assertEqual(self._md5sum_file(SMALL_OBJ), response['etag'])
150 test_013_get_small_object.tags = ['swift']
151
152 def test_017_delete_small_object(self):
153 path = "%s/%s/%s" % (self.swift['storage_url'], "test_container",
154 "swift_small")
155 http = httplib2.Http(disable_ssl_certificate_validation=True)
156 headers = {'X-Auth-User': '%s:%s' % (self.swift['account'],
157 self.swift['username']),
158 'X-Storage-Token': '%s' % (
159 self.swift['x-storage-token'])}
160 response, content = http.request(path, 'DELETE', headers=headers)
161 self.assertEqual(204, response.status)
162 test_017_delete_small_object.tags = ['swift']
163
164 def test_018_delete_medium_object(self):
165 path = "%s/%s/%s" % (self.swift['storage_url'], "test_container",
166 "swift_medium")
167 http = httplib2.Http(disable_ssl_certificate_validation=True)
168 headers = {'X-Auth-User': '%s:%s' % (self.swift['account'],
169 self.swift['username']),
170 'X-Storage-Token': '%s' % (
171 self.swift['x-storage-token'])}
172 response, content = http.request(path, 'DELETE', headers=headers)
173 self.assertEqual(204, response.status)
174 test_018_delete_medium_object.tags = ['swift']
175
176 def test_030_check_container_metadata(self):
177 path = "%s/%s" % (self.swift['storage_url'], "test_container")
178 http = httplib2.Http(disable_ssl_certificate_validation=True)
179 headers = {'X-Auth-User': '%s:%s' % (self.swift['account'],
180 self.swift['username']),
181 'X-Storage-Token': '%s' % (self.swift['x-storage-token'])}
182 response, content = http.request(path, 'HEAD', headers=headers)
183 self.assertEqual(204, response.status)
184 # pprint(response)
185 test_030_check_container_metadata.tags = ['swift']
186
187 def test_050_delete_container(self):
188 path = "%s/%s" % (self.swift['storage_url'], "test_container")
189 http = httplib2.Http(disable_ssl_certificate_validation=True)
190 headers = {'X-Auth-User': '%s:%s' % (self.swift['account'],
191 self.swift['username']),
192 'X-Storage-Token': '%s' % (self.swift['x-storage-token'])}
193 response, content = http.request(path, 'DELETE', headers=headers)
194 self.assertEqual(204, response.status)
195 test_050_delete_container.tags = ['swift']