Merge "Update test_networks.py to v2 of Quantum API"
diff --git a/tempest/common/rest_client.py b/tempest/common/rest_client.py
index 88c8886..02b5c9b 100644
--- a/tempest/common/rest_client.py
+++ b/tempest/common/rest_client.py
@@ -161,12 +161,6 @@
if mgmt_url is None:
raise exceptions.EndpointNotFound(service)
- if service == 'network':
- # Keystone does not return the correct endpoint for
- # quantum. Handle this separately.
- mgmt_url = (mgmt_url + self.config.network.api_version +
- "/tenants/" + tenant_id)
-
return token, mgmt_url
elif resp.status == 401:
diff --git a/tempest/config.py b/tempest/config.py
index 3a4a8c9..9c41660 100644
--- a/tempest/config.py
+++ b/tempest/config.py
@@ -260,9 +260,6 @@
cfg.StrOpt('catalog_type',
default='network',
help='Catalog type of the Quantum service.'),
- cfg.StrOpt('api_version',
- default="v1.1",
- help="Version of Quantum API"),
cfg.StrOpt('tenant_network_cidr',
default="10.100.0.0/16",
help="The cidr block to allocate tenant networks from"),
diff --git a/tempest/services/network/json/network_client.py b/tempest/services/network/json/network_client.py
index 34342c9..4758ddd 100644
--- a/tempest/services/network/json/network_client.py
+++ b/tempest/services/network/json/network_client.py
@@ -4,98 +4,112 @@
class NetworkClient(RestClient):
+ """
+ Tempest REST client for Quantum. Uses v2 of the Quantum API, since the
+ v1 API has been removed from the code base.
+
+ Implements the following operations for each one of the basic Quantum
+ abstractions (networks, sub-networks and ports):
+
+ create
+ delete
+ list
+ show
+ """
+
def __init__(self, config, username, password, auth_url, tenant_name=None):
super(NetworkClient, self).__init__(config, username, password,
auth_url, tenant_name)
self.service = self.config.network.catalog_type
+ self.version = '2.0'
+ self.uri_prefix = "v%s" % (self.version)
def list_networks(self):
- resp, body = self.get('networks')
+ uri = '%s/networks' % (self.uri_prefix)
+ resp, body = self.get(uri, self.headers)
body = json.loads(body)
return resp, body
- def create_network(self, name, key="network"):
+ def create_network(self, name):
post_body = {
- key: {
+ 'network': {
'name': name,
}
}
- headers = {'Content-Type': 'application/json'}
body = json.dumps(post_body)
- resp, body = self.post('networks', headers=headers, body=body)
+ uri = '%s/networks' % (self.uri_prefix)
+ resp, body = self.post(uri, headers=self.headers, body=body)
body = json.loads(body)
return resp, body
- def list_networks_details(self):
- resp, body = self.get('networks/detail')
- body = json.loads(body)
- return resp, body
-
- def get_network(self, uuid):
- resp, body = self.get('networks/%s' % uuid)
- body = json.loads(body)
- return resp, body
-
- def get_network_details(self, uuid):
- resp, body = self.get('networks/%s/detail' % uuid)
+ def show_network(self, uuid):
+ uri = '%s/networks/%s' % (self.uri_prefix, uuid)
+ resp, body = self.get(uri, self.headers)
body = json.loads(body)
return resp, body
def delete_network(self, uuid):
- resp, body = self.delete('networks/%s' % uuid)
+ uri = '%s/networks/%s' % (self.uri_prefix, uuid)
+ resp, body = self.delete(uri, self.headers)
return resp, body
- def create_port(self, network_id, zone, state=None, key='port'):
+ def create_subnet(self, net_uuid, cidr):
+ post_body = dict(
+ subnet=dict(
+ ip_version=4,
+ network_id=net_uuid,
+ cidr=cidr),)
+ body = json.dumps(post_body)
+ uri = '%s/subnets' % (self.uri_prefix)
+ resp, body = self.post(uri, headers=self.headers, body=body)
+ body = json.loads(body)
+ return resp, body
+
+ def delete_subnet(self, uuid):
+ uri = '%s/subnets/%s' % (self.uri_prefix, uuid)
+ resp, body = self.delete(uri, self.headers)
+ return resp, body
+
+ def list_subnets(self):
+ uri = '%s/subnets' % (self.uri_prefix)
+ resp, body = self.get(uri, self.headers)
+ body = json.loads(body)
+ return resp, body
+
+ def show_subnet(self, uuid):
+ uri = '%s/subnets/%s' % (self.uri_prefix, uuid)
+ resp, body = self.get(uri, self.headers)
+ body = json.loads(body)
+ return resp, body
+
+ def create_port(self, network_id, state=None):
if not state:
- state = 'ACTIVE'
+ state = True
post_body = {
- key: {
- 'state': state,
- 'nova_id': zone
+ 'port': {
+ 'network_id': network_id,
+ 'admin_state_up': state,
}
}
- headers = {'Content-Type': 'application/json'}
body = json.dumps(post_body)
- resp, body = self.post('networks/%s/ports.json' % network_id,
- headers=headers, body=body)
+ uri = '%s/ports' % (self.uri_prefix)
+ resp, body = self.post(uri, headers=self.headers, body=body)
body = json.loads(body)
return resp, body
- def delete_port(self, network_id, port_id):
- resp, body = self.delete('networks/%s/ports/%s.json' %
- (network_id, port_id))
+ def delete_port(self, port_id):
+ uri = '%s/ports/%s' % (self.uri_prefix, port_id)
+ resp, body = self.delete(uri, self.headers)
return resp, body
- def list_ports(self, network_id):
- resp, body = self.get('networks/%s/ports.json' % network_id)
+ def list_ports(self):
+ uri = '%s/ports' % (self.uri_prefix)
+ resp, body = self.get(uri, self.headers)
body = json.loads(body)
return resp, body
- def list_port_details(self, network_id):
- url = 'networks/%s/ports/detail.json' % network_id
- resp, body = self.get(url)
- body = json.loads(body)
- return resp, body
-
- def attach_port(self, network_id, port_id, interface_id):
- post_body = {
- 'attachment': {
- 'id': interface_id
- }
- }
- headers = {'Content-Type': 'application/json'}
- body = json.dumps(post_body)
- url = 'networks/%s/ports/%s/attachment.json' % (network_id, port_id)
- resp, body = self.put(url, headers=headers, body=body)
- return resp, body
-
- def detach_port(self, network_id, port_id):
- url = 'networks/%s/ports/%s/attachment.json' % (network_id, port_id)
- resp, body = self.delete(url)
- return resp, body
-
- def list_port_attachment(self, network_id, port_id):
- url = 'networks/%s/ports/%s/attachment.json' % (network_id, port_id)
- resp, body = self.get(url)
+ def show_port(self, port_id):
+ uri = '%s/ports/%s' % (self.uri_prefix, port_id)
+ resp, body = self.get(uri, self.headers)
body = json.loads(body)
return resp, body
diff --git a/tempest/tests/network/base.py b/tempest/tests/network/base.py
index 1b09513..e0e40cb 100644
--- a/tempest/tests/network/base.py
+++ b/tempest/tests/network/base.py
@@ -15,31 +15,73 @@
# License for the specific language governing permissions and limitations
# under the License.
+import netaddr
from tempest import clients
from tempest.common.utils.data_utils import rand_name
+from tempest import exceptions
import tempest.test
class BaseNetworkTest(tempest.test.BaseTestCase):
+ """
+ Base class for the Quantum tests that use the Tempest Quantum REST client
+
+ Per the Quantum API Guide, API v1.x was removed from the source code tree
+ (docs.openstack.org/api/openstack-network/2.0/content/Overview-d1e71.html)
+ Therefore, v2.x of the Quantum API is assumed. It is also assumed that the
+ following options are defined in the [network] section of etc/tempest.conf:
+
+ tenant_network_cidr with a CIDR block from which smaller blocks
+ can be allocated for tenant networks
+
+ tenant_network_mask_bits with the mask bits to be used to partition the
+ block defined by tenant_network_cidr
+ """
+
@classmethod
def setUpClass(cls):
os = clients.Manager()
-
- if not os.config.network.quantum_available:
+ cls.network_cfg = os.config.network
+ if not cls.network_cfg.quantum_available:
raise cls.skipException("Quantum support is required")
+ cls.client = os.network_client
+ cls.networks = []
+ cls.subnets = []
@classmethod
def tearDownClass(cls):
+ for subnet in cls.subnets:
+ cls.client.delete_subnet(subnet['id'])
for network in cls.networks:
cls.client.delete_network(network['id'])
- def create_network(self, network_name=None):
+ @classmethod
+ def create_network(cls, network_name=None):
"""Wrapper utility that returns a test network."""
- network_name = network_name or rand_name('test-network')
+ network_name = network_name or rand_name('test-network-')
- resp, body = self.client.create_network(network_name)
+ resp, body = cls.client.create_network(network_name)
network = body['network']
- self.networks.append(network)
+ cls.networks.append(network)
return network
+
+ @classmethod
+ def create_subnet(cls, network):
+ """Wrapper utility that returns a test subnet."""
+ cidr = netaddr.IPNetwork(cls.network_cfg.tenant_network_cidr)
+ mask_bits = cls.network_cfg.tenant_network_mask_bits
+ # Find a cidr that is not in use yet and create a subnet with it
+ for subnet_cidr in cidr.subnet(mask_bits):
+ try:
+ resp, body = cls.client.create_subnet(network['id'],
+ str(subnet_cidr))
+ break
+ except exceptions.BadRequest as e:
+ is_overlapping_cidr = 'overlaps with another subnet' in str(e)
+ if not is_overlapping_cidr:
+ raise
+ subnet = body['subnet']
+ cls.subnets.append(subnet)
+ return subnet
diff --git a/tempest/tests/network/test_networks.py b/tempest/tests/network/test_networks.py
index 136279f..e61bc62 100644
--- a/tempest/tests/network/test_networks.py
+++ b/tempest/tests/network/test_networks.py
@@ -15,51 +15,84 @@
# License for the specific language governing permissions and limitations
# under the License.
-from tempest.test import attr
+import netaddr
from tempest.common.utils.data_utils import rand_name
+from tempest import exceptions
+from tempest.test import attr
from tempest.tests.network import base
class NetworksTest(base.BaseNetworkTest):
+ """
+ Tests the following operations in the Quantum API using the REST client for
+ Quantum:
+
+ create a network for a tenant
+ list a tenant's networks
+ show a tenant network's details
+ create a subnet for a tenant
+ list a tenant's subnets
+ show a tenant subnet's details
+
+ v2.0 of the Quantum API is assumed. It is also assumed that the following
+ options are defined in the [network] section of etc/tempest.conf:
+
+ tenant_network_cidr with a CIDR block from which smaller blocks
+ can be allocated for tenant networks
+
+ tenant_network_mask_bits with the mask bits to be used to partition the
+ block defined by tenant_network_cidr
+ """
+
@classmethod
def setUpClass(cls):
super(NetworksTest, cls).setUpClass()
cls.network = cls.create_network()
cls.name = cls.network['name']
+ cls.subnet = cls.create_subnet(cls.network)
+ cls.cidr = cls.subnet['cidr']
@attr(type='positive')
- def test_create_delete_network(self):
- # Creates and deletes a network for a tenant
- name = rand_name('network')
+ def test_create_delete_network_subnet(self):
+ # Creates a network
+ name = rand_name('network-')
resp, body = self.client.create_network(name)
- self.assertEqual('202', resp['status'])
+ self.assertEqual('201', resp['status'])
network = body['network']
self.assertTrue(network['id'] is not None)
+ # Find a cidr that is not in use yet and create a subnet with it
+ cidr = netaddr.IPNetwork(self.network_cfg.tenant_network_cidr)
+ mask_bits = self.network_cfg.tenant_network_mask_bits
+ for subnet_cidr in cidr.subnet(mask_bits):
+ try:
+ resp, body = self.client.create_subnet(network['id'],
+ str(subnet_cidr))
+ break
+ except exceptions.BadRequest as e:
+ is_overlapping_cidr = 'overlaps with another subnet' in str(e)
+ if not is_overlapping_cidr:
+ raise
+ self.assertEqual('201', resp['status'])
+ subnet = body['subnet']
+ self.assertTrue(subnet['id'] is not None)
+ # Deletes the subnet and then the network
+ resp, body = self.client.delete_subnet(subnet['id'])
+ self.assertEqual('204', resp['status'])
resp, body = self.client.delete_network(network['id'])
self.assertEqual('204', resp['status'])
@attr(type='positive')
def test_show_network(self):
# Verifies the details of a network
- resp, body = self.client.get_network(self.network['id'])
+ resp, body = self.client.show_network(self.network['id'])
self.assertEqual('200', resp['status'])
network = body['network']
self.assertEqual(self.network['id'], network['id'])
self.assertEqual(self.name, network['name'])
@attr(type='positive')
- def test_show_network_details(self):
- # Verifies the full details of a network
- resp, body = self.client.get_network_details(self.network['id'])
- self.assertEqual('200', resp['status'])
- network = body['network']
- self.assertEqual(self.network['id'], network['id'])
- self.assertEqual(self.name, network['name'])
- self.assertEqual(len(network['ports']), 0)
-
- @attr(type='positive')
def test_list_networks(self):
# Verify the network exists in the list of all networks
resp, body = self.client.list_networks()
@@ -68,9 +101,18 @@
self.assertTrue(found)
@attr(type='positive')
- def test_list_networks_with_detail(self):
- # Verify the network exists in the detailed list of all networks
- resp, body = self.client.list_networks_details()
- networks = body['networks']
- found = any(n for n in networks if n['id'] == self.network['id'])
+ def test_show_subnet(self):
+ # Verifies the details of a subnet
+ resp, body = self.client.show_subnet(self.subnet['id'])
+ self.assertEqual('200', resp['status'])
+ subnet = body['subnet']
+ self.assertEqual(self.subnet['id'], subnet['id'])
+ self.assertEqual(self.cidr, subnet['cidr'])
+
+ @attr(type='positive')
+ def test_list_subnets(self):
+ # Verify the subnet exists in the list of all subnets
+ resp, body = self.client.list_subnets()
+ subnets = body['subnets']
+ found = any(n for n in subnets if n['id'] == self.subnet['id'])
self.assertTrue(found)