Merge "Scenario manager: remove some useless `_list_*` methods"
diff --git a/tempest/scenario/manager.py b/tempest/scenario/manager.py
index e670216..e58031b 100644
--- a/tempest/scenario/manager.py
+++ b/tempest/scenario/manager.py
@@ -731,36 +731,6 @@
network['id'])
return network
- def _list_networks(self, *args, **kwargs):
- """List networks using admin creds """
- networks_list = self.admin_manager.networks_client.list_networks(
- *args, **kwargs)
- return networks_list['networks']
-
- def _list_subnets(self, *args, **kwargs):
- """List subnets using admin creds """
- subnets_list = self.admin_manager.subnets_client.list_subnets(
- *args, **kwargs)
- return subnets_list['subnets']
-
- def _list_routers(self, *args, **kwargs):
- """List routers using admin creds """
- routers_list = self.admin_manager.routers_client.list_routers(
- *args, **kwargs)
- return routers_list['routers']
-
- def _list_ports(self, *args, **kwargs):
- """List ports using admin creds """
- ports_list = self.admin_manager.ports_client.list_ports(
- *args, **kwargs)
- return ports_list['ports']
-
- def _list_agents(self, *args, **kwargs):
- """List agents using admin creds """
- agents_list = self.admin_manager.network_agents_client.list_agents(
- *args, **kwargs)
- return agents_list['agents']
-
def _create_subnet(self, network, subnets_client=None,
routers_client=None, namestart='subnet-smoke',
**kwargs):
@@ -779,7 +749,8 @@
:returns: True if subnet with cidr already exist in tenant
False else
"""
- cidr_in_use = self._list_subnets(tenant_id=tenant_id, cidr=cidr)
+ cidr_in_use = self.admin_manager.subnets_client.list_subnets(
+ tenant_id=tenant_id, cidr=cidr)['subnets']
return len(cidr_in_use) != 0
ip_version = kwargs.pop('ip_version', 4)
@@ -827,7 +798,8 @@
return subnet
def _get_server_port_id_and_ip4(self, server, ip_addr=None):
- ports = self._list_ports(device_id=server['id'], fixed_ip=ip_addr)
+ ports = self.admin_manager.ports_client.list_ports(
+ device_id=server['id'], fixed_ip=ip_addr)['ports']
# A port can have more than one IP address in some cases.
# If the network is dual-stack (IPv4 + IPv6), this port is associated
# with 2 subnets
@@ -856,7 +828,8 @@
return port_map[0]
def _get_network_by_name(self, network_name):
- net = self._list_networks(name=network_name)
+ net = self.admin_manager.networks_client.list_networks(
+ name=network_name)['networks']
self.assertNotEqual(len(net), 0,
"Unable to get network by name: %s" % network_name)
return net[0]
diff --git a/tempest/scenario/test_network_basic_ops.py b/tempest/scenario/test_network_basic_ops.py
index 4dae564..d8c8d4a 100644
--- a/tempest/scenario/test_network_basic_ops.py
+++ b/tempest/scenario/test_network_basic_ops.py
@@ -127,23 +127,23 @@
via checking the result of list_[networks,routers,subnets]
"""
- seen_nets = self._list_networks()
- seen_names = [n['name'] for n in seen_nets]
- seen_ids = [n['id'] for n in seen_nets]
+ seen_nets = self.admin_manager.networks_client.list_networks()
+ seen_names = [n['name'] for n in seen_nets['networks']]
+ seen_ids = [n['id'] for n in seen_nets['networks']]
self.assertIn(self.network['name'], seen_names)
self.assertIn(self.network['id'], seen_ids)
if self.subnet:
- seen_subnets = self._list_subnets()
- seen_net_ids = [n['network_id'] for n in seen_subnets]
- seen_subnet_ids = [n['id'] for n in seen_subnets]
+ seen_subnets = self.admin_manager.subnets_client.list_subnets()
+ seen_net_ids = [n['network_id'] for n in seen_subnets['subnets']]
+ seen_subnet_ids = [n['id'] for n in seen_subnets['subnets']]
self.assertIn(self.network['id'], seen_net_ids)
self.assertIn(self.subnet['id'], seen_subnet_ids)
if self.router:
- seen_routers = self._list_routers()
- seen_router_ids = [n['id'] for n in seen_routers]
- seen_router_names = [n['name'] for n in seen_routers]
+ seen_routers = self.admin_manager.routers_client.list_routers()
+ seen_router_ids = [n['id'] for n in seen_routers['routers']]
+ seen_router_names = [n['name'] for n in seen_routers['routers']]
self.assertIn(self.router['name'],
seen_router_names)
self.assertIn(self.router['id'],
@@ -240,7 +240,8 @@
ip_address, private_key=private_key)
old_nic_list = self._get_server_nics(ssh_client)
# get a port from a list of one item
- port_list = self._list_ports(device_id=server['id'])
+ port_list = self.admin_manager.ports_client.list_ports(
+ device_id=server['id'])['ports']
self.assertEqual(1, len(port_list))
old_port = port_list[0]
interface = self.interface_client.create_interface(
@@ -253,9 +254,12 @@
server['id'], interface['port_id'])
def check_ports():
- self.new_port_list = [port for port in
- self._list_ports(device_id=server['id'])
- if port['id'] != old_port['id']]
+ self.new_port_list = [
+ port for port in
+ self.admin_manager.ports_client.list_ports(
+ device_id=server['id'])['ports']
+ if port['id'] != old_port['id']
+ ]
return len(self.new_port_list) == 1
if not test_utils.call_until_true(
@@ -301,10 +305,13 @@
floating_ip, server = self.floating_ip_tuple
# get internal ports' ips:
# get all network ports in the new network
- internal_ips = (p['fixed_ips'][0]['ip_address'] for p in
- self._list_ports(tenant_id=server['tenant_id'],
- network_id=network['id'])
- if p['device_owner'].startswith('network'))
+ internal_ips = (
+ p['fixed_ips'][0]['ip_address'] for p in
+ self.admin_manager.ports_client.list_ports(
+ tenant_id=server['tenant_id'],
+ network_id=network['id'])['ports']
+ if p['device_owner'].startswith('network')
+ )
self._check_server_connectivity(floating_ip,
internal_ips,
@@ -320,8 +327,11 @@
# We ping the external IP from the instance using its floating IP
# which is always IPv4, so we must only test connectivity to
# external IPv4 IPs if the external network is dualstack.
- v4_subnets = [s for s in self._list_subnets(
- network_id=CONF.network.public_network_id) if s['ip_version'] == 4]
+ v4_subnets = [
+ s for s in self.admin_manager.subnets_client.list_subnets(
+ network_id=CONF.network.public_network_id)['subnets']
+ if s['ip_version'] == 4
+ ]
self.assertEqual(1, len(v4_subnets),
"Found %d IPv4 subnets" % len(v4_subnets))
@@ -624,7 +634,8 @@
self._setup_network_and_servers()
floating_ip, server = self.floating_ip_tuple
server_id = server['id']
- port_id = self._list_ports(device_id=server_id)[0]['id']
+ port_id = self.admin_manager.ports_client.list_ports(
+ device_id=server_id)['ports'][0]['id']
server_pip = server['addresses'][self.network['name']][0]['addr']
server2 = self._create_server(self.network)
@@ -677,8 +688,8 @@
'Server should have been created from a '
'pre-existing port.')
# Assert the port is bound to the server.
- port_list = self._list_ports(device_id=server['id'],
- network_id=self.network['id'])
+ port_list = self.admin_manager.ports_client.list_ports(
+ device_id=server['id'], network_id=self.network['id'])['ports']
self.assertEqual(1, len(port_list),
'There should only be one port created for '
'server %s.' % server['id'])
@@ -696,8 +707,8 @@
# Boot another server with the same port to make sure nothing was
# left around that could cause issues.
server = self._create_server(self.network, port['id'])
- port_list = self._list_ports(device_id=server['id'],
- network_id=self.network['id'])
+ port_list = self.admin_manager.ports_client.list_ports(
+ device_id=server['id'], network_id=self.network['id'])['ports']
self.assertEqual(1, len(port_list),
'There should only be one port created for '
'server %s.' % server['id'])
@@ -727,9 +738,11 @@
unschedule_router = (self.admin_manager.network_agents_client.
delete_router_from_l3_agent)
- agent_list_alive = set(a["id"] for a in
- self._list_agents(agent_type="L3 agent") if
- a["alive"] is True)
+ agent_list_alive = set(
+ a["id"] for a in
+ self.admin_manager.network_agents_client.list_agents(
+ agent_type="L3 agent")['agents'] if a["alive"] is True
+ )
self._setup_network_and_servers()
# NOTE(kevinbenton): we have to use the admin credentials to check
@@ -811,8 +824,8 @@
self._create_new_network()
self._hotplug_server()
fip, server = self.floating_ip_tuple
- new_ports = self._list_ports(device_id=server["id"],
- network_id=self.new_net["id"])
+ new_ports = self.admin_manager.ports_client.list_ports(
+ device_id=server["id"], network_id=self.new_net["id"])['ports']
spoof_port = new_ports[0]
private_key = self._get_server_key(server)
ssh_client = self.get_remote_client(fip['floating_ip_address'],
diff --git a/tempest/scenario/test_network_v6.py b/tempest/scenario/test_network_v6.py
index 2d6ea75..f33784e 100644
--- a/tempest/scenario/test_network_v6.py
+++ b/tempest/scenario/test_network_v6.py
@@ -143,9 +143,11 @@
@param ssh: RemoteClient ssh instance to server
@param sid: server uuid
"""
- ports = [p["mac_address"] for p in
- self._list_ports(device_id=sid,
- network_id=self.network_v6['id'])]
+ ports = [
+ p["mac_address"] for p in
+ self.admin_manager.ports_client.list_ports(
+ device_id=sid, network_id=self.network_v6['id'])['ports']
+ ]
self.assertEqual(1, len(ports),
message=("Multiple IPv6 ports found on network %s. "
"ports: %s")
diff --git a/tempest/scenario/test_security_groups_basic_ops.py b/tempest/scenario/test_security_groups_basic_ops.py
index 5565cb8..a01124d 100644
--- a/tempest/scenario/test_security_groups_basic_ops.py
+++ b/tempest/scenario/test_security_groups_basic_ops.py
@@ -220,22 +220,24 @@
# Checks that we see the newly created network/subnet/router via
# checking the result of list_[networks,routers,subnets]
# Check that (router, subnet) couple exist in port_list
- seen_nets = self._list_networks()
- seen_names = [n['name'] for n in seen_nets]
- seen_ids = [n['id'] for n in seen_nets]
+ seen_nets = self.admin_manager.networks_client.list_networks()
+ seen_names = [n['name'] for n in seen_nets['networks']]
+ seen_ids = [n['id'] for n in seen_nets['networks']]
self.assertIn(tenant.network['name'], seen_names)
self.assertIn(tenant.network['id'], seen_ids)
- seen_subnets = [(n['id'], n['cidr'], n['network_id'])
- for n in self._list_subnets()]
+ seen_subnets = [
+ (n['id'], n['cidr'], n['network_id']) for n in
+ self.admin_manager.subnets_client.list_subnets()['subnets']
+ ]
mysubnet = (tenant.subnet['id'], tenant.subnet['cidr'],
tenant.network['id'])
self.assertIn(mysubnet, seen_subnets)
- seen_routers = self._list_routers()
- seen_router_ids = [n['id'] for n in seen_routers]
- seen_router_names = [n['name'] for n in seen_routers]
+ seen_routers = self.admin_manager.routers_client.list_routers()
+ seen_router_ids = [n['id'] for n in seen_routers['routers']]
+ seen_router_names = [n['name'] for n in seen_routers['routers']]
self.assertIn(tenant.router['name'], seen_router_names)
self.assertIn(tenant.router['id'], seen_router_ids)
@@ -243,9 +245,11 @@
myport = (tenant.router['id'], tenant.subnet['id'])
router_ports = [
(i['device_id'], f['subnet_id'])
- for i in self._list_ports(device_id=tenant.router['id'])
+ for i in self.admin_manager.ports_client.list_ports(
+ device_id=tenant.router['id'])['ports']
if net_info.is_router_interface_port(i)
- for f in i['fixed_ips']]
+ for f in i['fixed_ips']
+ ]
self.assertIn(myport, router_ports)
@@ -450,7 +454,8 @@
mac_addr = mac_addr.strip().lower()
# Get the fixed_ips and mac_address fields of all ports. Select
# only those two columns to reduce the size of the response.
- port_list = self._list_ports(fields=['fixed_ips', 'mac_address'])
+ port_list = self.admin_manager.ports_client.list_ports(
+ fields=['fixed_ips', 'mac_address'])['ports']
port_detail_list = [
(port['fixed_ips'][0]['subnet_id'],
port['fixed_ips'][0]['ip_address'],
@@ -536,7 +541,8 @@
ip=self._get_server_ip(server),
should_succeed=False)
server_id = server['id']
- port_id = self._list_ports(device_id=server_id)[0]['id']
+ port_id = self.admin_manager.ports_client.list_ports(
+ device_id=server_id)['ports'][0]['id']
# update port with new security group and check connectivity
self.ports_client.update_port(port_id, security_groups=[
@@ -598,7 +604,8 @@
access_point_ssh = self._connect_to_access_point(new_tenant)
server_id = server['id']
- port_id = self._list_ports(device_id=server_id)[0]['id']
+ port_id = self.admin_manager.ports_client.list_ports(
+ device_id=server_id)['ports'][0]['id']
# Flip the port's port security and check connectivity
try:
@@ -642,7 +649,8 @@
sec_groups = []
server = self._create_server(name, tenant, sec_groups)
server_id = server['id']
- ports = self._list_ports(device_id=server_id)
+ ports = self.admin_manager.ports_client.list_ports(
+ device_id=server_id)['ports']
self.assertEqual(1, len(ports))
for port in ports:
self.assertEmpty(port['security_groups'],