Initial Release
diff --git a/tests/990_test_skip_examples.py b/tests/990_test_skip_examples.py
new file mode 100644
index 0000000..d0c44da
--- /dev/null
+++ b/tests/990_test_skip_examples.py
@@ -0,0 +1,55 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2011 OpenStack, LLC
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+"""
+Functional test case to check the status of gepetto and
+set information of hosts etc..
+"""
+
+import os
+import tests
+import unittest2
+
+
+class TestSkipExamples(tests.FunctionalTest):
+ @tests.skip_test("testing skipping")
+ def test_absolute_skip(self):
+ x = 1
+
+ @tests.skip_unless(os.getenv("BLAH"),
+ "Skipping -- Environment variable BLAH does not exist")
+ def test_skip_unless_env_blah_exists(self):
+ x = 1
+
+ @tests.skip_unless(os.getenv("USER"),
+ "Not Skipping -- Environment variable USER does not exist")
+ def test_skip_unless_env_user_exists(self):
+ x = 1
+
+ @tests.skip_if(os.getenv("USER"),
+ "Skiping -- Environment variable USER exists")
+ def test_skip_if_env_user_exists(self):
+ x = 1
+
+ @tests.skip_if(os.getenv("BLAH"),
+ "Not Skipping -- Environment variable BLAH exists")
+ def test_skip_if_env_blah_exists(self):
+ x = 1
+
+ def test_tags_example(self):
+ pass
+ test_tags_example.tags = ['kvm', 'olympus']
diff --git a/tests/994_test_rabbitmq.py b/tests/994_test_rabbitmq.py
new file mode 100644
index 0000000..f0e6fe4
--- /dev/null
+++ b/tests/994_test_rabbitmq.py
@@ -0,0 +1,100 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2011 OpenStack, LLC
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+"""Functional test case to check RabbitMQ """
+import pika
+import tests
+
+from pprint import pprint
+#RABBITMQ_HOST = get_config("rabbitmq/host")
+#RABBITMQ_USERNAME = get_config("rabbitmq/user")
+#RABBITMQ_PASSWORD = get_config("rabbitmq/password")
+
+
+class TestRabbitMQ(tests.FunctionalTest):
+ def test_000_ghetto(self):
+ """
+ This sets the host, user, and pass self variables so they
+ are accessible by all other methods
+ """
+ self.rabbitmq['host'] = self.config['rabbitmq']['host']
+ self.rabbitmq['user'] = self.config['rabbitmq']['user']
+ self.rabbitmq['pass'] = self.config['rabbitmq']['password']
+ test_000_ghetto.tags = ['rabbitmq']
+
+ def _cnx(self):
+ # TODO: Figuring out what's going with creds
+ # creds = pika.credentials.PlainCredentials(
+ # self.rabbitmq['user'], self.rabbitmq['pass']
+ connection = pika.BlockingConnection(pika.ConnectionParameters(
+ host=self.rabbitmq['host']))
+ channel = connection.channel()
+ return (channel, connection)
+
+ def test_001_connect(self):
+ channel, connection = self._cnx()
+ self.assert_(channel)
+ connection.close()
+ test_001_connect.tags = ['rabbitmq']
+
+ def test_002_send_receive_msg(self):
+ unitmsg = 'Hello from unittest'
+ channel, connection = self._cnx()
+ channel.queue_declare(queue='u1')
+ channel.basic_publish(exchange='',
+ routing_key='u1',
+ body=unitmsg)
+ connection.close()
+
+ channel, connection = self._cnx()
+
+ def callback(ch, method, properties, body):
+ self.assertEquals(body, unitmsg)
+ ch.stop_consuming()
+
+ channel.basic_consume(callback,
+ queue='u1',
+ no_ack=True)
+ channel.start_consuming()
+ test_002_send_receive_msg.tags = ['rabbitmq']
+
+ def test_003_send_receive_msg_with_persistense(self):
+ unitmsg = 'Hello from unittest with Persistense'
+ channel, connection = self._cnx()
+ channel.queue_declare(queue='u2', durable=True)
+ prop = pika.BasicProperties(delivery_mode=2)
+ channel.basic_publish(exchange='',
+ routing_key='u2',
+ body=unitmsg,
+ properties=prop,
+ )
+ connection.close()
+
+ channel, connection = self._cnx()
+ channel.queue_declare(queue='u2', durable=True)
+
+ def callback(ch, method, properties, body):
+ self.assertEquals(body, unitmsg)
+ ch.basic_ack(delivery_tag=method.delivery_tag)
+ ch.stop_consuming()
+
+ channel.basic_qos(prefetch_count=1)
+ channel.basic_consume(callback,
+ queue='u2')
+
+ channel.start_consuming()
+ test_003_send_receive_msg_with_persistense.tags = ['rabbitmq']
diff --git a/tests/995_test_swift.py b/tests/995_test_swift.py
new file mode 100644
index 0000000..123c610
--- /dev/null
+++ b/tests/995_test_swift.py
@@ -0,0 +1,195 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2011 OpenStack, LLC
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+"""Functional test case for OpenStack Swift """
+
+import hashlib
+import httplib2
+import json
+import os
+import tempfile
+import time
+import unittest
+import urllib
+
+from pprint import pprint
+
+import tests
+
+SMALL_OBJ = "include/swift_objects/swift_small"
+MED_OBJ = "include/swift_objects/swift_medium"
+LRG_OBJ = "include/swift_objects/swift_large"
+
+
+class TestSwift(tests.FunctionalTest):
+ def test_000_auth(self):
+ if self.swift['auth_ssl'] == "False":
+ prot = "http://"
+ else:
+ prot = "https://"
+
+ path = "%s%s:%s%s%s" % (prot, self.swift['auth_host'],
+ self.swift['auth_port'],
+ self.swift['auth_prefix'],
+ self.swift['ver'])
+
+ # Uncomment for debugging
+ # pprint(path)
+
+ http = httplib2.Http(disable_ssl_certificate_validation=True)
+ self.swift['auth_user'] = '%s:%s' % (self.swift['account'],
+ self.swift['username'])
+ headers = {'X-Auth-User': '%s' % (self.swift['auth_user']),
+ 'X-Auth-Key': '%s' % (self.swift['password'])}
+ response, content = http.request(path, 'GET', headers=headers)
+ self.assertEqual(200, response.status)
+ self.assertIsNotNone(response['x-auth-token'])
+ self.assertIsNotNone(response['x-storage-token'])
+ self.assertIsNotNone(response['x-storage-url'])
+
+ for k, v in response.items():
+ if (k == 'x-auth-token'):
+ self.swift['x-auth-token'] = v
+ if (k == 'x-storage-token'):
+ self.swift['x-storage-token'] = v
+
+ # Since we don't have DNS this is a bit of a hack, but works
+ url = response['x-storage-url'].split('/')
+ self.swift['storage_url'] = "%s//%s:%s/%s/%s" % (url[0],
+ self.swift['auth_host'],
+ self.swift['auth_port'],
+ url[3],
+ url[4])
+ test_000_auth.tags = ['swift']
+
+ def test_001_create_container(self):
+ path = "%s/%s/" % (self.swift['storage_url'], "test_container")
+ http = httplib2.Http(disable_ssl_certificate_validation=True)
+ headers = { 'X-Storage-Token': '%s' % (self.swift['x-storage-token'])}
+ response, content = http.request(path, 'PUT', headers=headers)
+ self.assertEqual(201, response.status)
+ test_001_create_container.tags = ['swift']
+
+ def test_002_list_containers(self):
+ http = httplib2.Http(disable_ssl_certificate_validation=True)
+ headers = {'X-Auth-Token': '%s' % (self.swift['x-auth-token'])}
+ response, content = http.request(self.swift['storage_url'], 'GET',
+ headers=headers)
+ self.assertEqual(200, response.status)
+ self.assertLessEqual('1', response['x-account-container-count'])
+ test_002_list_containers.tags = ['swift']
+
+ def test_010_create_small_object(self):
+ md5 = self._md5sum_file(SMALL_OBJ)
+ path = "%s/%s/%s" % (self.swift['storage_url'],
+ "test_container",
+ "swift_small")
+ http = httplib2.Http(disable_ssl_certificate_validation=True)
+ headers = {'X-Auth-User': '%s:%s' % (self.swift['account'],
+ self.swift['username']),
+ 'X-Storage-Token': '%s' % (self.swift['x-storage-token']),
+ 'ETag': '%s' % (md5),
+ 'Content-Length': '%d' % os.path.getsize(SMALL_OBJ),
+ 'Content-Type': 'application/octet-stream'}
+ upload = open(SMALL_OBJ, "rb")
+ response, content = http.request(path, 'PUT',
+ headers=headers,
+ body=upload)
+ self.assertEqual(201, response.status)
+ self.assertIn('201', content)
+ test_010_create_small_object.tags = ['swift']
+
+ def test_011_create_medium_object(self):
+ md5 = self._md5sum_file(MED_OBJ)
+ path = "%s/%s/%s" % (self.swift['storage_url'],
+ "test_container",
+ "swift_medium")
+ http = httplib2.Http(disable_ssl_certificate_validation=True)
+ headers = {'X-Auth-User': '%s:%s' % (self.swift['account'],
+ self.swift['username']),
+ 'X-Storage-Token': '%s' % (self.swift['x-storage-token']),
+ 'ETag': '%s' % (md5),
+ 'Content-Length': '%d' % (os.path.getsize(MED_OBJ)),
+ 'Content-Type': 'application/octet-stream',
+ 'Content-Encoding': 'gzip'}
+ upload = ""
+ for chunk in self._read_in_chunks(MED_OBJ):
+ upload += chunk
+ response, content = http.request(path, 'PUT',
+ headers=headers,
+ body=upload)
+ self.assertEqual(201, response.status)
+ test_011_create_medium_object.tags = ['swift']
+
+ def test_013_get_small_object(self):
+ path = "%s/%s/%s" % (self.swift['storage_url'],
+ "test_container",
+ "swift_small")
+ http = httplib2.Http(disable_ssl_certificate_validation=True)
+ headers = {'X-Auth-User': '%s:%s' % (self.swift['account'],
+ self.swift['username']),
+ 'X-Storage-Token': '%s' % (self.swift['x-storage-token'])}
+ response, content = http.request(path, 'GET',
+ headers=headers)
+ self.assertEqual(200, response.status)
+ self.assertEqual(self._md5sum_file(SMALL_OBJ), response['etag'])
+ test_013_get_small_object.tags = ['swift']
+
+ def test_017_delete_small_object(self):
+ path = "%s/%s/%s" % (self.swift['storage_url'], "test_container",
+ "swift_small")
+ http = httplib2.Http(disable_ssl_certificate_validation=True)
+ headers = {'X-Auth-User': '%s:%s' % (self.swift['account'],
+ self.swift['username']),
+ 'X-Storage-Token': '%s' % (
+ self.swift['x-storage-token'])}
+ response, content = http.request(path, 'DELETE', headers=headers)
+ self.assertEqual(204, response.status)
+ test_017_delete_small_object.tags = ['swift']
+
+ def test_018_delete_medium_object(self):
+ path = "%s/%s/%s" % (self.swift['storage_url'], "test_container",
+ "swift_medium")
+ http = httplib2.Http(disable_ssl_certificate_validation=True)
+ headers = {'X-Auth-User': '%s:%s' % (self.swift['account'],
+ self.swift['username']),
+ 'X-Storage-Token': '%s' % (
+ self.swift['x-storage-token'])}
+ response, content = http.request(path, 'DELETE', headers=headers)
+ self.assertEqual(204, response.status)
+ test_018_delete_medium_object.tags = ['swift']
+
+ def test_030_check_container_metadata(self):
+ path = "%s/%s" % (self.swift['storage_url'], "test_container")
+ http = httplib2.Http(disable_ssl_certificate_validation=True)
+ headers = {'X-Auth-User': '%s:%s' % (self.swift['account'],
+ self.swift['username']),
+ 'X-Storage-Token': '%s' % (self.swift['x-storage-token'])}
+ response, content = http.request(path, 'HEAD', headers=headers)
+ self.assertEqual(204, response.status)
+ # pprint(response)
+ test_030_check_container_metadata.tags = ['swift']
+
+ def test_050_delete_container(self):
+ path = "%s/%s" % (self.swift['storage_url'], "test_container")
+ http = httplib2.Http(disable_ssl_certificate_validation=True)
+ headers = {'X-Auth-User': '%s:%s' % (self.swift['account'],
+ self.swift['username']),
+ 'X-Storage-Token': '%s' % (self.swift['x-storage-token'])}
+ response, content = http.request(path, 'DELETE', headers=headers)
+ self.assertEqual(204, response.status)
+ test_050_delete_container.tags = ['swift']
diff --git a/tests/996_test_glance.py b/tests/996_test_glance.py
new file mode 100644
index 0000000..333d147
--- /dev/null
+++ b/tests/996_test_glance.py
@@ -0,0 +1,193 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2011 OpenStack, LLC
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+"""Validate a working Glance deployment"""
+
+import httplib2
+import json
+import os
+from pprint import pprint
+
+import tests
+
+
+class TestGlanceAPI(tests.FunctionalTest):
+ def test_001_connect_to_glance_api(self):
+ """
+ Verifies ability to connect to glance api,
+ expects glance to return an empty set
+ """
+ if 'apiver' in self.glance:
+ path = "http://%s:%s/%s/images" % (self.glance['host'],
+ self.glance['port'], self.glance['apiver'])
+ else:
+ path = "http://%s:%s/images" % (self.glance['host'],
+ self.glance['port'])
+ http = httplib2.Http()
+ response, content = http.request(path, 'GET')
+ self.assertEqual(200, response.status)
+ data = json.loads(content)
+ self.assertTrue('images' in data)
+ test_001_connect_to_glance_api.tags = ['glance']
+
+ def test_002_upload_kernel_to_glance(self):
+ """
+ Uploads a test kernal to glance api
+ """
+ kernel = "sample_vm/vmlinuz-2.6.32-23-server"
+ if 'apiver' in self.glance:
+ path = "http://%s:%s/%s/images" % (self.glance['host'],
+ self.glance['port'], self.glance['apiver'])
+ else:
+ path = "http://%s:%s/images" % (self.glance['host'],
+ self.glance['port'])
+ headers = {'x-image-meta-is-public': 'true',
+ 'x-image-meta-name': 'test-kernel',
+ 'x-image-meta-disk-format': 'aki',
+ 'x-image-meta-container-format': 'aki',
+ 'Content-Length': '%d' % os.path.getsize(kernel),
+ 'Content-Type': 'application/octet-stream'}
+ image_file = open(kernel, "rb")
+ http = httplib2.Http()
+ response, content = http.request(path, 'POST',
+ headers=headers,
+ body=image_file)
+ image_file.close()
+ self.assertEqual(201, response.status)
+ data = json.loads(content)
+ self.glance['kernel_id'] = data['image']['id']
+ self.assertEqual(data['image']['name'], "test-kernel")
+ self.assertEqual(data['image']['checksum'], self._md5sum_file(kernel))
+ test_002_upload_kernel_to_glance.tags = ['glance', 'nova']
+
+ def test_003_upload_initrd_to_glance(self):
+ """
+ Uploads a test initrd to glance api
+ """
+ initrd = "sample_vm/initrd.img-2.6.32-23-server"
+ if 'apiver' in self.glance:
+ path = "http://%s:%s/%s/images" % (self.glance['host'],
+ self.glance['port'], self.glance['apiver'])
+ else:
+ path = "http://%s:%s/images" % (self.glance['host'],
+ self.glance['port'])
+ headers = {'x-image-meta-is-public': 'true',
+ 'x-image-meta-name': 'test-ramdisk',
+ 'x-image-meta-disk-format': 'ari',
+ 'x-image-meta-container-format': 'ari',
+ 'Content-Length': '%d' % os.path.getsize(initrd),
+ 'Content-Type': 'application/octet-stream'}
+ image_file = open(initrd, "rb")
+ http = httplib2.Http()
+ response, content = http.request(path,
+ 'POST',
+ headers=headers,
+ body=image_file)
+ image_file.close()
+ self.assertEqual(201, response.status)
+ data = json.loads(content)
+ self.glance['ramdisk_id'] = data['image']['id']
+ self.assertEqual(data['image']['name'], "test-ramdisk")
+ self.assertEqual(data['image']['checksum'], self._md5sum_file(initrd))
+ test_003_upload_initrd_to_glance.tags = ['glance', 'nova']
+
+ def test_004_upload_image_to_glance(self):
+ """
+ Uploads a test image to glance api, and
+ links it to the initrd and kernel uploaded
+ earlier
+ """
+ image = "sample_vm/ubuntu-lucid.img"
+ upload_data = ""
+ for chunk in self._read_in_chunks(image):
+ upload_data += chunk
+ if 'apiver' in self.glance:
+ path = "http://%s:%s/%s/images" % (self.glance['host'],
+ self.glance['port'], self.glance['apiver'])
+ else:
+ path = "http://%s:%s/images" % (self.glance['host'],
+ self.glance['port'])
+ headers = {'x-image-meta-is-public': 'true',
+ 'x-image-meta-name': 'test-image',
+ 'x-image-meta-disk-format': 'ami',
+ 'x-image-meta-container-format': 'ami',
+ 'x-image-meta-property-Kernel_id': '%s' % \
+ self.glance['kernel_id'],
+ 'x-image-meta-property-Ramdisk_id': '%s' % \
+ self.glance['ramdisk_id'],
+ 'Content-Length': '%d' % os.path.getsize(image),
+ 'Content-Type': 'application/octet-stream'}
+ http = httplib2.Http()
+ response, content = http.request(path, 'POST',
+ headers=headers,
+ body=upload_data)
+ self.assertEqual(201, response.status)
+ data = json.loads(content)
+ self.glance['image_id'] = data['image']['id']
+ self.assertEqual(data['image']['name'], "test-image")
+ self.assertEqual(data['image']['checksum'], self._md5sum_file(image))
+ test_004_upload_image_to_glance.tags = ['glance', 'nova']
+
+ def test_005_set_image_meta_property(self):
+ if 'apiver' in self.glance:
+ path = "http://%s:%s/%s/images/%s" % (self.glance['host'],
+ self.glance['port'], self.glance['apiver'],
+ self.glance['image_id'])
+ else:
+ path = "http://%s:%s/images/%s" % (self.glance['host'],
+ self.glance['port'], self.glance['image_id'])
+ headers = {'X-Image-Meta-Property-Distro': 'Ubuntu',
+ 'X-Image-Meta-Property-Arch': 'x86_64',
+ 'X-Image-Meta-Property-Kernel_id': '%s' % \
+ self.glance['kernel_id'],
+ 'X-Image-Meta-Property-Ramdisk_id': '%s' % \
+ self.glance['ramdisk_id']}
+ http = httplib2.Http()
+ response, content = http.request(path, 'PUT', headers=headers)
+ self.assertEqual(response.status, 200)
+ data = json.loads(content)
+ self.assertEqual(data['image']['properties']['arch'], "x86_64")
+ self.assertEqual(data['image']['properties']['distro'], "Ubuntu")
+ self.assertEqual(data['image']['properties']['kernel_id'],
+ str(self.glance['kernel_id']))
+ self.assertEqual(data['image']['properties']['ramdisk_id'],
+ str(self.glance['ramdisk_id']))
+ test_005_set_image_meta_property.tags = ['glance']
+
+ def test_006_list_image_metadata(self):
+ image = "sample_vm/ubuntu-lucid.img"
+ if 'apiver' in self.glance:
+ path = "http://%s:%s/%s/images/%s" % (self.glance['host'],
+ self.glance['port'], self.glance['apiver'],
+ self.glance['image_id'])
+ else:
+ path = "http://%s:%s/images/%s" % (self.glance['host'],
+ self.glance['port'], self.glance['image_id'])
+ http = httplib2.Http()
+ response, content = http.request(path, 'HEAD')
+ self.assertEqual(response.status, 200)
+ self.assertEqual(response['x-image-meta-name'], "test-image")
+ self.assertEqual(response['x-image-meta-checksum'],
+ self._md5sum_file(image))
+ self.assertEqual(response['x-image-meta-container_format'], "ami")
+ self.assertEqual(response['x-image-meta-disk_format'], "ami")
+ self.assertEqual(response['x-image-meta-property-arch'], "x86_64")
+ self.assertEqual(response['x-image-meta-property-distro'], "Ubuntu")
+ self.assertEqual(response['x-image-meta-property-kernel_id'],
+ str(self.glance['kernel_id']))
+ self.assertEqual(response['x-image-meta-property-ramdisk_id'],
+ str(self.glance['ramdisk_id']))
+ test_006_list_image_metadata.tags = ['glance']
diff --git a/tests/998_test_nova.py b/tests/998_test_nova.py
new file mode 100644
index 0000000..36d071d
--- /dev/null
+++ b/tests/998_test_nova.py
@@ -0,0 +1,387 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2011 OpenStack, LLC
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+"""Functional test case against the OpenStack Nova API server"""
+
+import json
+import os
+import tempfile
+import unittest
+import httplib2
+import urllib
+import hashlib
+import time
+import os
+
+from pprint import pprint
+
+import tests
+
+
+class TestNovaAPI(tests.FunctionalTest):
+ def build_check(self, id):
+ self.result = {}
+ """
+ This is intended to check that a server completes the build process
+ and enters an active state upon creation. Due to reporting errors in
+ the API we are also testing ping and ssh
+ """
+ count = 0
+ path = "http://%s:%s/%s/servers/%s" % (self.nova['host'],
+ self.nova['port'],
+ self.nova['ver'],
+ id)
+ http = httplib2.Http()
+ headers = {'X-Auth-User': '%s' % (self.nova['user']),
+ 'X-Auth-Token': '%s' % (self.nova['X-Auth-Token'])}
+ response, content = http.request(path, 'GET', headers=headers)
+ self.assertEqual(200, response.status)
+ data = json.loads(content)
+
+ # Get Server status exit when active
+ while (data['server']['status'] != 'ACTIVE'):
+ response, content = http.request(path, 'GET', headers=headers)
+ data = json.loads(content)
+ time.sleep(5)
+ count = count + 5
+ self.result['serverid'] = id
+ self.result['status'] = data['server']['status']
+
+ # Get IP Address of newly created server
+ if 'addr' in data['server']['addresses']['vmnet'][0]:
+ netaddr = data['server']['addresses']['vmnet'][0]['addr']
+ elif 'addr' in data['server']['address']['public'][0]:
+ netaddr = data['server']['addresses']['public'][0]['addr']
+
+ r = "" . join(os.popen('ping -c5 %s' % (netaddr)).readlines())
+ if r.find('64 bytes') > 1:
+ self.result['ping'] = True
+ else:
+ self.result['ping'] = False
+
+ return self.result
+
+ def test_002_verify_nova_auth(self):
+ if 'keystone' in self.config:
+ path = "http://%s:%s/%s" % (self.keystone['host'],
+ self.keystone['port'],
+ self.keystone['apiver'])
+ headers = {'X-Auth-User': self.keystone['user'],
+ 'X-Auth-Key': self.keystone['pass']}
+ else:
+ path = "http://%s:%s/%s" % (self.nova['host'],
+ self.nova['port'],
+ self.nova['ver'])
+ headers = {'X-Auth-User': self.nova['user'],
+ 'X-Auth-Key': self.nova['key']}
+
+ http = httplib2.Http()
+ response, content = http.request(path, 'HEAD', headers=headers)
+ self.assertEqual(204, response.status)
+ self.assertNotEqual(response['x-auth-token'], '')
+ self.assertNotEqual(response['x-server-management-url'], '')
+
+ # Set up Auth Token for all future API interactions
+ self.nova['X-Auth-Token'] = response['x-auth-token']
+ test_002_verify_nova_auth.tags = ['nova', 'nova-api']
+
+ def test_101_verify_version_selection_default(self):
+ path = "http://%s:%s/" % (self.nova['host'],
+ self.nova['port'])
+ http = httplib2.Http()
+ headers = {'X-Auth-Token': self.nova['X-Auth-Token']}
+ response, content = http.request(path, 'GET', headers=headers)
+ self.assertEqual(200, response.status)
+ data = json.loads(content)
+ self.assertEqual(len(data['versions']), 2)
+ test_101_verify_version_selection_default.tags = ['nova', 'nova-api']
+
+ def test_102_verify_version_selection_json(self):
+ path = "http://%s:%s/.json" % (self.nova['host'],
+ self.nova['port'])
+ http = httplib2.Http()
+ headers = {'X-Auth-Token': self.nova['X-Auth-Token']}
+ response, content = http.request(path, 'GET', headers=headers)
+ self.assertEqual(200, response.status)
+ data = json.loads(content)
+ self.assertEqual(len(data['versions']), 2)
+ test_102_verify_version_selection_json.tags = ['nova', 'nova-api']
+
+ def test_103_verify_version_selection_xml(self):
+ path = "http://%s:%s/.xml" % (self.nova['host'],
+ self.nova['port'])
+ http = httplib2.Http()
+ headers = {'X-Auth-Token': self.nova['X-Auth-Token']}
+ response, content = http.request(path, 'GET', headers=headers)
+ self.assertEqual(200, response.status)
+ self.assertTrue('<versions>' in content)
+ test_103_verify_version_selection_xml.tags = ['nova', 'nova-api']
+
+ def test_104_bad_user_bad_key(self):
+ if 'keystone' in self.config:
+ path = "http://%s:%s/%s" % (self.keystone['host'],
+ self.keystone['port'],
+ self.keystone['apiver'])
+ else:
+ path = "http://%s:%s/%s" % (self.nova['host'],
+ self.nova['port'],
+ self.nova['ver'])
+ http = httplib2.Http()
+ headers = {'X-Auth-User': 'unknown_auth_user',
+ 'X-Auth-Key': 'unknown_auth_key'}
+ response, content = http.request(path, 'GET', headers=headers)
+ self.assertEqual(response.status, 401)
+ test_104_bad_user_bad_key.tags = ['nova', 'nova-api']
+
+ def test_105_bad_user_good_key(self):
+ if 'keystone' in self.config:
+ path = "http://%s:%s/%s" % (self.keystone['host'],
+ self.keystone['port'],
+ self.keystone['apiver'])
+ else:
+ path = "http://%s:%s/%s" % (self.nova['host'],
+ self.nova['port'],
+ self.nova['ver'])
+ http = httplib2.Http()
+ headers = {'X-Auth-User': 'unknown_auth_user',
+ 'X-Auth-Key': self.nova['key']}
+ response, content = http.request(path, 'GET', headers=headers)
+ self.assertEqual(response.status, 401)
+ test_105_bad_user_good_key.tags = ['nova', 'nova-api']
+
+ def test_106_good_user_bad_key(self):
+ if 'keystone' in self.config:
+ path = "http://%s:%s/%s" % (self.keystone['host'],
+ self.keystone['port'],
+ self.keystone['apiver'])
+ else:
+ path = "http://%s:%s/%s" % (self.nova['host'],
+ self.nova['port'],
+ self.nova['ver'])
+ http = httplib2.Http()
+ headers = {'X-Auth-User': self.nova['user'],
+ 'X-Auth-Key': 'unknown_auth_key'}
+ response, content = http.request(path, 'GET', headers=headers)
+ self.assertEqual(response.status, 401)
+ test_106_good_user_bad_key.tags = ['nova', 'nova-api']
+
+ def test_107_no_key(self):
+ if 'keystone' in self.config:
+ path = "http://%s:%s/%s" % (self.keystone['host'],
+ self.keystone['port'],
+ self.keystone['apiver'])
+ else:
+ path = "http://%s:%s/%s" % (self.nova['host'],
+ self.nova['port'],
+ self.nova['ver'])
+ http = httplib2.Http()
+ headers = {'X-Auth-User': self.nova['user']}
+ response, content = http.request(path, 'GET', headers=headers)
+ self.assertEqual(response.status, 401)
+ test_107_no_key.tags = ['nova', 'nova-api']
+
+ def test_108_bad_token(self):
+ if 'keystone' in self.config:
+ path = "http://%s:%s/%s" % (self.keystone['host'],
+ self.keystone['port'],
+ self.keystone['apiver'])
+ else:
+ path = "http://%s:%s/%s" % (self.nova['host'],
+ self.nova['port'],
+ self.nova['ver'])
+ http = httplib2.Http()
+ headers = {'X-Auth-Token': 'unknown_token'}
+ response, content = http.request(path, 'GET', headers=headers)
+ self.assertEqual(response.status, 401)
+ test_108_bad_token.tags = ['nova', 'nova-api']
+
+ def test_109_verify_blank_limits(self):
+ path = "http://%s:%s/%s/limits" % (self.nova['host'],
+ self.nova['port'],
+ self.nova['ver'])
+
+ http = httplib2.Http()
+ headers = {'X-Auth-User': '%s' % (self.nova['user']),
+ 'X-Auth-Token': '%s' % (self.nova['X-Auth-Token'])}
+ response, content = http.request(path, 'GET', headers=headers)
+ self.assertEqual(200, response.status)
+ self.assertNotEqual('{"limits": []}', content)
+ test_109_verify_blank_limits.tags = ['nova', 'nova-api']
+
+ def test_110_list_flavors_v1_1(self):
+ path = "http://%s:%s/%s/flavors" % (self.nova['host'],
+ self.nova['port'],
+ self.nova['ver'])
+ http = httplib2.Http()
+ headers = {'X-Auth-User': '%s' % (self.nova['user']),
+ 'X-Auth-Token': '%s' % (self.nova['X-Auth-Token'])}
+ response, content = http.request(path, 'GET', headers=headers)
+ self.assertEqual(200, response.status)
+ self.assertNotEqual('{"flavors": []}', content)
+ test_110_list_flavors_v1_1.tags = ['nova', 'nova-api']
+
+ def test_111_verify_kernel_active_v1_1(self):
+ # for testing purposes change self.glance['kernel_id'] to an active
+ # kernel image allow for skipping glance tests
+ if not 'kernel_id' in self.glance:
+ self.glance['kernel_id'] = "61"
+
+ path = "http://%s:%s/%s/images/%s" % (self.nova['host'],
+ self.nova['port'],
+ self.nova['ver'],
+ self.glance['kernel_id'])
+ http = httplib2.Http()
+ headers = {'X-Auth-User': '%s' % (self.nova['user']),
+ 'X-Auth-Token': '%s' % (self.nova['X-Auth-Token'])}
+ response, content = http.request(path, 'GET', headers=headers)
+ self.assertEqual(200, response.status)
+ data = json.loads(content)
+ self.assertEqual(data['image']['status'], 'ACTIVE')
+ test_111_verify_kernel_active_v1_1.tags = ['nova']
+
+ def test_112_verify_ramdisk_active_v1_1(self):
+ # for testing purposes change self.glance['ramdisk_id'] to an active
+ # ramdisk image, allows you to skip glance tests
+ if not 'ramdisk_id' in self.glance:
+ self.glance['ramdisk_id'] = "62"
+
+ path = "http://%s:%s/%s/images/%s" % (self.nova['host'],
+ self.nova['port'],
+ self.nova['ver'],
+ self.glance['ramdisk_id'])
+ http = httplib2.Http()
+ headers = {'X-Auth-User': '%s' % (self.nova['user']),
+ 'X-Auth-Token': '%s' % (self.nova['X-Auth-Token'])}
+ response, content = http.request(path, 'GET', headers=headers)
+ self.assertEqual(200, response.status)
+ data = json.loads(content)
+ self.assertEqual(data['image']['status'], 'ACTIVE')
+ test_112_verify_ramdisk_active_v1_1.tags = ['nova']
+
+ def test_113_verify_image_active_v1_1(self):
+ # for testing purposes change self.glance['image_id'] to an active
+ # image id allows for skipping glance tests
+ if not 'image_id' in self.glance:
+ self.glance['image_id'] = "63"
+
+ path = "http://%s:%s/%s/images/%s" % (self.nova['host'],
+ self.nova['port'],
+ self.nova['ver'],
+ self.glance['image_id'])
+ http = httplib2.Http()
+ headers = {'X-Auth-User': '%s' % (self.nova['user']),
+ 'X-Auth-Token': '%s' % (self.nova['X-Auth-Token'])}
+ response, content = http.request(path, 'GET', headers=headers)
+ self.assertEqual(200, response.status)
+ data = json.loads(content)
+ self.assertEqual(data['image']['status'], 'ACTIVE')
+ test_113_verify_image_active_v1_1.tags = ['nova']
+
+ def test_200_create_server(self):
+ path = "http://%s:%s/%s/servers" % (self.nova['host'],
+ self.nova['port'],
+ self.nova['ver'])
+ http = httplib2.Http()
+ headers = {'X-Auth-User': '%s' % (self.nova['user']),
+ 'X-Auth-Token': '%s' % (self.nova['X-Auth-Token']),
+ 'Content-Type': 'application/json'}
+
+ # Change imageRef to self.glance['image_id']
+ json_str = {"server":
+ {
+ "name": "testing server creation",
+ "flavorRef": "http://%s:%s/%s/flavors/2" % (self.nova['host'],
+ self.nova['port'],
+ self.nova['ver']),
+ "imageRef": self.glance['image_id']
+# "imageRef": "http://%s:%s/%s/images/%s" % (self.nova['host'],
+# self.nova['port'],
+# self.nova['ver'],
+# self.glance['image_id'])
+ }
+ }
+ data = json.dumps(json_str)
+ response, content = http.request(path, 'POST', headers=headers,
+ body=data)
+ json_return = json.loads(content)
+ self.assertEqual(200, response.status)
+ self.assertEqual(json_return['server']['status'], "BUILD")
+ self.nova['single_server_id'] = json_return['server']['id']
+ time.sleep(5)
+ build_result = self.build_check(self.nova['single_server_id'])
+ self.assertEqual(build_result['status'], "ACTIVE")
+ self.assertEqual(build_result['ping'], True)
+ test_200_create_server.tags = ['nova']
+
+ def test_201_get_server_details(self):
+ path = "http://%s:%s/%s/servers/%s" % (self.nova['host'],
+ self.nova['port'],
+ self.nova['ver'],
+ self.nova['single_server_id'])
+
+ http = httplib2.Http()
+ headers = {'X-Auth-User': '%s' % (self.nova['user']),
+ 'X-Auth-Token': '%s' % (self.nova['X-Auth-Token'])}
+
+ response, content = http.request(path, 'GET', headers=headers)
+ self.assertEqual(200, response.status)
+ test_201_get_server_details.tags = ['nova']
+
+ # MOVING TO 999 because it can kill the API
+ # Uncomment next line for testing
+ # def create_multi(self):
+ def test_999_create_multiple(self):
+ self.nova['multi_server'] = {}
+ path = "http://%s:%s/%s/servers" % (self.nova['host'],
+ self.nova['port'],
+ self.nova['ver'])
+ http = httplib2.Http()
+ headers = {'X-Auth-User': '%s' % (self.nova['user']),
+ 'X-Auth-Token': '%s' % (self.nova['X-Auth-Token']),
+ 'Content-Type': 'application/json'}
+
+ for i in range(1, 10):
+ # Change imageRef to self.glance['image_id']
+ json_str = {"server":
+ {
+ "name": "test %s" % (i),
+ "flavorRef": "http://%s:%s/%s/flavors/2" % (
+ self.nova['host'],
+ self.nova['port'],
+ self.nova['ver']),
+ "imageRef": self.glance['image_id']
+# "imageRef": "http://%s:%s/%s/images/%s" % (
+# self.nova['host'],
+# self.nova['port'],
+# self.nova['ver'],
+# self.glance['image_id'])
+ }
+ }
+ data = json.dumps(json_str)
+ response, content = http.request(path, 'POST', headers=headers,
+ body=data)
+ json_return = json.loads(content)
+ self.assertEqual(200, response.status)
+ self.assertEqual(json_return['server']['status'], "BUILD")
+ self.nova['multi_server']["test %s" % (i)] = \
+ json_return['server']['id']
+ time.sleep(30)
+
+ for k, v in self.nova['multi_server'].iteritems():
+ build_result = self.build_check(v)
+ self.assertEqual(build_result['ping'], True)
+ test_999_create_multiple.tags = ['nova']
diff --git a/tests/999_test_cleanup.py b/tests/999_test_cleanup.py
new file mode 100644
index 0000000..d3bbcf6
--- /dev/null
+++ b/tests/999_test_cleanup.py
@@ -0,0 +1,97 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2011 OpenStack, LLC
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+"""Functional test case that utilizes cURL against the API server"""
+
+import json
+import os
+import tempfile
+import unittest
+import httplib2
+import urllib
+import hashlib
+
+from pprint import pprint
+
+import tests
+
+
+class TestCleanUp(tests.FunctionalTest):
+ def test_995_delete_server(self):
+ path = "http://%s:%s/%s/servers/%s" % (self.nova['host'],
+ self.nova['port'],
+ self.nova['ver'],
+ self.nova['single_server_id'])
+ http = httplib2.Http()
+ headers = {'X-Auth-User': '%s' % (self.nova['user']),
+ 'X-Auth-Token': '%s' % (self.nova['X-Auth-Token'])}
+ response, content = http.request(path, 'DELETE', headers=headers)
+ self.assertEqual(204, response.status)
+ test_995_delete_server.tags = ['nova']
+
+ def test_996_delete_multi_server(self):
+ print "Deleting %s instances." % (len(self.nova['multi_server']))
+ for k, v in self.nova['multi_server'].iteritems():
+ path = "http://%s:%s/%s/servers/%s" % (self.nova['host'],
+ self.nova['port'],
+ self.nova['ver'],
+ v)
+ http = httplib2.Http()
+ headers = {'X-Auth-User': '%s' % (self.nova['user']),
+ 'X-Auth-Token': '%s' % (self.nova['X-Auth-Token'])}
+ response, content = http.request(path, 'DELETE', headers=headers)
+ self.assertEqual(204, response.status)
+ test_996_delete_multi_server.tags = ['nova']
+
+ def test_997_delete_kernel_from_glance(self):
+ if 'apiver' in self.glance:
+ path = "http://%s:%s/%s/images/%s" % (self.glance['host'],
+ self.glance['port'], self.glance['apiver'],
+ self.glance['kernel_id'])
+ else:
+ path = "http://%s:%s/images/%s" % (self.glance['host'],
+ self.glance['port'], self.glance['kernel_id'])
+ http = httplib2.Http()
+ response, content = http.request(path, 'DELETE')
+ self.assertEqual(200, response.status)
+ test_997_delete_kernel_from_glance.tags = ['glance', 'nova']
+
+ def test_998_delete_initrd_from_glance(self):
+ if 'apiver' in self.glance:
+ path = "http://%s:%s/%s/images/%s" % (self.glance['host'],
+ self.glance['port'], self.glance['apiver'],
+ self.glance['ramdisk_id'])
+ else:
+ path = "http://%s:%s/images/%s" % (self.glance['host'],
+ self.glance['port'], self.glance['ramdisk_id'])
+ http = httplib2.Http()
+ response, content = http.request(path, 'DELETE')
+ self.assertEqual(200, response.status)
+ test_998_delete_initrd_from_glance.tags = ['glance', 'nova']
+
+ def test_999_delete_image_from_glance(self):
+ if 'apiver' in self.glance:
+ path = "http://%s:%s/%s/images/%s" % (self.glance['host'],
+ self.glance['port'], self.glance['apiver'],
+ self.glance['image_id'])
+ else:
+ path = "http://%s:%s/images/%s" % (self.glance['host'],
+ self.glance['port'], self.glance['image_id'])
+ http = httplib2.Http()
+ response, content = http.request(path, 'DELETE')
+ self.assertEqual(200, response.status)
+ test_999_delete_image_from_glance.tags = ['glance', 'nova']
diff --git a/tests/__init__.py b/tests/__init__.py
new file mode 100644
index 0000000..515bfc3
--- /dev/null
+++ b/tests/__init__.py
@@ -0,0 +1,159 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2010-2011 OpenStack LLC.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import ConfigParser
+from hashlib import md5
+import nose.plugins.skip
+import os
+import unittest2
+from xmlrpclib import Server
+
+NOVA_DATA = {}
+GLANCE_DATA = {}
+SWIFT_DATA = {}
+RABBITMQ_DATA = {}
+CONFIG_DATA = {}
+KEYSTONE_DATA = {}
+
+class skip_test(object):
+ """Decorator that skips a test."""
+ def __init__(self, msg):
+ self.message = msg
+
+ def __call__(self, func):
+ def _skipper(*args, **kw):
+ """Wrapped skipper function."""
+ raise nose.SkipTest(self.message)
+ _skipper.__name__ = func.__name__
+ _skipper.__doc__ = func.__doc__
+ return _skipper
+
+
+class skip_if(object):
+ """Decorator that skips a test."""
+ def __init__(self, condition, msg):
+ self.condition = condition
+ self.message = msg
+
+ def __call__(self, func):
+ def _skipper(*args, **kw):
+ """Wrapped skipper function."""
+ if self.condition:
+ raise nose.SkipTest(self.message)
+ func(*args, **kw)
+ _skipper.__name__ = func.__name__
+ _skipper.__doc__ = func.__doc__
+ return _skipper
+
+
+class skip_unless(object):
+ """Decorator that skips a test."""
+ def __init__(self, condition, msg):
+ self.condition = condition
+ self.message = msg
+
+ def __call__(self, func):
+ def _skipper(*args, **kw):
+ """Wrapped skipper function."""
+ if not self.condition:
+ raise nose.SkipTest(self.message)
+ func(*args, **kw)
+ _skipper.__name__ = func.__name__
+ _skipper.__doc__ = func.__doc__
+ return _skipper
+
+
+class FunctionalTest(unittest2.TestCase):
+ def setUp(self):
+ global GLANCE_DATA, NOVA_DATA, SWIFT_DATA, RABBITMQ_DATA, KEYSTONE_DATA, CONFIG_DATA
+ # Define config dict
+ self.config = CONFIG_DATA
+ # Define service specific dicts
+ self.glance = GLANCE_DATA
+ self.nova = NOVA_DATA
+ self.swift = SWIFT_DATA
+ self.rabbitmq = RABBITMQ_DATA
+ self.keystone = KEYSTONE_DATA
+
+ self._parse_defaults_file()
+
+ # Swift Setup
+ if 'swift' in self.config:
+ self.swift['auth_host'] = self.config['swift']['auth_host']
+ self.swift['auth_port'] = self.config['swift']['auth_port']
+ self.swift['auth_prefix'] = self.config['swift']['auth_prefix']
+ self.swift['auth_ssl'] = self.config['swift']['auth_ssl']
+ self.swift['account'] = self.config['swift']['account']
+ self.swift['username'] = self.config['swift']['username']
+ self.swift['password'] = self.config['swift']['password']
+ self.swift['ver'] = 'v1.0' # need to find a better way to get this.
+
+ # Glance Setup
+ self.glance['host'] = self.config['glance']['host']
+ self.glance['port'] = self.config['glance']['port']
+ if 'apiver' in self.config['glance']:
+ self.glance['apiver'] = self.config['glance']['apiver']
+
+ if 'nova' in self.config:
+ self.nova['host'] = self.config['nova']['host']
+ self.nova['port'] = self.config['nova']['port']
+ self.nova['ver'] = self.config['nova']['apiver']
+ self.nova['user'] = self.config['nova']['user']
+ self.nova['key'] = self.config['nova']['key']
+
+ if 'keystone' in self.config:
+ self.keystone['host'] = self.config['keystone']['host']
+ self.keystone['port'] = self.config['keystone']['port']
+ self.keystone['apiver'] = self.config['keystone']['apiver']
+ self.keystone['user'] = self.config['keystone']['user']
+ self.keystone['pass'] = self.config['keystone']['password']
+
+ def _md5sum_file(self, path):
+ md5sum = md5()
+ with open(path, 'rb') as file:
+ for chunk in iter(lambda: file.read(8192), ''):
+ md5sum.update(chunk)
+ return md5sum.hexdigest()
+
+ def _read_in_chunks(self, infile, chunk_size=1024 * 64):
+ file_data = open(infile, "rb")
+ while True:
+ # chunk = file_data.read(chunk_size).encode('base64')
+ chunk = file_data.read(chunk_size)
+ if chunk:
+ yield chunk
+ else:
+ return
+ file_data.close()
+
+ def _parse_defaults_file(self):
+ cfg = os.path.abspath(os.path.join(os.path.dirname(__file__),
+ "..", "etc", "config.ini"))
+ if os.path.exists(cfg):
+ self._build_config(cfg)
+ else:
+ raise Exception("Cannot read %s" % cfg)
+
+ def _build_config(self, config_file):
+ parser = ConfigParser.ConfigParser()
+ parser.read(config_file)
+
+ for section in parser.sections():
+ self.config[section] = {}
+ for value in parser.options(section):
+ self.config[section][value] = parser.get(section, value)
+ # print "%s = %s" % (value, parser.get(section, value))