Merge "Cleanup: Add common "create security rule" method"
diff --git a/etc/tempest.conf.sample b/etc/tempest.conf.sample
index 2d7f754..703d92a 100644
--- a/etc/tempest.conf.sample
+++ b/etc/tempest.conf.sample
@@ -272,6 +272,9 @@
# Set to True if the Account Quota middleware is enabled
accounts_quotas_available = True
+# Set operator role for tests that require creating a container
+operator_role = Member
+
[boto]
# This section contains configuration options used when executing tests
# with boto.
diff --git a/tempest/api/compute/volumes/test_attach_volume.py b/tempest/api/compute/volumes/test_attach_volume.py
index a3b051e..b67a5e0 100644
--- a/tempest/api/compute/volumes/test_attach_volume.py
+++ b/tempest/api/compute/volumes/test_attach_volume.py
@@ -55,6 +55,7 @@
# Start a server and wait for it to become ready
resp, server = self.create_server(wait_until='ACTIVE',
adminPass='password')
+ self.server = server
# Record addresses so that we can ssh later
resp, server['addresses'] = \
diff --git a/tempest/api/identity/admin/test_users.py b/tempest/api/identity/admin/test_users.py
index 6f90b04..8cdcee1 100644
--- a/tempest/api/identity/admin/test_users.py
+++ b/tempest/api/identity/admin/test_users.py
@@ -26,11 +26,14 @@
class UsersTestJSON(base.BaseIdentityAdminTest):
_interface = 'json'
- alt_user = rand_name('test_user_')
- alt_password = rand_name('pass_')
- alt_email = alt_user + '@testmail.tm'
- alt_tenant = rand_name('test_tenant_')
- alt_description = rand_name('desc_')
+    @classmethod
+    def setUpClass(cls):
+        super(UsersTestJSON, cls).setUpClass()
+        user_name = rand_name('test_user_')
+        cls.alt_user, cls.alt_email = user_name, user_name + '@testmail.tm'
+        cls.alt_password = rand_name('pass_')
+        cls.alt_tenant = rand_name('test_tenant_')
+        cls.alt_description = rand_name('desc_')
@attr(type='smoke')
def test_create_user(self):
@@ -101,8 +104,9 @@
@attr(type='smoke')
def test_delete_user(self):
# Delete a user
+ alt_user2 = rand_name('alt_user_')
self.data.setup_test_tenant()
- resp, user = self.client.create_user('user_1234', self.alt_password,
+ resp, user = self.client.create_user(alt_user2, self.alt_password,
self.data.tenant['id'],
self.alt_email)
self.assertEquals('200', resp['status'])
@@ -228,13 +232,16 @@
self.data.setup_test_tenant()
user_ids = list()
fetched_user_ids = list()
- resp, user1 = self.client.create_user('tenant_user1', 'password1',
+ alt_tenant_user1 = rand_name('tenant_user1_')
+ resp, user1 = self.client.create_user(alt_tenant_user1, 'password1',
self.data.tenant['id'],
'user1@123')
self.assertEquals('200', resp['status'])
user_ids.append(user1['id'])
self.data.users.append(user1)
- resp, user2 = self.client.create_user('tenant_user2', 'password2',
+
+ alt_tenant_user2 = rand_name('tenant_user2_')
+ resp, user2 = self.client.create_user(alt_tenant_user2, 'password2',
self.data.tenant['id'],
'user2@123')
self.assertEquals('200', resp['status'])
@@ -267,9 +274,11 @@
resp, role = self.client.assign_user_role(tenant['id'], user['id'],
role['id'])
self.assertEquals('200', resp['status'])
- resp, second_user = self.client.create_user('second_user', 'password1',
+
+ alt_user2 = rand_name('second_user_')
+ resp, second_user = self.client.create_user(alt_user2, 'password1',
self.data.tenant['id'],
- 'user1@123')
+ 'user2@123')
self.assertEquals('200', resp['status'])
user_ids.append(second_user['id'])
self.data.users.append(second_user)
diff --git a/tempest/api/network/base.py b/tempest/api/network/base.py
index 2a3b3f7..19c5f84 100644
--- a/tempest/api/network/base.py
+++ b/tempest/api/network/base.py
@@ -48,7 +48,7 @@
@classmethod
def setUpClass(cls):
super(BaseNetworkTest, cls).setUpClass()
- os = clients.Manager()
+ os = clients.Manager(interface=cls._interface)
cls.network_cfg = os.config.network
if not cls.config.service_available.neutron:
raise cls.skipException("Neutron support is required")
diff --git a/tempest/api/network/test_networks.py b/tempest/api/network/test_networks.py
index 7dcbbd8..7f49452 100644
--- a/tempest/api/network/test_networks.py
+++ b/tempest/api/network/test_networks.py
@@ -23,7 +23,8 @@
from tempest.test import attr
-class NetworksTest(base.BaseNetworkTest):
+class NetworksTestJSON(base.BaseNetworkTest):
+ _interface = 'json'
"""
Tests the following operations in the Neutron API using the REST client for
@@ -55,7 +56,7 @@
@classmethod
def setUpClass(cls):
- super(NetworksTest, cls).setUpClass()
+ super(NetworksTestJSON, cls).setUpClass()
cls.network = cls.create_network()
cls.name = cls.network['name']
cls.subnet = cls.create_subnet(cls.network)
@@ -109,7 +110,7 @@
self.assertEqual('200', resp['status'])
updated_subnet = body['subnet']
self.assertEqual(updated_subnet['name'], new_subnet)
- # Deletes subnet and network
+ # Delete subnet and network
resp, body = self.client.delete_subnet(subnet_id)
self.assertEqual('204', resp['status'])
resp, body = self.client.delete_network(net_id)
@@ -128,6 +129,7 @@
def test_list_networks(self):
# Verify the network exists in the list of all networks
resp, body = self.client.list_networks()
+ self.assertEqual('200', resp['status'])
networks = body['networks']
found = None
for n in networks:
@@ -149,6 +151,7 @@
def test_list_subnets(self):
# Verify the subnet exists in the list of all subnets
resp, body = self.client.list_subnets()
+ self.assertEqual('200', resp['status'])
subnets = body['subnets']
found = None
for n in subnets:
@@ -159,7 +162,7 @@
@attr(type='gate')
def test_create_update_delete_port(self):
- # Verify that successful port creation & deletion
+ # Verify that successful port creation, update & deletion
resp, body = self.client.create_port(self.network['id'])
self.assertEqual('201', resp['status'])
port = body['port']
@@ -174,7 +177,7 @@
self.assertEqual('204', resp['status'])
@attr(type='gate')
- def test_show_ports(self):
+ def test_show_port(self):
# Verify the details of port
resp, body = self.client.show_port(self.port['id'])
self.assertEqual('200', resp['status'])
@@ -221,3 +224,7 @@
for n in created_networks:
self.assertIsNotNone(n['id'])
self.assertIn(n['id'], networks_list)
+
+
+class NetworksTestXML(NetworksTestJSON):
+ _interface = 'xml'
diff --git a/tempest/api/network/test_quotas.py b/tempest/api/network/test_quotas.py
index ba70f34..b49cbe8 100644
--- a/tempest/api/network/test_quotas.py
+++ b/tempest/api/network/test_quotas.py
@@ -23,6 +23,7 @@
class QuotasTest(base.BaseNetworkTest):
+ _interface = 'json'
"""
Tests the following operations in the Neutron API using the REST client for
diff --git a/tempest/api/network/test_routers.py b/tempest/api/network/test_routers.py
index df85682..4f687b0 100644
--- a/tempest/api/network/test_routers.py
+++ b/tempest/api/network/test_routers.py
@@ -21,6 +21,7 @@
class RoutersTest(base.BaseNetworkTest):
+ _interface = 'json'
@classmethod
def setUpClass(cls):
diff --git a/tempest/api/object_storage/base.py b/tempest/api/object_storage/base.py
index 820328c..e6e8d17 100644
--- a/tempest/api/object_storage/base.py
+++ b/tempest/api/object_storage/base.py
@@ -18,6 +18,7 @@
from tempest.api.identity.base import DataGenerator
from tempest import clients
+from tempest.common import isolated_creds
from tempest import exceptions
import tempest.test
@@ -30,16 +31,41 @@
if not cls.config.service_available.swift:
skip_msg = ("%s skipped as swift is not available" % cls.__name__)
raise cls.skipException(skip_msg)
- cls.os = clients.Manager()
+ cls.isolated_creds = isolated_creds.IsolatedCreds(cls.__name__)
+ if cls.config.compute.allow_tenant_isolation:
+ # Get isolated creds for normal user
+ creds = cls.isolated_creds.get_primary_creds()
+ username, tenant_name, password = creds
+ cls.os = clients.Manager(username=username,
+ password=password,
+ tenant_name=tenant_name)
+ # Get isolated creds for admin user
+ admin_creds = cls.isolated_creds.get_admin_creds()
+ admin_username, admin_tenant_name, admin_password = admin_creds
+ cls.os_admin = clients.Manager(username=admin_username,
+ password=admin_password,
+ tenant_name=admin_tenant_name)
+ # Get isolated creds for alt user
+ alt_creds = cls.isolated_creds.get_alt_creds()
+ alt_username, alt_tenant, alt_password = alt_creds
+ cls.os_alt = clients.Manager(username=alt_username,
+ password=alt_password,
+ tenant_name=alt_tenant)
+ # Add isolated users to operator role so that they can create a
+ # container in swift.
+ cls._assign_member_role()
+ else:
+ cls.os = clients.Manager()
+ cls.os_admin = clients.AdminManager()
+ cls.os_alt = clients.AltManager()
+
cls.object_client = cls.os.object_client
cls.container_client = cls.os.container_client
cls.account_client = cls.os.account_client
cls.custom_object_client = cls.os.custom_object_client
- cls.os_admin = clients.AdminManager()
cls.token_client = cls.os_admin.token_client
cls.identity_admin_client = cls.os_admin.identity_client
cls.custom_account_client = cls.os.custom_account_client
- cls.os_alt = clients.AltManager()
cls.object_client_alt = cls.os_alt.object_client
cls.container_client_alt = cls.os_alt.container_client
cls.identity_client_alt = cls.os_alt.identity_client
@@ -47,6 +73,22 @@
cls.data = DataGenerator(cls.identity_admin_client)
@classmethod
+    def _assign_member_role(cls):
+        # Give both isolated users the configured swift operator role.
+        primary_user = cls.isolated_creds.get_primary_user()
+        alt_user = cls.isolated_creds.get_alt_user()
+        swift_role = cls.config.object_storage.operator_role
+        identity = cls.os_admin.identity_client
+        resp, roles = identity.list_roles()
+        for role in roles:
+            if role['name'] == swift_role:
+                break
+        else:
+            raise exceptions.NotFound("No role named %s found" % swift_role)
+        for user in (primary_user, alt_user):
+            identity.assign_user_role(user['tenantId'], user['id'], role['id'])
+
+ @classmethod
def delete_containers(cls, containers, container_client=None,
object_client=None):
"""Remove given containers and all objects in them.
diff --git a/tempest/clients.py b/tempest/clients.py
index 195cb89..48e4939 100644
--- a/tempest/clients.py
+++ b/tempest/clients.py
@@ -90,7 +90,8 @@
from tempest.services.identity.xml.identity_client import TokenClientXML
from tempest.services.image.v1.json.image_client import ImageClientJSON
from tempest.services.image.v2.json.image_client import ImageClientV2JSON
-from tempest.services.network.json.network_client import NetworkClient
+from tempest.services.network.json.network_client import NetworkClientJSON
+from tempest.services.network.xml.network_client import NetworkClientXML
from tempest.services.object_storage.account_client import AccountClient
from tempest.services.object_storage.account_client import \
AccountClientCustomizedHeader
@@ -116,6 +117,11 @@
"xml": ImagesClientXML,
}
+NETWORKS_CLIENTS = {
+ "json": NetworkClientJSON,
+ "xml": NetworkClientXML,
+}
+
KEYPAIRS_CLIENTS = {
"json": KeyPairsClientJSON,
"xml": KeyPairsClientXML,
@@ -295,6 +301,7 @@
try:
self.servers_client = SERVERS_CLIENTS[interface](*client_args)
+ self.network_client = NETWORKS_CLIENTS[interface](*client_args)
self.limits_client = LIMITS_CLIENTS[interface](*client_args)
if self.config.service_available.glance:
self.images_client = IMAGES_CLIENTS[interface](*client_args)
@@ -339,7 +346,6 @@
except KeyError:
msg = "Unsupported interface type `%s'" % interface
raise exceptions.InvalidConfiguration(msg)
- self.network_client = NetworkClient(*client_args)
self.hosts_client = HostsClientJSON(*client_args)
self.account_client = AccountClient(*client_args)
if self.config.service_available.glance:
diff --git a/tempest/config.py b/tempest/config.py
index 9b1a91e..e0ac843 100644
--- a/tempest/config.py
+++ b/tempest/config.py
@@ -369,6 +369,10 @@
cfg.BoolOpt('accounts_quotas_available',
default=True,
help="Set to True if the Account Quota middleware is enabled"),
+ cfg.StrOpt('operator_role',
+ default='Member',
+ help="Role to add to users created for swift tests to "
+ "enable creating containers"),
]
diff --git a/tempest/scenario/manager.py b/tempest/scenario/manager.py
index 7ba40e7..277eae4 100644
--- a/tempest/scenario/manager.py
+++ b/tempest/scenario/manager.py
@@ -317,6 +317,16 @@
LOG.debug("Created server: %s", server)
return server
+    def create_keypair(self, client=None, name=None):
+        # Create a keypair (default client: self.compute_client, default
+        # name: random), check its name and register it as a resource.
+        client = self.compute_client if client is None else client
+        name = rand_name('scenario-keypair-') if name is None else name
+        keypair = client.keypairs.create(name)
+        self.assertEqual(keypair.name, name)
+        self.set_resource(name, keypair)
+        return keypair
+
class NetworkScenarioTest(OfficialClientTest):
"""
@@ -346,16 +356,6 @@
cls.config.identity.password,
cls.config.identity.tenant_name).tenant_id
- def _create_keypair(self, client, namestart='keypair-smoke-'):
- kp_name = rand_name(namestart)
- keypair = client.keypairs.create(kp_name)
- try:
- self.assertEqual(keypair.id, kp_name)
- self.set_resource(kp_name, keypair)
- except AttributeError:
- self.fail("Keypair object not successfully created.")
- return keypair
-
def _create_security_group(self, client, namestart='secgroup-smoke-'):
# Create security group
sg_name = rand_name(namestart)
diff --git a/tempest/scenario/test_minimum_basic.py b/tempest/scenario/test_minimum_basic.py
index 6504f40..277adba 100644
--- a/tempest/scenario/test_minimum_basic.py
+++ b/tempest/scenario/test_minimum_basic.py
@@ -83,11 +83,7 @@
properties=properties)
def nova_keypair_add(self):
- name = rand_name('scenario-keypair-')
-
- self.keypair = self.compute_client.keypairs.create(name=name)
- self.addCleanup(self.compute_client.keypairs.delete, self.keypair)
- self.assertEqual(name, self.keypair.name)
+ self.keypair = self.create_keypair()
def nova_boot(self):
create_kwargs = {'key_name': self.keypair.name}
diff --git a/tempest/scenario/test_network_basic_ops.py b/tempest/scenario/test_network_basic_ops.py
index c04ca8f..70939f6 100644
--- a/tempest/scenario/test_network_basic_ops.py
+++ b/tempest/scenario/test_network_basic_ops.py
@@ -16,8 +16,6 @@
# License for the specific language governing permissions and limitations
# under the License.
-import testtools
-
from tempest.api.network import common as net_common
from tempest.common.utils.data_utils import rand_name
from tempest import config
@@ -162,8 +160,8 @@
@attr(type='smoke')
def test_001_create_keypairs(self):
- self.keypairs[self.tenant_id] = self._create_keypair(
- self.compute_client)
+ self.keypairs[self.tenant_id] = self.create_keypair(
+ name=rand_name('keypair-smoke-'))
@attr(type='smoke')
def test_002_create_security_groups(self):
@@ -254,8 +252,6 @@
self.floating_ips[server].append(floating_ip)
@attr(type='smoke')
- @testtools.skipIf(CONF.service_available.neutron,
- "Skipped unti bug #1210664 is resolved")
def test_008_check_public_network_connectivity(self):
if not self.floating_ips:
raise self.skipTest('No floating ips have been allocated.')
diff --git a/tempest/scenario/test_server_basic_ops.py b/tempest/scenario/test_server_basic_ops.py
index c175b04..2903687 100644
--- a/tempest/scenario/test_server_basic_ops.py
+++ b/tempest/scenario/test_server_basic_ops.py
@@ -36,14 +36,8 @@
* Terminate the instance
"""
- def create_keypair(self):
- kp_name = rand_name('keypair-smoke')
- self.keypair = self.compute_client.keypairs.create(kp_name)
- try:
- self.assertEqual(self.keypair.id, kp_name)
- self.set_resource('keypair', self.keypair)
- except AttributeError:
- self.fail("Keypair object not successfully created.")
+ def add_keypair(self):
+ self.keypair = self.create_keypair()
def create_security_group(self):
sg_name = rand_name('secgroup-smoke')
@@ -62,7 +56,7 @@
def boot_instance(self):
create_kwargs = {
- 'key_name': self.get_resource('keypair').id
+ 'key_name': self.keypair.id
}
instance = self.create_server(self.compute_client,
create_kwargs=create_kwargs)
@@ -110,7 +104,7 @@
self.remove_resource('instance')
def test_server_basicops(self):
- self.create_keypair()
+ self.add_keypair()
self.create_security_group()
self.boot_instance()
self.pause_server()
diff --git a/tempest/scenario/test_snapshot_pattern.py b/tempest/scenario/test_snapshot_pattern.py
index 6b4ed47..1e090af 100644
--- a/tempest/scenario/test_snapshot_pattern.py
+++ b/tempest/scenario/test_snapshot_pattern.py
@@ -51,10 +51,7 @@
create_kwargs=create_kwargs)
def _add_keypair(self):
- name = rand_name('scenario-keypair-')
- self.keypair = self.compute_client.keypairs.create(name=name)
- self.addCleanup(self.compute_client.keypairs.delete, self.keypair)
- self.assertEqual(name, self.keypair.name)
+ self.keypair = self.create_keypair()
def _ssh_to_server(self, server_or_ip):
if isinstance(server_or_ip, basestring):
diff --git a/tempest/scenario/test_stamp_pattern.py b/tempest/scenario/test_stamp_pattern.py
index 930e7fb..0c58dea 100644
--- a/tempest/scenario/test_stamp_pattern.py
+++ b/tempest/scenario/test_stamp_pattern.py
@@ -71,10 +71,7 @@
create_kwargs=create_kwargs)
def _add_keypair(self):
- name = rand_name('scenario-keypair-')
- self.keypair = self.compute_client.keypairs.create(name=name)
- self.addCleanup(self.compute_client.keypairs.delete, self.keypair)
- self.assertEqual(name, self.keypair.name)
+ self.keypair = self.create_keypair()
def _create_floating_ip(self):
floating_ip = self.compute_client.floating_ips.create()
diff --git a/tempest/services/compute/xml/servers_client.py b/tempest/services/compute/xml/servers_client.py
index 12e7034..5c7a629 100644
--- a/tempest/services/compute/xml/servers_client.py
+++ b/tempest/services/compute/xml/servers_client.py
@@ -350,7 +350,7 @@
addrs = []
for child in node.getchildren():
addrs.append({'version': int(child.get('version')),
- 'addr': child.get('version')})
+ 'addr': child.get('addr')})
return {node.get('id'): addrs}
def list_addresses(self, server_id):
diff --git a/tempest/services/network/json/network_client.py b/tempest/services/network/json/network_client.py
index b19ed8d..588dc8f 100644
--- a/tempest/services/network/json/network_client.py
+++ b/tempest/services/network/json/network_client.py
@@ -17,7 +17,7 @@
from tempest.common.rest_client import RestClient
-class NetworkClient(RestClient):
+class NetworkClientJSON(RestClient):
"""
Tempest REST client for Neutron. Uses v2 of the Neutron API, since the
@@ -33,8 +33,8 @@
"""
def __init__(self, config, username, password, auth_url, tenant_name=None):
- super(NetworkClient, self).__init__(config, username, password,
- auth_url, tenant_name)
+ super(NetworkClientJSON, self).__init__(config, username, password,
+ auth_url, tenant_name)
self.service = self.config.network.catalog_type
self.version = '2.0'
self.uri_prefix = "v%s" % (self.version)
@@ -108,15 +108,14 @@
body = json.loads(body)
return resp, body
- def create_port(self, network_id, state=None):
- if not state:
- state = True
+ def create_port(self, network_id, **kwargs):
post_body = {
'port': {
'network_id': network_id,
- 'admin_state_up': state,
}
}
+ for key, val in kwargs.items():
+ post_body['port'][key] = val
body = json.dumps(post_body)
uri = '%s/ports' % (self.uri_prefix)
resp, body = self.post(uri, headers=self.headers, body=body)
@@ -244,7 +243,7 @@
'admin_state_up', body['router']['admin_state_up'])
# Must uncomment/modify these lines once LP question#233187 is solved
#update_body['external_gateway_info'] = kwargs.get(
- # 'external_gateway_info', body['router']['external_gateway_info'])
+ # 'external_gateway_info', body['router']['external_gateway_info'])
update_body = dict(router=update_body)
update_body = json.dumps(update_body)
resp, body = self.put(uri, update_body, self.headers)
diff --git a/tempest/services/network/xml/__init__.py b/tempest/services/network/xml/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/tempest/services/network/xml/__init__.py
diff --git a/tempest/services/network/xml/network_client.py b/tempest/services/network/xml/network_client.py
new file mode 100755
index 0000000..d4fb656
--- /dev/null
+++ b/tempest/services/network/xml/network_client.py
@@ -0,0 +1,172 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+from lxml import etree
+import xml.etree.ElementTree as ET
+
+from tempest.common.rest_client import RestClientXML
+from tempest.services.compute.xml.common import Document
+from tempest.services.compute.xml.common import Element
+from tempest.services.compute.xml.common import xml_to_json
+
+
+class NetworkClientXML(RestClientXML):
+    """Tempest REST client for the Neutron v2.0 API, XML interface."""
+    def __init__(self, config, username, password, auth_url, tenant_name=None):
+        args = (config, username, password, auth_url, tenant_name)
+        super(NetworkClientXML, self).__init__(*args)
+        self.service = self.config.network.catalog_type
+        self.version = '2.0'
+        self.uri_prefix = "v%s" % self.version
+
+    def list_networks(self):
+        # GET /networks; each child of the XML root becomes one list entry.
+        url = '%s/networks' % self.uri_prefix
+        resp, xml_body = self.get(url, self.headers)
+        root = etree.fromstring(xml_body)
+        return resp, {"networks": self._parse_array(root)}
+
+    def create_network(self, name):
+        # POST a <network> document that carries only the requested name.
+        url = '%s/networks' % self.uri_prefix
+        network = Element("network")
+        network.append(Element("name", name))
+        payload = str(Document(network))
+        resp, xml_body = self.post(url, payload, self.headers)
+        return resp, _root_tag_fetcher_and_xml_to_json_parse(xml_body)
+
+    def create_bulk_network(self, count, names):
+        # POST one <networks> document built from names[0] .. names[count-1].
+        url = '%s/networks' % self.uri_prefix
+        bulk = Element("networks")
+        for index in range(count):
+            network = Element("network")
+            network.append(Element("name", names[index]))
+            bulk.append(network)
+        payload = str(Document(bulk))
+        resp, xml_body = self.post(url, payload, self.headers)
+        root = etree.fromstring(xml_body)
+        return resp, {"networks": self._parse_array(root)}
+
+    def delete_network(self, uuid):
+        target = '%s/networks/%s' % (self.uri_prefix, str(uuid))
+        return self.delete(target, self.headers)
+
+    def show_network(self, uuid):
+        # GET one network; the reply is wrapped under its XML root tag.
+        target = '%s/networks/%s' % (self.uri_prefix, str(uuid))
+        resp, xml_body = self.get(target, self.headers)
+        return resp, _root_tag_fetcher_and_xml_to_json_parse(xml_body)
+
+    def create_subnet(self, net_uuid, cidr):
+        # POST a <subnet> for the given network and CIDR; ip_version is
+        # always sent as 4.
+        url = '%s/subnets' % self.uri_prefix
+        fields = [Element("network_id", net_uuid),
+                  Element("cidr", cidr),
+                  Element("ip_version", 4)]
+        subnet = Element("subnet")
+        for field in fields:
+            subnet.append(field)
+        resp, xml_body = self.post(url, str(Document(subnet)), self.headers)
+        return resp, _root_tag_fetcher_and_xml_to_json_parse(xml_body)
+
+    def delete_subnet(self, subnet_id):
+        target = '%s/subnets/%s' % (self.uri_prefix, str(subnet_id))
+        return self.delete(target, self.headers)
+
+    def list_subnets(self):
+        # GET /subnets; each child of the XML root becomes one list entry.
+        url = '%s/subnets' % self.uri_prefix
+        resp, xml_body = self.get(url, self.headers)
+        root = etree.fromstring(xml_body)
+        return resp, {"subnets": self._parse_array(root)}
+
+    def show_subnet(self, uuid):
+        # GET one subnet; the reply is wrapped under its XML root tag.
+        target = '%s/subnets/%s' % (self.uri_prefix, str(uuid))
+        resp, xml_body = self.get(target, self.headers)
+        return resp, _root_tag_fetcher_and_xml_to_json_parse(xml_body)
+
+    def create_port(self, net_uuid, **kwargs):
+        # POST a <port> on the network; every keyword argument becomes an
+        # extra child element of the request.
+        url = '%s/ports' % self.uri_prefix
+        port = Element("port")
+        port.append(Element('network_id', net_uuid))
+        for attr_name, attr_value in kwargs.items():
+            port.append(Element(attr_name, attr_value))
+        payload = str(Document(port))
+        resp, xml_body = self.post(url, payload, self.headers)
+        return resp, _root_tag_fetcher_and_xml_to_json_parse(xml_body)
+
+    def delete_port(self, port_id):
+        target = '%s/ports/%s' % (self.uri_prefix, str(port_id))
+        return self.delete(target, self.headers)
+
+    def _parse_array(self, node):
+        # Convert each direct child of `node` with xml_to_json, keeping
+        # the order in which getchildren() returns them.
+        children = node.getchildren()
+        return [xml_to_json(child) for child in children]
+
+    def list_ports(self):
+        # GET /ports; each child of the XML root becomes one list entry.
+        uri = '%s/ports' % self.uri_prefix
+        resp, xml_body = self.get(uri, self.headers)
+        root = etree.fromstring(xml_body)
+        return resp, {"ports": self._parse_array(root)}
+
+    def show_port(self, port_id):
+        # GET one port; the reply is wrapped under its XML root tag.
+        target = '%s/ports/%s' % (self.uri_prefix, str(port_id))
+        resp, xml_body = self.get(target, self.headers)
+        return resp, _root_tag_fetcher_and_xml_to_json_parse(xml_body)
+
+    def update_port(self, port_id, name):
+        # PUT a <port> document that carries only the new name.
+        target = '%s/ports/%s' % (self.uri_prefix, str(port_id))
+        port = Element("port")
+        port.append(Element("name", name))
+        payload = str(Document(port))
+        resp, xml_body = self.put(target, payload, self.headers)
+        return resp, _root_tag_fetcher_and_xml_to_json_parse(xml_body)
+
+    def update_subnet(self, subnet_id, name):
+        # PUT a <subnet> document that carries only the new name.
+        target = '%s/subnets/%s' % (self.uri_prefix, str(subnet_id))
+        subnet = Element("subnet")
+        subnet.append(Element("name", name))
+        payload = str(Document(subnet))
+        resp, xml_body = self.put(target, payload, self.headers)
+        return resp, _root_tag_fetcher_and_xml_to_json_parse(xml_body)
+
+    def update_network(self, net_id, name):
+        # PUT a <network> document that carries only the new name.
+        target = '%s/networks/%s' % (self.uri_prefix, str(net_id))
+        network = Element("network")
+        network.append(Element("name", name))
+        payload = str(Document(network))
+        resp, xml_body = self.put(target, payload, self.headers)
+        return resp, _root_tag_fetcher_and_xml_to_json_parse(xml_body)
+
+
+def _root_tag_fetcher_and_xml_to_json_parse(xml_returned_body):
+    # Return {root tag without namespace: xml_to_json(root element)}.
+    # NOTE(review): the body is parsed twice, once per XML library.
+    root_tag = ET.fromstring(xml_returned_body).tag
+    if root_tag.startswith("{"):
+        _, root_tag = root_tag.split("}", 1)
+    converted = xml_to_json(etree.fromstring(xml_returned_body))
+    return {root_tag: converted}