Merge "Add base API tests for port forwarding"
diff --git a/neutron_tempest_plugin/api/base.py b/neutron_tempest_plugin/api/base.py
index 639fa3c..79ac4a6 100644
--- a/neutron_tempest_plugin/api/base.py
+++ b/neutron_tempest_plugin/api/base.py
@@ -1176,8 +1176,10 @@
def get_bare_url(self, url):
base_url = self.client.base_url
- self.assertTrue(url.startswith(base_url))
- return url[len(base_url):]
+ base_url_normalized = utils.normalize_url(base_url)
+ url_normalized = utils.normalize_url(url)
+ self.assertTrue(url_normalized.startswith(base_url_normalized))
+ return url_normalized[len(base_url_normalized):]
@classmethod
def _extract_resources(cls, body):
diff --git a/neutron_tempest_plugin/api/base_security_groups.py b/neutron_tempest_plugin/api/base_security_groups.py
index ca2c17a..952de95 100644
--- a/neutron_tempest_plugin/api/base_security_groups.py
+++ b/neutron_tempest_plugin/api/base_security_groups.py
@@ -47,8 +47,6 @@
for k, v in constants.IP_PROTOCOL_MAP.items()
if k in V4_PROTOCOL_NAMES}
-V6_PROTOCOL_LEGACY = {constants.PROTO_NAME_IPV6_ICMP_LEGACY}
-
V6_PROTOCOL_NAMES = {
'ipv6-encap',
'ipv6-frag',
@@ -60,4 +58,4 @@
V6_PROTOCOL_INTS = {v
for k, v in constants.IP_PROTOCOL_MAP.items()
- if k in (V6_PROTOCOL_NAMES | V6_PROTOCOL_LEGACY)}
+ if k in V6_PROTOCOL_NAMES}
diff --git a/neutron_tempest_plugin/api/test_security_groups.py b/neutron_tempest_plugin/api/test_security_groups.py
index d44ba50..26a8c05 100644
--- a/neutron_tempest_plugin/api/test_security_groups.py
+++ b/neutron_tempest_plugin/api/test_security_groups.py
@@ -13,6 +13,8 @@
# License for the specific language governing permissions and limitations
# under the License.
+import random
+
from neutron_lib import constants
from tempest.lib.common.utils import data_utils
from tempest.lib import decorators
@@ -23,7 +25,7 @@
from neutron_tempest_plugin.api import base_security_groups
-class SecGroupTest(base.BaseNetworkTest):
+class SecGroupTest(base.BaseAdminNetworkTest):
required_extensions = ['security-group']
@@ -55,6 +57,25 @@
self.assertEqual(observed_security_group['description'],
new_description)
+ @decorators.idempotent_id('1fff0d57-bb6c-4528-9c1d-2326dce1c087')
+ def test_show_security_group_contains_all_rules(self):
+ security_group = self.create_security_group()
+ protocol = random.choice(list(base_security_groups.V4_PROTOCOL_NAMES))
+ security_group_rule = self.create_security_group_rule(
+ security_group=security_group,
+ project={'id': self.admin_client.tenant_id},
+ client=self.admin_client,
+ protocol=protocol,
+ direction=constants.INGRESS_DIRECTION)
+
+ observed_security_group = self.client.show_security_group(
+ security_group['id'])['security_group']
+ observerd_security_group_rules_ids = [
+ sgr['id'] for sgr in
+ observed_security_group['security_group_rules']]
+ self.assertIn(
+ security_group_rule['id'], observerd_security_group_rules_ids)
+
@decorators.idempotent_id('7c0ecb10-b2db-11e6-9b14-000c29248b0d')
def test_create_bulk_sec_groups(self):
# Creates 2 sec-groups in one request
@@ -109,12 +130,42 @@
_ip_version = constants.IP_VERSION_6
protocol_names = base_security_groups.V6_PROTOCOL_NAMES
protocol_ints = base_security_groups.V6_PROTOCOL_INTS
- protocol_legacy_names = base_security_groups.V6_PROTOCOL_LEGACY
@decorators.idempotent_id('c7d17b41-3b4e-4add-bb3b-6af59baaaffa')
- def test_security_group_rule_protocol_legacy_names(self):
- self._test_security_group_rule_protocols(
- protocols=self.protocol_legacy_names)
+ def test_security_group_rule_protocol_legacy_icmpv6(self):
+ # These legacy protocols can be used to create security groups,
+ # but they could be shown either with their passed protocol name,
+ # or a canonicalized version, depending on the neutron version.
+ # So we check against a list of possible values.
+ # TODO(haleyb): Remove once these legacy names are deprecated
+ protocols = {constants.PROTO_NAME_IPV6_ICMP_LEGACY:
+ constants.PROTO_NAME_IPV6_ICMP,
+ constants.PROTO_NAME_ICMP:
+ constants.PROTO_NAME_IPV6_ICMP}
+ for key, value in protocols.items():
+ self._test_security_group_rule_legacy(
+ protocol_list=[str(key), str(value)],
+ protocol=str(key),
+ direction=constants.INGRESS_DIRECTION,
+ ethertype=self.ethertype)
+
+ def _test_security_group_rule_legacy(self, protocol_list, **kwargs):
+ security_group = self.create_security_group()
+ security_group_rule = self.create_security_group_rule(
+ security_group=security_group, **kwargs)
+ observed_security_group_rule = self.client.show_security_group_rule(
+ security_group_rule['id'])['security_group_rule']
+ for key, value in kwargs.items():
+ if key == 'protocol':
+ self.assertIn(security_group_rule[key], protocol_list,
+ "{!r} does not match.".format(key))
+ self.assertIn(observed_security_group_rule[key], protocol_list,
+ "{!r} does not match.".format(key))
+ else:
+ self.assertEqual(value, security_group_rule[key],
+ "{!r} does not match.".format(key))
+ self.assertEqual(value, observed_security_group_rule[key],
+ "{!r} does not match.".format(key))
class RbacSharedSecurityGroupTest(base.BaseAdminNetworkTest):
diff --git a/neutron_tempest_plugin/common/utils.py b/neutron_tempest_plugin/common/utils.py
index 3649cb6..bd7a367 100644
--- a/neutron_tempest_plugin/common/utils.py
+++ b/neutron_tempest_plugin/common/utils.py
@@ -21,9 +21,18 @@
import functools
import threading
import time
+try:
+ import urlparse
+except ImportError:
+ from urllib import parse as urlparse
import eventlet
+SCHEMA_PORT_MAPPING = {
+ "http": 80,
+ "https": 443,
+}
+
class classproperty(object):
def __init__(self, f):
@@ -102,3 +111,15 @@
bases = (overrider_class, overriden_class)
overriden_class = type(name, bases, {})
return overriden_class
+
+
+def normalize_url(url):
+ """Make the scheme's default port explicit in a URL that omits the port
+
+ """
+ parse_result = urlparse.urlparse(url)
+ (scheme, netloc, url, params, query, fragment) = parse_result
+ port = parse_result.port
+ if scheme in SCHEMA_PORT_MAPPING and not port:
+ netloc = netloc + ":" + str(SCHEMA_PORT_MAPPING[scheme])
+ return urlparse.urlunparse((scheme, netloc, url, params, query, fragment))
diff --git a/neutron_tempest_plugin/scenario/base.py b/neutron_tempest_plugin/scenario/base.py
index ffb4dbd..f24c82b 100644
--- a/neutron_tempest_plugin/scenario/base.py
+++ b/neutron_tempest_plugin/scenario/base.py
@@ -238,21 +238,24 @@
LOG.debug("Server %s disappeared(deleted) while looking "
"for the console log", server['id'])
- def _check_remote_connectivity(self, source, dest, should_succeed=True,
+ def _check_remote_connectivity(self, source, dest, count,
+ should_succeed=True,
nic=None, mtu=None, fragmentation=True,
timeout=None):
"""check ping server via source ssh connection
:param source: RemoteClient: an ssh connection from which to ping
:param dest: and IP to ping against
+ :param count: Number of ping packet(s) to send
:param should_succeed: boolean should ping succeed or not
:param nic: specific network interface to ping from
:param mtu: mtu size for the packet to be sent
:param fragmentation: Flag for packet fragmentation
+ :param timeout: Timeout for all ping packet(s) to succeed
:returns: boolean -- should_succeed == ping
:returns: ping is false if ping failed
"""
- def ping_host(source, host, count=CONF.validation.ping_count,
+ def ping_host(source, host, count,
size=CONF.validation.ping_size, nic=None, mtu=None,
fragmentation=True):
IP_VERSION_4 = neutron_lib_constants.IP_VERSION_4
@@ -275,7 +278,7 @@
def ping_remote():
try:
- result = ping_host(source, dest, nic=nic, mtu=mtu,
+ result = ping_host(source, dest, count, nic=nic, mtu=mtu,
fragmentation=fragmentation)
except lib_exc.SSHExecCommandFailed:
@@ -296,10 +299,12 @@
def check_remote_connectivity(self, source, dest, should_succeed=True,
nic=None, mtu=None, fragmentation=True,
- servers=None, timeout=None):
+ servers=None, timeout=None,
+ ping_count=CONF.validation.ping_count):
try:
self.assertTrue(self._check_remote_connectivity(
- source, dest, should_succeed, nic, mtu, fragmentation,
+ source, dest, ping_count, should_succeed, nic, mtu,
+ fragmentation,
timeout=timeout))
except lib_exc.SSHTimeout as ssh_e:
LOG.debug(ssh_e)
diff --git a/neutron_tempest_plugin/scenario/test_connectivity.py b/neutron_tempest_plugin/scenario/test_connectivity.py
index 3385a04..311c263 100644
--- a/neutron_tempest_plugin/scenario/test_connectivity.py
+++ b/neutron_tempest_plugin/scenario/test_connectivity.py
@@ -109,3 +109,40 @@
self.check_remote_connectivity(
ap1_sshclient, ap2_internal_port['fixed_ips'][0]['ip_address'])
+
+ @decorators.idempotent_id('b72c3b77-3396-4144-b05d-9cd3c0099893')
+ def test_connectivity_router_east_west_traffic(self):
+ """This case is intended to test router east west traffic
+
+ The case can be used in various scenarios: legacy/distributed router,
+ same/different host.
+ """
+ net_1 = self.create_network()
+ net_2 = self.create_network()
+ subnet_1 = self.create_subnet(net_1, cidr="10.10.1.0/24")
+ subnet_2 = self.create_subnet(net_2, cidr="10.10.2.0/24")
+
+ router = self.create_router(
+ router_name=data_utils.rand_name("east_west_traffic_router"),
+ admin_state_up=True,
+ external_network_id=CONF.network.public_network_id)
+
+ internal_port_1 = self.create_port(
+ net_1, security_groups=[self.secgroup['id']])
+ internal_port_2 = self.create_port(
+ net_2, security_groups=[self.secgroup['id']])
+
+ self._create_servers(internal_port_1, internal_port_2)
+
+ self.create_router_interface(router['id'], subnet_1['id'])
+ self.create_router_interface(router['id'], subnet_2['id'])
+
+ fip = self.create_and_associate_floatingip(
+ internal_port_1['id'])
+ sshclient = ssh.Client(
+ fip['floating_ip_address'], CONF.validation.image_ssh_user,
+ pkey=self.keypair['private_key'])
+
+ self.check_remote_connectivity(
+ sshclient, internal_port_2['fixed_ips'][0]['ip_address'],
+ ping_count=10)