Merge "Update the contributor information"
diff --git a/neutron_tempest_plugin/api/admin/test_default_security_group_rules.py b/neutron_tempest_plugin/api/admin/test_default_security_group_rules.py
new file mode 100644
index 0000000..604dbea
--- /dev/null
+++ b/neutron_tempest_plugin/api/admin/test_default_security_group_rules.py
@@ -0,0 +1,313 @@
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import copy
+import random
+
+from neutron_lib import constants
+from tempest.lib import decorators
+from tempest.lib import exceptions as lib_exc
+
+from neutron_tempest_plugin.api import base
+
+RULE_KEYWORDS_TO_CHECK = [
+ 'direction', 'remote_group_id', 'remote_address_group_id', 'description',
+ 'protocol', 'port_range_min', 'port_range_max', 'ethertype',
+ 'remote_ip_prefix', 'used_in_default_sg', 'used_in_non_default_sg'
+]
+
+
+class DefaultSecurityGroupRuleTest(base.BaseAdminNetworkTest):
+ required_extensions = ['security-groups-default-rules']
+
+ credentials = ['primary', 'admin']
+
+ @classmethod
+ def setup_clients(cls):
+ super(DefaultSecurityGroupRuleTest, cls).setup_clients()
+ cls.admin_client = cls.os_admin.network_client
+
+ def _filter_not_relevant_rule_keys(self, rule, expected_keys=None):
+ expected_keys = expected_keys or RULE_KEYWORDS_TO_CHECK
+ new_rule = {}
+ for k in rule.keys():
+ if k in expected_keys:
+ new_rule[k] = rule[k]
+ return new_rule
+
+ def _filter_not_relevant_rules_keys(self, rules, keys=None):
+ keys = keys or RULE_KEYWORDS_TO_CHECK
+ return [self._filter_not_relevant_rule_keys(r, keys) for r in rules]
+
+ def _assert_rules_exists(self, expected_rules, actual_rules):
+ for expected_rule in expected_rules:
+ self.assertIn(expected_rule, actual_rules)
+
+ @decorators.idempotent_id('2f3d3070-e9fa-4127-a33f-f1532fd89108')
+ def test_legacy_default_sg_rules_created_by_default(self):
+ expected_legacy_template_rules = [
+ {
+ 'direction': 'egress',
+ 'ethertype': 'IPv4',
+ 'remote_group_id': None,
+ 'protocol': None,
+ 'remote_ip_prefix': None,
+ 'remote_address_group_id': None,
+ 'port_range_max': None,
+ 'port_range_min': None,
+ 'used_in_default_sg': True,
+ 'used_in_non_default_sg': True,
+ 'description': 'Legacy default SG rule for egress traffic'
+ }, {
+ 'remote_group_id': 'PARENT',
+ 'direction': 'ingress',
+ 'ethertype': 'IPv6',
+ 'protocol': None,
+ 'remote_ip_prefix': None,
+ 'remote_address_group_id': None,
+ 'port_range_max': None,
+ 'port_range_min': None,
+ 'used_in_default_sg': True,
+ 'used_in_non_default_sg': False,
+ 'description': 'Legacy default SG rule for ingress traffic'
+ }, {
+ 'remote_group_id': 'PARENT',
+ 'direction': 'ingress',
+ 'ethertype': 'IPv4',
+ 'protocol': None,
+ 'remote_ip_prefix': None,
+ 'remote_address_group_id': None,
+ 'port_range_max': None,
+ 'port_range_min': None,
+ 'used_in_default_sg': True,
+ 'used_in_non_default_sg': False,
+ 'description': 'Legacy default SG rule for ingress traffic'
+ }, {
+ 'direction': 'egress',
+ 'ethertype': 'IPv6',
+ 'remote_group_id': None,
+ 'protocol': None,
+ 'remote_ip_prefix': None,
+ 'remote_address_group_id': None,
+ 'port_range_max': None,
+ 'port_range_min': None,
+ 'used_in_default_sg': True,
+ 'used_in_non_default_sg': True,
+ 'description': 'Legacy default SG rule for egress traffic'
+ }
+ ]
+ sg_rules_template = (
+ self.admin_client.list_default_security_group_rules()[
+ 'default_security_group_rules'
+ ])
+ self._assert_rules_exists(
+ expected_legacy_template_rules,
+ self._filter_not_relevant_rules_keys(sg_rules_template))
+
+ @decorators.idempotent_id('df98f969-ff2d-4597-9765-f5d4f81f775f')
+ def test_default_security_group_rule_lifecycle(self):
+ tcp_port = random.randint(constants.PORT_RANGE_MIN,
+ constants.PORT_RANGE_MAX)
+ rule_args = {
+ 'direction': 'ingress',
+ 'ethertype': 'IPv4',
+ 'protocol': 'tcp',
+ 'port_range_max': tcp_port,
+ 'port_range_min': tcp_port,
+ 'used_in_default_sg': False,
+ 'used_in_non_default_sg': True,
+ 'description': (
+ 'Allow tcp connections over IPv4 on port %s' % tcp_port)
+ }
+ expected_rule = {
+ 'remote_group_id': None,
+ 'direction': 'ingress',
+ 'ethertype': 'IPv4',
+ 'protocol': 'tcp',
+ 'port_range_min': tcp_port,
+ 'port_range_max': tcp_port,
+ 'remote_ip_prefix': None,
+ 'remote_address_group_id': None,
+ 'used_in_default_sg': False,
+ 'used_in_non_default_sg': True,
+ 'description': (
+ 'Allow tcp connections over IPv4 on port %s' % tcp_port)
+ }
+ created_rule_template = self.create_default_security_group_rule(
+ **rule_args)
+ self.assertDictEqual(
+ expected_rule,
+ self._filter_not_relevant_rule_keys(created_rule_template)
+ )
+ observed_rule_template = (
+ self.admin_client.get_default_security_group_rule(
+ created_rule_template['id'])
+ )['default_security_group_rule']
+ self.assertDictEqual(
+ expected_rule,
+ self._filter_not_relevant_rule_keys(observed_rule_template)
+ )
+
+ self.admin_client.delete_default_security_group_rule(
+ created_rule_template['id']
+ )
+ self.assertRaises(
+ lib_exc.NotFound,
+ self.admin_client.get_default_security_group_rule,
+ created_rule_template['id']
+ )
+
+ @decorators.idempotent_id('6c5a2f41-5899-47f4-9daf-4f8ddbbd3ad5')
+ def test_create_duplicate_default_security_group_rule_different_templates(
+ self):
+ tcp_port = random.randint(constants.PORT_RANGE_MIN,
+ constants.PORT_RANGE_MAX)
+ rule_args = {
+ 'direction': 'ingress',
+ 'ethertype': 'IPv4',
+ 'protocol': 'tcp',
+ 'port_range_max': tcp_port,
+ 'port_range_min': tcp_port,
+ 'used_in_default_sg': True,
+ 'used_in_non_default_sg': True}
+ self.create_default_security_group_rule(**rule_args)
+
+ # Even when 'used_in_non_default_sg' differs, a Conflict is expected
+ # because 'used_in_default_sg' is the same as in the existing rule
+ new_rule_args = copy.copy(rule_args)
+ new_rule_args['used_in_non_default_sg'] = False
+ self.assertRaises(
+ lib_exc.Conflict,
+ self.admin_client.create_default_security_group_rule,
+ **new_rule_args)
+
+ # Same the other way round: even when 'used_in_default_sg' differs,
+ # a Conflict is expected because 'used_in_non_default_sg' is the
+ # same as in the existing rule
+ new_rule_args = copy.copy(rule_args)
+ new_rule_args['used_in_default_sg'] = False
+ self.assertRaises(
+ lib_exc.Conflict,
+ self.admin_client.create_default_security_group_rule,
+ **new_rule_args)
+
+ @decorators.idempotent_id('e4696607-1a13-48eb-8912-ee1e742d9471')
+ def test_create_same_default_security_group_rule_for_different_templates(
+ self):
+ tcp_port = random.randint(constants.PORT_RANGE_MIN,
+ constants.PORT_RANGE_MAX)
+ expected_rules = [{
+ 'remote_group_id': None,
+ 'direction': 'ingress',
+ 'ethertype': 'IPv4',
+ 'protocol': 'tcp',
+ 'remote_ip_prefix': None,
+ 'remote_address_group_id': None,
+ 'port_range_max': tcp_port,
+ 'port_range_min': tcp_port,
+ 'used_in_default_sg': True,
+ 'used_in_non_default_sg': False,
+ 'description': ''
+ }, {
+ 'remote_group_id': None,
+ 'direction': 'ingress',
+ 'ethertype': 'IPv4',
+ 'protocol': 'tcp',
+ 'remote_ip_prefix': None,
+ 'remote_address_group_id': None,
+ 'port_range_max': tcp_port,
+ 'port_range_min': tcp_port,
+ 'used_in_default_sg': False,
+ 'used_in_non_default_sg': True,
+ 'description': ''
+ }]
+
+ default_sg_rule_args = {
+ 'direction': 'ingress',
+ 'ethertype': 'IPv4',
+ 'protocol': 'tcp',
+ 'port_range_max': tcp_port,
+ 'port_range_min': tcp_port,
+ 'used_in_default_sg': True,
+ 'used_in_non_default_sg': False}
+ self.create_default_security_group_rule(**default_sg_rule_args)
+
+ custom_sg_rule_args = {
+ 'direction': 'ingress',
+ 'ethertype': 'IPv4',
+ 'protocol': 'tcp',
+ 'port_range_max': tcp_port,
+ 'port_range_min': tcp_port,
+ 'used_in_default_sg': False,
+ 'used_in_non_default_sg': True}
+ self.create_default_security_group_rule(**custom_sg_rule_args)
+
+ sg_rules_template = (
+ self.admin_client.list_default_security_group_rules()[
+ 'default_security_group_rules'
+ ])
+ self._assert_rules_exists(
+ expected_rules,
+ self._filter_not_relevant_rules_keys(sg_rules_template))
+
+ def _validate_security_group_rules(self, sg, is_default_sg):
+ keys_to_check = [
+ 'remote_group_id', 'direction', 'ethertype', 'protocol',
+ 'remote_ip_prefix', 'remote_address_group_id', 'port_range_min',
+ 'port_range_max']
+
+ if is_default_sg:
+ sg_rules_template = (
+ self.admin_client.list_default_security_group_rules(
+ used_in_default_sg=True)['default_security_group_rules'])
+ else:
+ sg_rules_template = (
+ self.admin_client.list_default_security_group_rules(
+ used_in_non_default_sg=True
+ )['default_security_group_rules'])
+ # NOTE(slaweq): We need to replace "PARENT" keyword in
+ # the "remote_group_id" attribute of every default sg rule template
+ # with actual SG ID
+ for rule in sg_rules_template:
+ if rule['remote_group_id'] == 'PARENT':
+ rule['remote_group_id'] = sg['id']
+
+ self._assert_rules_exists(
+ self._filter_not_relevant_rules_keys(
+ sg_rules_template, keys_to_check),
+ self._filter_not_relevant_rules_keys(
+ sg['security_group_rules'], keys_to_check))
+
+ @decorators.idempotent_id('29feedb1-6f04-4a1f-a778-2fae2c7b7dc8')
+ def test_security_group_rules_created_from_default_sg_rules_template(
+ self):
+ """Test that the default SG and a new custom SG get proper SG rules.
+
+ This test creates a new project and checks that its default SG has SG
+ rules matching the default SG rule templates for that kind of SG.
+ Next it creates a new SG for the same project and checks that this SG
+ also has proper SG rules based on the default SG rule templates.
+ """
+
+ project = self.create_project()
+ # First check rules for default SG created automatically for each
+ # project
+ default_sg = self.admin_client.list_security_groups(
+ tenant_id=project['id'], name='default')['security_groups'][0]
+ self._validate_security_group_rules(default_sg, is_default_sg=True)
+
+ # And now create different SG for same project and check SG rules for
+ # such additional SG
+ sg = self.create_security_group(project=project)
+ self._validate_security_group_rules(sg, is_default_sg=False)
diff --git a/neutron_tempest_plugin/api/admin/test_routers_dvr.py b/neutron_tempest_plugin/api/admin/test_routers_dvr.py
deleted file mode 100644
index ab25a3f..0000000
--- a/neutron_tempest_plugin/api/admin/test_routers_dvr.py
+++ /dev/null
@@ -1,105 +0,0 @@
-# Copyright 2015 OpenStack Foundation
-# All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-from tempest.lib.common.utils import data_utils
-from tempest.lib import decorators
-
-from neutron_tempest_plugin.api import base_routers as base
-
-
-class RoutersTestDVRBase(base.BaseRouterTest):
-
- required_extensions = ['router', 'dvr']
-
- @classmethod
- def resource_setup(cls):
- # The check above will pass if api_extensions=all, which does
- # not mean DVR extension itself is present.
- # Instead, we have to check whether DVR is actually present by using
- # admin credentials to create router with distributed=True attribute
- # and checking for BadRequest exception and that the resulting router
- # has a distributed attribute.
- super(RoutersTestDVRBase, cls).resource_setup()
- name = data_utils.rand_name('pretest-check')
- router = cls.admin_client.create_router(name)
- if 'distributed' not in router['router']:
- msg = "'distributed' attribute not found. DVR Possibly not enabled"
- raise cls.skipException(msg)
- cls.admin_client.delete_router(router['router']['id'])
-
-
-class RoutersTestDVR(RoutersTestDVRBase):
-
- @decorators.idempotent_id('08a2a0a8-f1e4-4b34-8e30-e522e836c44e')
- def test_distributed_router_creation(self):
- """Test distributed router creation
-
- Test uses administrative credentials to creates a
- DVR (Distributed Virtual Routing) router using the
- distributed=True.
-
- Acceptance
- The router is created and the "distributed" attribute is
- set to True
- """
- name = data_utils.rand_name('router')
- router = self._create_admin_router(name, distributed=True)
- self.assertTrue(router['distributed'])
-
- @decorators.idempotent_id('8a0a72b4-7290-4677-afeb-b4ffe37bc352')
- def test_centralized_router_creation(self):
- """Test centralized router creation
-
- Test uses administrative credentials to creates a
- CVR (Centralized Virtual Routing) router using the
- distributed=False.
-
- Acceptance
- The router is created and the "distributed" attribute is
- set to False, thus making it a "Centralized Virtual Router"
- as opposed to a "Distributed Virtual Router"
- """
- name = data_utils.rand_name('router')
- router = self._create_admin_router(name, distributed=False)
- self.assertFalse(router['distributed'])
-
-
-class RouterTestCentralizedToDVR(RoutersTestDVRBase):
-
- required_extensions = ['l3-ha']
-
- @decorators.idempotent_id('acd43596-c1fb-439d-ada8-31ad48ae3c2e')
- def test_centralized_router_update_to_dvr(self):
- """Test centralized to DVR router update
-
- Test uses administrative credentials to creates a
- CVR (Centralized Virtual Routing) router using the
- distributed=False.Then it will "update" the router
- distributed attribute to True
-
- Acceptance
- The router is created and the "distributed" attribute is
- set to False. Once the router is updated, the distributed
- attribute will be set to True
- """
- name = data_utils.rand_name('router')
- # router needs to be in admin state down in order to be upgraded to DVR
- router = self._create_admin_router(name, distributed=False,
- ha=False, admin_state_up=False)
- self.assertFalse(router['distributed'])
- self.assertFalse(router['ha'])
- router = self.admin_client.update_router(router['id'],
- distributed=True)
- self.assertTrue(router['router']['distributed'])
diff --git a/neutron_tempest_plugin/api/base.py b/neutron_tempest_plugin/api/base.py
index b66fe0d..e3c9aad 100644
--- a/neutron_tempest_plugin/api/base.py
+++ b/neutron_tempest_plugin/api/base.py
@@ -135,6 +135,7 @@
cls.admin_subnetpools = []
cls.security_groups = []
cls.admin_security_groups = []
+ cls.sg_rule_templates = []
cls.projects = []
cls.log_objects = []
cls.reserved_subnet_cidrs = set()
@@ -243,6 +244,12 @@
security_group,
client=cls.admin_client)
+ # Clean up security group rule templates
+ for sg_rule_template in cls.sg_rule_templates:
+ cls._try_delete_resource(
+ cls.admin_client.delete_default_security_group_rule,
+ sg_rule_template['id'])
+
for subnetpool in cls.subnetpools:
cls._try_delete_resource(cls.client.delete_subnetpool,
subnetpool['id'])
@@ -971,6 +978,15 @@
client.delete_security_group(security_group['id'])
@classmethod
+ def get_security_group(cls, name='default', client=None):
+ client = client or cls.client
+ security_groups = client.list_security_groups()['security_groups']
+ for security_group in security_groups:
+ if security_group['name'] == name:
+ return security_group
+ raise ValueError("No such security group named {!r}".format(name))
+
+ @classmethod
def create_security_group_rule(cls, security_group=None, project=None,
client=None, ip_version=None, **kwargs):
if project:
@@ -1006,13 +1022,11 @@
'security_group_rule']
@classmethod
- def get_security_group(cls, name='default', client=None):
- client = client or cls.client
- security_groups = client.list_security_groups()['security_groups']
- for security_group in security_groups:
- if security_group['name'] == name:
- return security_group
- raise ValueError("No such security group named {!r}".format(name))
+ def create_default_security_group_rule(cls, **kwargs):
+ body = cls.admin_client.create_default_security_group_rule(**kwargs)
+ default_sg_rule = body['default_security_group_rule']
+ cls.sg_rule_templates.append(default_sg_rule)
+ return default_sg_rule
@classmethod
def create_keypair(cls, client=None, name=None, **kwargs):
diff --git a/neutron_tempest_plugin/api/test_dhcp_ipv6.py b/neutron_tempest_plugin/api/test_dhcp_ipv6.py
deleted file mode 100644
index 4f2e576..0000000
--- a/neutron_tempest_plugin/api/test_dhcp_ipv6.py
+++ /dev/null
@@ -1,101 +0,0 @@
-# Copyright 2014 OpenStack Foundation
-# All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-import netaddr
-from neutron_lib import constants
-from tempest.lib import decorators
-from tempest.lib import exceptions as lib_exc
-
-from neutron_tempest_plugin.api import base
-from neutron_tempest_plugin import config
-
-CONF = config.CONF
-
-
-class NetworksTestDHCPv6(base.BaseNetworkTest):
- _ip_version = 6
-
- def setUp(self):
- super(NetworksTestDHCPv6, self).setUp()
- self.addCleanup(self._clean_network)
-
- @classmethod
- def skip_checks(cls):
- super(NetworksTestDHCPv6, cls).skip_checks()
- msg = None
- if not CONF.network_feature_enabled.ipv6:
- msg = "IPv6 is not enabled"
- elif not CONF.network_feature_enabled.ipv6_subnet_attributes:
- msg = "DHCPv6 attributes are not enabled."
- if msg:
- raise cls.skipException(msg)
-
- @classmethod
- def resource_setup(cls):
- super(NetworksTestDHCPv6, cls).resource_setup()
- cls.network = cls.create_network()
-
- def _remove_from_list_by_index(self, things_list, elem):
- for index, i in enumerate(things_list):
- if i['id'] == elem['id']:
- del things_list[index]
- return
-
- def _clean_network(self):
- body = self.client.list_ports()
- ports = body['ports']
- for port in ports:
- if (port['device_owner'].startswith(
- constants.DEVICE_OWNER_ROUTER_INTF) and
- port['device_id'] in [r['id'] for r in self.routers]):
- self.client.remove_router_interface_with_port_id(
- port['device_id'], port['id']
- )
- else:
- if port['id'] in [p['id'] for p in self.ports]:
- self.client.delete_port(port['id'])
- self._remove_from_list_by_index(self.ports, port)
- body = self.client.list_subnets()
- subnets = body['subnets']
- for subnet in subnets:
- if subnet['id'] in [s['id'] for s in self.subnets]:
- self.client.delete_subnet(subnet['id'])
- self._remove_from_list_by_index(self.subnets, subnet)
- body = self.client.list_routers()
- routers = body['routers']
- for router in routers:
- if router['id'] in [r['id'] for r in self.routers]:
- self.client.delete_router(router['id'])
- self._remove_from_list_by_index(self.routers, router)
-
- @decorators.idempotent_id('98244d88-d990-4570-91d4-6b25d70d08af')
- def test_dhcp_stateful_fixedips_outrange(self):
- """Test DHCP Stateful fixed IPs out of range
-
- When port gets IP address from fixed IP range it
- shall be checked if it's from subnets range.
- """
- kwargs = {'ipv6_ra_mode': 'dhcpv6-stateful',
- 'ipv6_address_mode': 'dhcpv6-stateful'}
- subnet = self.create_subnet(self.network, **kwargs)
- ip_range = netaddr.IPRange(subnet["allocation_pools"][0]["start"],
- subnet["allocation_pools"][0]["end"])
- for i in range(1, 3):
- ip = netaddr.IPAddress(ip_range.last + i).format()
- self.assertRaises(lib_exc.BadRequest,
- self.create_port,
- self.network,
- fixed_ips=[{'subnet_id': subnet['id'],
- 'ip_address': ip}])
diff --git a/neutron_tempest_plugin/api/test_extra_dhcp_options.py b/neutron_tempest_plugin/api/test_extra_dhcp_options.py
deleted file mode 100644
index 91c270d..0000000
--- a/neutron_tempest_plugin/api/test_extra_dhcp_options.py
+++ /dev/null
@@ -1,106 +0,0 @@
-# Copyright 2013 OpenStack Foundation
-# All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-from neutron_lib import constants
-from tempest.lib.common.utils import data_utils
-from tempest.lib import decorators
-
-from neutron_tempest_plugin.api import base
-
-
-class ExtraDHCPOptionsTestJSON(base.BaseNetworkTest):
- """Test Extra DHCP Options
-
- Tests the following operations with the Extra DHCP Options Neutron API
- extension:
-
- port create
- port list
- port show
- port update
-
- v2.0 of the Neutron API is assumed. It is also assumed that the Extra
- DHCP Options extension is enabled in the [network-feature-enabled]
- section of etc/tempest.conf
- """
-
- required_extensions = ['extra_dhcp_opt']
-
- @classmethod
- def resource_setup(cls):
- super(ExtraDHCPOptionsTestJSON, cls).resource_setup()
- cls.network = cls.create_network()
- cls.subnet = cls.create_subnet(cls.network)
- cls.port = cls.create_port(cls.network)
- cls.ip_tftp = ('123.123.123.123' if cls._ip_version == 4
- else '2015::dead')
- cls.ip_server = ('123.123.123.45' if cls._ip_version == 4
- else '2015::badd')
- cls.extra_dhcp_opts = [
- {'opt_value': 'pxelinux.0',
- 'opt_name': 'bootfile-name'}, # default ip_version is 4
- {'opt_value': cls.ip_tftp,
- 'opt_name': 'tftp-server',
- 'ip_version': cls._ip_version},
- {'opt_value': cls.ip_server,
- 'opt_name': 'server-ip-address',
- 'ip_version': cls._ip_version}
- ]
-
- @decorators.idempotent_id('d2c17063-3767-4a24-be4f-a23dbfa133c9')
- def test_create_list_port_with_extra_dhcp_options(self):
- # Create a port with Extra DHCP Options
- body = self.create_port(
- self.network,
- extra_dhcp_opts=self.extra_dhcp_opts)
- port_id = body['id']
-
- # Confirm port created has Extra DHCP Options
- body = self.client.list_ports()
- ports = body['ports']
- port = [p for p in ports if p['id'] == port_id]
- self.assertTrue(port)
- self._confirm_extra_dhcp_options(port[0], self.extra_dhcp_opts)
-
- @decorators.idempotent_id('9a6aebf4-86ee-4f47-b07a-7f7232c55607')
- def test_update_show_port_with_extra_dhcp_options(self):
- # Update port with extra dhcp options
- name = data_utils.rand_name('new-port-name')
- body = self.client.update_port(
- self.port['id'],
- name=name,
- extra_dhcp_opts=self.extra_dhcp_opts)
- # Confirm extra dhcp options were added to the port
- body = self.client.show_port(self.port['id'])
- self._confirm_extra_dhcp_options(body['port'], self.extra_dhcp_opts)
-
- def _confirm_extra_dhcp_options(self, port, extra_dhcp_opts):
- retrieved = port['extra_dhcp_opts']
- self.assertEqual(len(retrieved), len(extra_dhcp_opts))
- for retrieved_option in retrieved:
- for option in extra_dhcp_opts:
- # default ip_version is 4
- ip_version = option.get('ip_version', constants.IP_VERSION_4)
- if (retrieved_option['opt_value'] == option['opt_value'] and
- retrieved_option['opt_name'] == option['opt_name'] and
- retrieved_option['ip_version'] == ip_version):
- break
- else:
- self.fail('Extra DHCP option not found in port %s' %
- str(retrieved_option))
-
-
-class ExtraDHCPOptionsIpV6TestJSON(ExtraDHCPOptionsTestJSON):
- _ip_version = 6
diff --git a/neutron_tempest_plugin/api/test_security_groups.py b/neutron_tempest_plugin/api/test_security_groups.py
index 14e0c66..85e4763 100644
--- a/neutron_tempest_plugin/api/test_security_groups.py
+++ b/neutron_tempest_plugin/api/test_security_groups.py
@@ -245,6 +245,14 @@
class BaseSecGroupQuota(base.BaseAdminNetworkTest):
+ def setUp(self):
+ super().setUp()
+ # NOTE(slaweq): we don't know exactly how many rule templates may be
+ # created in the neutron db and used for every SG. As this test class
+ # checks SG quotas, not SG rule quotas, let's set the SG rules quota
+ # to -1
+ self._set_sg_rules_quota(-1)
+
def _create_max_allowed_sg_amount(self):
sg_amount = self._get_sg_amount()
sg_quota = self._get_sg_quota()
@@ -270,17 +278,23 @@
self.assertEqual(self._get_sg_quota(), new_sg_quota,
"Security group quota wasn't changed correctly")
- def _set_sg_quota(self, val):
- sg_quota = self._get_sg_quota()
+ def _set_quota(self, val, resource):
+ res_quota = self._get_quota(resource)
project_id = self.client.project_id
- self.admin_client.update_quotas(project_id, **{'security_group': val})
+ self.admin_client.update_quotas(project_id, **{resource: val})
self.addCleanup(self.admin_client.update_quotas,
- project_id, **{'security_group': sg_quota})
+ project_id, **{resource: res_quota})
- def _get_sg_quota(self):
+ def _get_quota(self, resource):
project_id = self.client.project_id
quotas = self.admin_client.show_quotas(project_id)
- return quotas['quota']['security_group']
+ return quotas['quota'][resource]
+
+ def _set_sg_quota(self, val):
+ return self._set_quota(val, 'security_group')
+
+ def _get_sg_quota(self):
+ return self._get_quota('security_group')
def _get_sg_amount(self):
project_id = self.client.project_id
@@ -288,6 +302,9 @@
security_groups = self.client.list_security_groups(**filter_query)
return len(security_groups['security_groups'])
+ def _set_sg_rules_quota(self, val):
+ return self._set_quota(val, 'security_group_rule')
+
class SecGroupQuotaTest(BaseSecGroupQuota):
diff --git a/neutron_tempest_plugin/scenario/test_security_groups.py b/neutron_tempest_plugin/scenario/test_security_groups.py
index 3d075b4..03156c7 100644
--- a/neutron_tempest_plugin/scenario/test_security_groups.py
+++ b/neutron_tempest_plugin/scenario/test_security_groups.py
@@ -1141,14 +1141,9 @@
should_succeed=True)
-@testtools.skipIf(
- CONF.neutron_plugin_options.firewall_driver in ['openvswitch', 'None'],
- "Firewall driver other than 'openvswitch' is required to use "
- "stateless security groups.")
-class StatelessSecGroupDualStackSlaacTest(BaseNetworkSecGroupTest):
+class StatelessSecGroupDualStackBase(BaseNetworkSecGroupTest):
required_extensions = ['security-group', 'stateful-security-group']
stateless_sg = True
- ipv6_mode = 'slaac'
def _get_port_cidrs(self, port):
ips = []
@@ -1183,6 +1178,14 @@
for port_cidr in self._get_port_cidrs(port):
self.assertIn(port_cidr, configured_cidrs)
+
+@testtools.skipIf(
+ CONF.neutron_plugin_options.firewall_driver in ['openvswitch', 'None'],
+ "Firewall driver other than 'openvswitch' is required to use "
+ "stateless security groups.")
+class StatelessSecGroupDualStackSlaacTest(StatelessSecGroupDualStackBase):
+ ipv6_mode = 'slaac'
+
@decorators.idempotent_id('e7d64384-ea6a-40aa-b454-854f0990153c')
def test_default_sec_grp_scenarios(self):
self._test_default_sec_grp_scenarios()
@@ -1193,9 +1196,7 @@
"Firewall driver other than 'openvswitch' is required to use "
"stateless security groups.")
class StatelessSecGroupDualStackDHCPv6StatelessTest(
- StatelessSecGroupDualStackSlaacTest):
- required_extensions = ['security-group', 'stateful-security-group']
- stateless_sg = True
+ StatelessSecGroupDualStackBase):
ipv6_mode = 'dhcpv6-stateless'
@decorators.idempotent_id('c61c127c-e08f-4ddf-87a3-58b3c86e5476')
diff --git a/neutron_tempest_plugin/services/network/json/network_client.py b/neutron_tempest_plugin/services/network/json/network_client.py
index 0666297..d5a827e 100644
--- a/neutron_tempest_plugin/services/network/json/network_client.py
+++ b/neutron_tempest_plugin/services/network/json/network_client.py
@@ -846,6 +846,38 @@
self.expected_success(204, resp.status)
return service_client.ResponseBody(resp, body)
+ def list_default_security_group_rules(self, **kwargs):
+ uri = '%s/default-security-group-rules' % self.uri_prefix
+ if kwargs:
+ uri += '?' + urlparse.urlencode(kwargs, doseq=1)
+ resp, body = self.get(uri)
+ self.expected_success(200, resp.status)
+ body = jsonutils.loads(body)
+ return service_client.ResponseBody(resp, body)
+
+ def get_default_security_group_rule(self, rule_id):
+ uri = '%s/default-security-group-rules/%s' % (self.uri_prefix,
+ rule_id)
+ get_resp, get_resp_body = self.get(uri)
+ self.expected_success(200, get_resp.status)
+ body = jsonutils.loads(get_resp_body)
+ return service_client.ResponseBody(get_resp, body)
+
+ def create_default_security_group_rule(self, **kwargs):
+ post_body = {'default_security_group_rule': kwargs}
+ body = jsonutils.dumps(post_body)
+ uri = '%s/default-security-group-rules' % self.uri_prefix
+ resp, body = self.post(uri, body)
+ self.expected_success(201, resp.status)
+ body = jsonutils.loads(body)
+ return service_client.ResponseBody(resp, body)
+
+ def delete_default_security_group_rule(self, rule_id):
+ uri = '%s/default-security-group-rules/%s' % (self.uri_prefix, rule_id)
+ resp, body = self.delete(uri)
+ self.expected_success(204, resp.status)
+ return service_client.ResponseBody(resp, body)
+
def list_ports(self, **kwargs):
uri = '%s/ports' % self.uri_prefix
if kwargs:
diff --git a/zuul.d/master_jobs.yaml b/zuul.d/master_jobs.yaml
index 1c47f85..938e431 100644
--- a/zuul.d/master_jobs.yaml
+++ b/zuul.d/master_jobs.yaml
@@ -114,6 +114,7 @@
- router
- router_availability_zone
- security-group
+ - security-groups-default-rules
- security-groups-remote-address-group
- segment
- service-type