Update test_networks.py to v2 of Quantum API
Fixes bugs in the test suite tempest.tests.network.test_networks that prevents
Jenkins gate-tempest-devstack-vm-quantum-full to execute succesfully. The
problem was that the test suite was using v1 of the Quantum API, which
according to the "Quantum API Guide (V2.0)" has been removed from the code
base. Please see note at bottom of:
docs.openstack.org/api/openstack-network/2.0/content/Overview-d1e71.html
The test suite was patched to use operations supported by v2 of the Quantum
API. This required patches in the following files:
1) tempest.common.rest_client was patched to provide the correct end-point for
v2 of the Quantum API
2) tempest.services.network.json.network_client was patched to support create,
delete, list and show operations for the 3 basic abstractions offered by the
Quantum API: networks, sub-networks and ports
3) tempest.tests.network.base was patched to create a network and a subnetwork
using v2 Quantum API operations.
4) tempest.tests.network.test_networks was patched to invoke v2 Quantum API
operations. Individual tests invoking obsolete operations were eliminated
Fixes bug 1131458
Fixes bug 1159229
Change-Id: I320c46f2a42d8fd9dbfd496e8f357e21eb1c9259
diff --git a/tempest/common/rest_client.py b/tempest/common/rest_client.py
index 88c8886..02b5c9b 100644
--- a/tempest/common/rest_client.py
+++ b/tempest/common/rest_client.py
@@ -161,12 +161,6 @@
if mgmt_url is None:
raise exceptions.EndpointNotFound(service)
- if service == 'network':
- # Keystone does not return the correct endpoint for
- # quantum. Handle this separately.
- mgmt_url = (mgmt_url + self.config.network.api_version +
- "/tenants/" + tenant_id)
-
return token, mgmt_url
elif resp.status == 401:
diff --git a/tempest/config.py b/tempest/config.py
index 3a4a8c9..9c41660 100644
--- a/tempest/config.py
+++ b/tempest/config.py
@@ -260,9 +260,6 @@
cfg.StrOpt('catalog_type',
default='network',
help='Catalog type of the Quantum service.'),
- cfg.StrOpt('api_version',
- default="v1.1",
- help="Version of Quantum API"),
cfg.StrOpt('tenant_network_cidr',
default="10.100.0.0/16",
help="The cidr block to allocate tenant networks from"),
diff --git a/tempest/services/network/json/network_client.py b/tempest/services/network/json/network_client.py
index 34342c9..4758ddd 100644
--- a/tempest/services/network/json/network_client.py
+++ b/tempest/services/network/json/network_client.py
@@ -4,98 +4,112 @@
class NetworkClient(RestClient):
+ """
+ Tempest REST client for Quantum. Uses v2 of the Quantum API, since the
+ V1 API has been removed from the code base.
+
+ Implements the following operations for each one of the basic Quantum
+ abstractions (networks, sub-networks and ports):
+
+ create
+ delete
+ list
+ show
+ """
+
def __init__(self, config, username, password, auth_url, tenant_name=None):
super(NetworkClient, self).__init__(config, username, password,
auth_url, tenant_name)
self.service = self.config.network.catalog_type
+ self.version = '2.0'
+ self.uri_prefix = "v%s" % (self.version)
def list_networks(self):
- resp, body = self.get('networks')
+ uri = '%s/networks' % (self.uri_prefix)
+ resp, body = self.get(uri, self.headers)
body = json.loads(body)
return resp, body
- def create_network(self, name, key="network"):
+ def create_network(self, name):
post_body = {
- key: {
+ 'network': {
'name': name,
}
}
- headers = {'Content-Type': 'application/json'}
body = json.dumps(post_body)
- resp, body = self.post('networks', headers=headers, body=body)
+ uri = '%s/networks' % (self.uri_prefix)
+ resp, body = self.post(uri, headers=self.headers, body=body)
body = json.loads(body)
return resp, body
- def list_networks_details(self):
- resp, body = self.get('networks/detail')
- body = json.loads(body)
- return resp, body
-
- def get_network(self, uuid):
- resp, body = self.get('networks/%s' % uuid)
- body = json.loads(body)
- return resp, body
-
- def get_network_details(self, uuid):
- resp, body = self.get('networks/%s/detail' % uuid)
+ def show_network(self, uuid):
+ uri = '%s/networks/%s' % (self.uri_prefix, uuid)
+ resp, body = self.get(uri, self.headers)
body = json.loads(body)
return resp, body
def delete_network(self, uuid):
- resp, body = self.delete('networks/%s' % uuid)
+ uri = '%s/networks/%s' % (self.uri_prefix, uuid)
+ resp, body = self.delete(uri, self.headers)
return resp, body
- def create_port(self, network_id, zone, state=None, key='port'):
+ def create_subnet(self, net_uuid, cidr):
+ post_body = dict(
+ subnet=dict(
+ ip_version=4,
+ network_id=net_uuid,
+ cidr=cidr),)
+ body = json.dumps(post_body)
+ uri = '%s/subnets' % (self.uri_prefix)
+ resp, body = self.post(uri, headers=self.headers, body=body)
+ body = json.loads(body)
+ return resp, body
+
+ def delete_subnet(self, uuid):
+ uri = '%s/subnets/%s' % (self.uri_prefix, uuid)
+ resp, body = self.delete(uri, self.headers)
+ return resp, body
+
+ def list_subnets(self):
+ uri = '%s/subnets' % (self.uri_prefix)
+ resp, body = self.get(uri, self.headers)
+ body = json.loads(body)
+ return resp, body
+
+ def show_subnet(self, uuid):
+ uri = '%s/subnets/%s' % (self.uri_prefix, uuid)
+ resp, body = self.get(uri, self.headers)
+ body = json.loads(body)
+ return resp, body
+
+ def create_port(self, network_id, state=None):
if not state:
- state = 'ACTIVE'
+ state = True
post_body = {
- key: {
- 'state': state,
- 'nova_id': zone
+ 'port': {
+ 'network_id': network_id,
+ 'admin_state_up': state,
}
}
- headers = {'Content-Type': 'application/json'}
body = json.dumps(post_body)
- resp, body = self.post('networks/%s/ports.json' % network_id,
- headers=headers, body=body)
+ uri = '%s/ports' % (self.uri_prefix)
+ resp, body = self.post(uri, headers=self.headers, body=body)
body = json.loads(body)
return resp, body
- def delete_port(self, network_id, port_id):
- resp, body = self.delete('networks/%s/ports/%s.json' %
- (network_id, port_id))
+ def delete_port(self, port_id):
+ uri = '%s/ports/%s' % (self.uri_prefix, port_id)
+ resp, body = self.delete(uri, self.headers)
return resp, body
- def list_ports(self, network_id):
- resp, body = self.get('networks/%s/ports.json' % network_id)
+ def list_ports(self):
+ uri = '%s/ports' % (self.uri_prefix)
+ resp, body = self.get(uri, self.headers)
body = json.loads(body)
return resp, body
- def list_port_details(self, network_id):
- url = 'networks/%s/ports/detail.json' % network_id
- resp, body = self.get(url)
- body = json.loads(body)
- return resp, body
-
- def attach_port(self, network_id, port_id, interface_id):
- post_body = {
- 'attachment': {
- 'id': interface_id
- }
- }
- headers = {'Content-Type': 'application/json'}
- body = json.dumps(post_body)
- url = 'networks/%s/ports/%s/attachment.json' % (network_id, port_id)
- resp, body = self.put(url, headers=headers, body=body)
- return resp, body
-
- def detach_port(self, network_id, port_id):
- url = 'networks/%s/ports/%s/attachment.json' % (network_id, port_id)
- resp, body = self.delete(url)
- return resp, body
-
- def list_port_attachment(self, network_id, port_id):
- url = 'networks/%s/ports/%s/attachment.json' % (network_id, port_id)
- resp, body = self.get(url)
+ def show_port(self, port_id):
+ uri = '%s/ports/%s' % (self.uri_prefix, port_id)
+ resp, body = self.get(uri, self.headers)
body = json.loads(body)
return resp, body
diff --git a/tempest/tests/network/base.py b/tempest/tests/network/base.py
index 1b09513..e0e40cb 100644
--- a/tempest/tests/network/base.py
+++ b/tempest/tests/network/base.py
@@ -15,31 +15,73 @@
# License for the specific language governing permissions and limitations
# under the License.
+import netaddr
from tempest import clients
from tempest.common.utils.data_utils import rand_name
+from tempest import exceptions
import tempest.test
class BaseNetworkTest(tempest.test.BaseTestCase):
+ """
+ Base class for the Quantum tests that use the Tempest Quantum REST client
+
+ Per the Quantum API Guide, API v1.x was removed from the source code tree
+ (docs.openstack.org/api/openstack-network/2.0/content/Overview-d1e71.html)
+ Therefore, v2.x of the Quantum API is assumed. It is also assumed that the
+ following options are defined in the [network] section of etc/tempest.conf:
+
+ tenant_network_cidr with a block of cidr's from which smaller blocks
+ can be allocated for tenant networks
+
+ tenant_network_mask_bits with the mask bits to be used to partition the
+ block defined by tenant-network_cidr
+ """
+
@classmethod
def setUpClass(cls):
os = clients.Manager()
-
- if not os.config.network.quantum_available:
+ cls.network_cfg = os.config.network
+ if not cls.network_cfg.quantum_available:
raise cls.skipException("Quantum support is required")
+ cls.client = os.network_client
+ cls.networks = []
+ cls.subnets = []
@classmethod
def tearDownClass(cls):
+ for subnet in cls.subnets:
+ cls.client.delete_subnet(subnet['id'])
for network in cls.networks:
cls.client.delete_network(network['id'])
- def create_network(self, network_name=None):
+ @classmethod
+ def create_network(cls, network_name=None):
"""Wrapper utility that returns a test network."""
- network_name = network_name or rand_name('test-network')
+ network_name = network_name or rand_name('test-network-')
- resp, body = self.client.create_network(network_name)
+ resp, body = cls.client.create_network(network_name)
network = body['network']
- self.networks.append(network)
+ cls.networks.append(network)
return network
+
+ @classmethod
+ def create_subnet(cls, network):
+ """Wrapper utility that returns a test subnet."""
+ cidr = netaddr.IPNetwork(cls.network_cfg.tenant_network_cidr)
+ mask_bits = cls.network_cfg.tenant_network_mask_bits
+ # Find a cidr that is not in use yet and create a subnet with it
+ for subnet_cidr in cidr.subnet(mask_bits):
+ try:
+ resp, body = cls.client.create_subnet(network['id'],
+ str(subnet_cidr))
+ break
+ except exceptions.BadRequest as e:
+ is_overlapping_cidr = 'overlaps with another subnet' in str(e)
+ if not is_overlapping_cidr:
+ raise
+ subnet = body['subnet']
+ cls.subnets.append(subnet)
+ return subnet
diff --git a/tempest/tests/network/test_networks.py b/tempest/tests/network/test_networks.py
index 136279f..e61bc62 100644
--- a/tempest/tests/network/test_networks.py
+++ b/tempest/tests/network/test_networks.py
@@ -15,51 +15,84 @@
# License for the specific language governing permissions and limitations
# under the License.
-from tempest.test import attr
+import netaddr
from tempest.common.utils.data_utils import rand_name
+from tempest import exceptions
+from tempest.test import attr
from tempest.tests.network import base
class NetworksTest(base.BaseNetworkTest):
+ """
+ Tests the following operations in the Quantum API using the REST client for
+ Quantum:
+
+ create a network for a tenant
+ list tenant's networks
+ show a tenant network details
+ create a subnet for a tenant
+ list tenant's subnets
+ show a tenant subnet details
+
+ v2.0 of the Quantum API is assumed. It is also assumed that the following
+ options are defined in the [network] section of etc/tempest.conf:
+
+ tenant_network_cidr with a block of cidr's from which smaller blocks
+ can be allocated for tenant networks
+
+ tenant_network_mask_bits with the mask bits to be used to partition the
+ block defined by tenant-network_cidr
+ """
+
@classmethod
def setUpClass(cls):
super(NetworksTest, cls).setUpClass()
cls.network = cls.create_network()
cls.name = cls.network['name']
+ cls.subnet = cls.create_subnet(cls.network)
+ cls.cidr = cls.subnet['cidr']
@attr(type='positive')
- def test_create_delete_network(self):
- # Creates and deletes a network for a tenant
- name = rand_name('network')
+ def test_create_delete_network_subnet(self):
+ # Creates a network
+ name = rand_name('network-')
resp, body = self.client.create_network(name)
- self.assertEqual('202', resp['status'])
+ self.assertEqual('201', resp['status'])
network = body['network']
self.assertTrue(network['id'] is not None)
+ # Find a cidr that is not in use yet and create a subnet with it
+ cidr = netaddr.IPNetwork(self.network_cfg.tenant_network_cidr)
+ mask_bits = self.network_cfg.tenant_network_mask_bits
+ for subnet_cidr in cidr.subnet(mask_bits):
+ try:
+ resp, body = self.client.create_subnet(network['id'],
+ str(subnet_cidr))
+ break
+ except exceptions.BadRequest as e:
+ is_overlapping_cidr = 'overlaps with another subnet' in str(e)
+ if not is_overlapping_cidr:
+ raise
+ self.assertEqual('201', resp['status'])
+ subnet = body['subnet']
+ self.assertTrue(subnet['id'] is not None)
+ #Deletes subnet and network
+ resp, body = self.client.delete_subnet(subnet['id'])
+ self.assertEqual('204', resp['status'])
resp, body = self.client.delete_network(network['id'])
self.assertEqual('204', resp['status'])
@attr(type='positive')
def test_show_network(self):
# Verifies the details of a network
- resp, body = self.client.get_network(self.network['id'])
+ resp, body = self.client.show_network(self.network['id'])
self.assertEqual('200', resp['status'])
network = body['network']
self.assertEqual(self.network['id'], network['id'])
self.assertEqual(self.name, network['name'])
@attr(type='positive')
- def test_show_network_details(self):
- # Verifies the full details of a network
- resp, body = self.client.get_network_details(self.network['id'])
- self.assertEqual('200', resp['status'])
- network = body['network']
- self.assertEqual(self.network['id'], network['id'])
- self.assertEqual(self.name, network['name'])
- self.assertEqual(len(network['ports']), 0)
-
- @attr(type='positive')
def test_list_networks(self):
# Verify the network exists in the list of all networks
resp, body = self.client.list_networks()
@@ -68,9 +101,18 @@
self.assertTrue(found)
@attr(type='positive')
- def test_list_networks_with_detail(self):
- # Verify the network exists in the detailed list of all networks
- resp, body = self.client.list_networks_details()
- networks = body['networks']
- found = any(n for n in networks if n['id'] == self.network['id'])
+ def test_show_subnet(self):
+ # Verifies the details of a subnet
+ resp, body = self.client.show_subnet(self.subnet['id'])
+ self.assertEqual('200', resp['status'])
+ subnet = body['subnet']
+ self.assertEqual(self.subnet['id'], subnet['id'])
+ self.assertEqual(self.cidr, subnet['cidr'])
+
+ @attr(type='positive')
+ def test_list_subnets(self):
+ # Verify the subnet exists in the list of all subnets
+ resp, body = self.client.list_subnets()
+ subnets = body['subnets']
+ found = any(n for n in subnets if n['id'] == self.subnet['id'])
self.assertTrue(found)