Merge "Migrate neutron-vpnaas tests to neutron-tempest-plugin"
diff --git a/.zuul.yaml b/.zuul.yaml
index 13d419d..e0ecc53 100644
--- a/.zuul.yaml
+++ b/.zuul.yaml
@@ -1139,6 +1139,34 @@
tempest_concurrency: 1
tempest_test_regex: ^neutron_tempest_plugin\.neutron_dynamic_routing
+- job:
+ name: neutron-tempest-plugin-vpnaas
+ parent: neutron-tempest-plugin
+ timeout: 3900
+ required-projects:
+ - openstack/devstack-gate
+ - openstack/neutron
+ - openstack/neutron-vpnaas
+ - openstack/neutron-tempest-plugin
+ - openstack/tempest
+ vars:
+ tempest_test_regex: ^neutron_tempest_plugin\.vpnaas
+ tox_envlist: all-plugin
+ devstack_plugins:
+ neutron-vpnaas: https://opendev.org/openstack/neutron-vpnaas.git
+ neutron-tempest-plugin: https://opendev.org/openstack/neutron-tempest-plugin.git
+ network_api_extensions_common: *api_extensions_master
+ network_api_extensions_vpnaas:
+ - vpnaas
+ devstack_localrc:
+ IPSEC_PACKAGE: strongswan
+ NETWORK_API_EXTENSIONS: "{{ (network_api_extensions_common + network_api_extensions_vpnaas) | join(',') }}"
+ irrelevant-files:
+ - ^.*\.rst$
+ - ^doc/.*$
+ - ^neutron_vpnaas/tests/unit/.*$
+ - ^releasenotes/.*$
+
- project-template:
name: neutron-tempest-plugin-jobs
check:
@@ -1233,6 +1261,7 @@
# TODO(slaweq): switch it to be voting when bug
# https://bugs.launchpad.net/neutron/+bug/1850626 will be fixed
voting: false
+ - neutron-tempest-plugin-vpnaas
gate:
jobs:
diff --git a/neutron_tempest_plugin/vpnaas/__init__.py b/neutron_tempest_plugin/vpnaas/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/neutron_tempest_plugin/vpnaas/__init__.py
diff --git a/neutron_tempest_plugin/vpnaas/api/__init__.py b/neutron_tempest_plugin/vpnaas/api/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/neutron_tempest_plugin/vpnaas/api/__init__.py
diff --git a/neutron_tempest_plugin/vpnaas/api/base_vpnaas.py b/neutron_tempest_plugin/vpnaas/api/base_vpnaas.py
new file mode 100644
index 0000000..0e54380
--- /dev/null
+++ b/neutron_tempest_plugin/vpnaas/api/base_vpnaas.py
@@ -0,0 +1,160 @@
+# Copyright 2012 OpenStack Foundation
+# Copyright 2016 Hewlett Packard Enterprise Development Company LP
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+from tempest.lib.common.utils import data_utils
+
+from neutron_tempest_plugin.api import base
+from neutron_tempest_plugin import config
+from neutron_tempest_plugin.vpnaas.services import clients_vpnaas
+
+CONF = config.CONF
+
+
+class BaseNetworkTest(base.BaseNetworkTest):
+ @classmethod
+ def resource_setup(cls):
+ super(BaseNetworkTest, cls).resource_setup()
+ cls.vpnservices = []
+ cls.ikepolicies = []
+ cls.ipsecpolicies = []
+ cls.ipsec_site_connections = []
+ cls.endpoint_groups = []
+
+ @classmethod
+ def get_client_manager(cls, credential_type=None, roles=None,
+ force_new=None):
+ manager = super(BaseNetworkTest, cls).get_client_manager(
+ credential_type=credential_type,
+ roles=roles,
+ force_new=force_new)
+ # Neutron uses a different clients manager than the one in the Tempest
+ return clients_vpnaas.Manager(manager.credentials)
+
+ @classmethod
+ def resource_cleanup(cls):
+ if CONF.service_available.neutron:
+ # Clean up ipsec connections
+ for ipsec_site_connection in cls.ipsec_site_connections:
+ cls._try_delete_resource(
+ cls.client.delete_ipsec_site_connection,
+ ipsec_site_connection['id'])
+
+ # Clean up ipsec endpoint group
+ for endpoint_group in cls.endpoint_groups:
+ cls._try_delete_resource(cls.client.delete_endpoint_group,
+ endpoint_group['id'])
+
+ # Clean up ipsec policies
+ for ipsecpolicy in cls.ipsecpolicies:
+ cls._try_delete_resource(cls.client.delete_ipsecpolicy,
+ ipsecpolicy['id'])
+ # Clean up ike policies
+ for ikepolicy in cls.ikepolicies:
+ cls._try_delete_resource(cls.client.delete_ikepolicy,
+ ikepolicy['id'])
+ # Clean up vpn services
+ for vpnservice in cls.vpnservices:
+ cls._try_delete_resource(cls.client.delete_vpnservice,
+ vpnservice['id'])
+ super(BaseNetworkTest, cls).resource_cleanup()
+
+ @classmethod
+ def create_vpnservice(cls, subnet_id, router_id, name=None):
+ """Wrapper utility that returns a test vpn service."""
+ if name is None:
+ name = data_utils.rand_name("vpnservice-")
+ body = cls.client.create_vpnservice(
+ subnet_id=subnet_id, router_id=router_id, admin_state_up=True,
+ name=name)
+ vpnservice = body['vpnservice']
+ cls.vpnservices.append(vpnservice)
+ return vpnservice
+
+ @classmethod
+ def create_vpnservice_no_subnet(cls, router_id):
+ """Wrapper utility that returns a test vpn service."""
+ body = cls.client.create_vpnservice(
+ router_id=router_id, admin_state_up=True,
+ name=data_utils.rand_name("vpnservice-"))
+ vpnservice = body['vpnservice']
+ cls.vpnservices.append(vpnservice)
+ return vpnservice
+
+ @classmethod
+ def create_ikepolicy(cls, name):
+ """Wrapper utility that returns a test ike policy."""
+ body = cls.client.create_ikepolicy(name=name)
+ ikepolicy = body['ikepolicy']
+ cls.ikepolicies.append(ikepolicy)
+ return ikepolicy
+
+ @classmethod
+ def create_ipsecpolicy(cls, name):
+ """Wrapper utility that returns a test ipsec policy."""
+ body = cls.client.create_ipsecpolicy(name=name)
+ ipsecpolicy = body['ipsecpolicy']
+ cls.ipsecpolicies.append(ipsecpolicy)
+ return ipsecpolicy
+
+ @classmethod
+ def create_ipsec_site_connection(cls, ikepolicy_id, ipsecpolicy_id,
+ vpnservice_id, psk="secret",
+ peer_address="172.24.4.233",
+ peer_id="172.24.4.233",
+ peer_cidrs=None,
+ name=None):
+ """Wrapper utility that returns a test vpn connection."""
+ if peer_cidrs is None:
+ peer_cidrs = ['1.1.1.0/24', '2.2.2.0/24']
+ if name is None:
+ name = data_utils.rand_name("ipsec_site_connection-")
+ body = cls.client.create_ipsec_site_connection(
+ psk=psk,
+ initiator="bi-directional",
+ ipsecpolicy_id=ipsecpolicy_id,
+ admin_state_up=True,
+ mtu=1500,
+ ikepolicy_id=ikepolicy_id,
+ vpnservice_id=vpnservice_id,
+ peer_address=peer_address,
+ peer_id=peer_id,
+ peer_cidrs=peer_cidrs,
+ name=name)
+ ipsec_site_connection = body['ipsec_site_connection']
+ cls.ipsec_site_connections.append(ipsec_site_connection)
+ return ipsec_site_connection
+
+ @classmethod
+ def create_endpoint_group(cls, name, type, endpoints):
+ """Wrapper utility that returns a test ipsec policy."""
+ body = cls.client.create_endpoint_group(
+ endpoints=endpoints,
+ type=type,
+ description='endpoint type:' + type,
+ name=name)
+ endpoint_group = body['endpoint_group']
+ cls.endpoint_groups.append(endpoint_group)
+ return endpoint_group
+
+
+class BaseAdminNetworkTest(BaseNetworkTest):
+ credentials = ['primary', 'admin']
+
+ @classmethod
+ def setup_clients(cls):
+ super(BaseAdminNetworkTest, cls).setup_clients()
+ cls.admin_client = cls.os_admin.network_client
+ cls.identity_admin_client = cls.os_admin.tenants_client
diff --git a/neutron_tempest_plugin/vpnaas/api/test_vpnaas.py b/neutron_tempest_plugin/vpnaas/api/test_vpnaas.py
new file mode 100644
index 0000000..ab48a2f
--- /dev/null
+++ b/neutron_tempest_plugin/vpnaas/api/test_vpnaas.py
@@ -0,0 +1,910 @@
+# Copyright 2012,2016 OpenStack Foundation
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+from neutron_lib.db import constants as db_const
+from tempest.lib.common.utils import data_utils
+from tempest.lib import decorators
+from tempest.lib import exceptions as lib_exc
+from tempest import test
+
+from neutron_tempest_plugin import config
+from neutron_tempest_plugin.vpnaas.api import base_vpnaas as base
+
+CONF = config.CONF
+
+_LONG_NAME = 'x' * (db_const.NAME_FIELD_SIZE + 1)
+_LONG_DESCRIPTION = 'y' * (db_const.DESCRIPTION_FIELD_SIZE + 1)
+
+
+class VPNaaSTestJSON(base.BaseAdminNetworkTest):
+
+ """VPNaaS API tests.
+
+ Tests the following operations in the Neutron API using the REST client for
+ Neutron:
+ List, Show, Create, Delete, and Update VPN Service
+ List, Show, Create, Delete, and Update IKE policy
+ List, Show, Create, Delete, and Update IPSec policy
+ """
+
+ @classmethod
+ def resource_setup(cls):
+ if not test.is_extension_enabled('vpnaas', 'network'):
+ msg = "vpnaas extension not enabled."
+ raise cls.skipException(msg)
+ super(VPNaaSTestJSON, cls).resource_setup()
+ cls.ext_net_id = CONF.network.public_network_id
+ network_name = data_utils.rand_name('network-')
+ cls.network = cls.create_network(network_name)
+ cls.subnet = cls.create_subnet(cls.network)
+ cls.router = cls.create_router(
+ data_utils.rand_name("router"),
+ external_network_id=CONF.network.public_network_id)
+ cls.create_router_interface(cls.router['id'], cls.subnet['id'])
+ cls.vpnservice = cls.create_vpnservice(cls.subnet['id'],
+ cls.router['id'])
+ vpnservice2 = cls.create_vpnservice_no_subnet(cls.router['id'])
+ cls.vpnservice_no_subnet = vpnservice2
+
+ cls.ikepolicy = cls.create_ikepolicy(
+ data_utils.rand_name("ike-policy-"))
+ cls.ipsecpolicy = cls.create_ipsecpolicy(
+ data_utils.rand_name("ipsec-policy-"))
+
+ cls.endpoint_group_local = cls.create_endpoint_group(
+ data_utils.rand_name("endpoint-group-local-"),
+ 'subnet',
+ cls.subnet['id'])
+
+ cls.endpoint_group_remote = cls.create_endpoint_group(
+ data_utils.rand_name("endpoint-group-remote-"),
+ 'cidr',
+ ["10.101.0.0/24", "10.102.0.0/24"])
+
+ cls.ipsec_site_connection = cls.create_ipsec_site_connection(
+ cls.ikepolicy['id'],
+ cls.ipsecpolicy['id'],
+ cls.vpnservice['id'])
+
+ def _delete_ike_policy(self, ike_policy_id):
+ # Deletes a ike policy and verifies if it is deleted or not
+ ike_list = list()
+ all_ike = self.client.list_ikepolicies()
+ for ike in all_ike['ikepolicies']:
+ ike_list.append(ike['id'])
+ if ike_policy_id in ike_list:
+ self.client.delete_ikepolicy(ike_policy_id)
+ # Asserting that the policy is not found in list after deletion
+ ikepolicies = self.client.list_ikepolicies()
+ ike_id_list = list()
+ for i in ikepolicies['ikepolicies']:
+ ike_id_list.append(i['id'])
+ self.assertNotIn(ike_policy_id, ike_id_list)
+
+ def _delete_ipsec_policy(self, ipsec_policy_id):
+ # Deletes an ike policy if it exists
+ try:
+ self.client.delete_ipsecpolicy(ipsec_policy_id)
+
+ except lib_exc.NotFound:
+ pass
+
+ def _delete_ipsec_site_connection(self, conn_id):
+ # Deletes an ipsec site connection if it exists
+ try:
+ self.client.delete_ipsec_site_connection(conn_id)
+ except lib_exc.NotFound:
+ pass
+
+ def _assertExpected(self, expected, actual):
+ # Check if not expected keys/values exists in actual response body
+ for key, value in expected.items():
+ self.assertIn(key, actual)
+ self.assertEqual(value, actual[key])
+
+ def _delete_vpn_service(self, vpn_service_id):
+ self.client.delete_vpnservice(vpn_service_id)
+ # Asserting if vpn service is found in the list after deletion
+ body = self.client.list_vpnservices()
+ vpn_services = [vs['id'] for vs in body['vpnservices']]
+ self.assertNotIn(vpn_service_id, vpn_services)
+
+ def _delete_endpoint_group(self, endpoint_group_id):
+ # Delete a endpoint-group and verifies if it is deleted or not
+ endpoint_group_list = list()
+ all_endpoint = self.client.list_endpoint_groups()
+ for endpoint in all_endpoint['endpoint_groups']:
+ endpoint_group_list.append(endpoint['id'])
+ if endpoint_group_id in endpoint_group_list:
+ self.client.delete_endpoint_group(endpoint_group_id)
+ # Asserting that the endpoint is not found in list after deletion
+ endpoint_group = self.client.list_endpoint_groups()
+ for e in endpoint_group['endpoint_groups']:
+ endpoint_group_list.append(e['id'])
+ self.assertNotIn(endpoint_group_list, endpoint_group_id)
+
+ def _get_tenant_id(self):
+ """Returns the tenant_id of the client current user"""
+ return self.client.tenant_id
+
+ @decorators.attr(type='smoke')
+ @decorators.idempotent_id('74dcf2d3-a40e-4a6c-a25a-747d764bee81')
+ def test_admin_create_ipsec_policy_for_tenant(self):
+ tenant_id = self._get_tenant_id()
+ # Create IPSec policy for the newly created tenant
+ name = data_utils.rand_name('ipsec-policy')
+ body = (self.admin_client.
+ create_ipsecpolicy(name=name, tenant_id=tenant_id))
+ ipsecpolicy = body['ipsecpolicy']
+ self.assertIsNotNone(ipsecpolicy['id'])
+ self.addCleanup(self.admin_client.delete_ipsecpolicy,
+ ipsecpolicy['id'])
+
+ # Assert that created ipsec policy is found in API list call
+ body = self.client.list_ipsecpolicies()
+ ipsecpolicies = [policy['id'] for policy in body['ipsecpolicies']]
+ self.assertIn(ipsecpolicy['id'], ipsecpolicies)
+
+ @decorators.attr(type='smoke')
+ @decorators.idempotent_id('016b1861-fe55-4184-ba3c-e049ebbeb570')
+ def test_admin_create_vpn_service_for_tenant(self):
+ tenant_id = self._get_tenant_id()
+
+ # Create vpn service for the newly created tenant
+ network2 = self.create_network()
+ subnet2 = self.create_subnet(network2)
+ router2 = self.create_router(data_utils.rand_name('router-'),
+ external_network_id=self.ext_net_id)
+ self.create_router_interface(router2['id'], subnet2['id'])
+ name = data_utils.rand_name('vpn-service')
+ body = self.admin_client.create_vpnservice(
+ subnet_id=subnet2['id'],
+ router_id=router2['id'],
+ name=name,
+ admin_state_up=True,
+ tenant_id=tenant_id)
+ vpnservice = body['vpnservice']
+ self.assertIsNotNone(vpnservice['id'])
+ self.addCleanup(self.admin_client.delete_vpnservice, vpnservice['id'])
+ # Assert that created vpnservice is found in API list call
+ body = self.client.list_vpnservices()
+ vpn_services = [vs['id'] for vs in body['vpnservices']]
+ self.assertIn(vpnservice['id'], vpn_services)
+
+ @decorators.attr(type='smoke')
+ @decorators.idempotent_id('8f33c292-558d-4fdb-b32c-ab1677e8bdc8')
+ def test_admin_create_ike_policy_for_tenant(self):
+ tenant_id = self._get_tenant_id()
+
+ # Create IKE policy for the newly created tenant
+ name = data_utils.rand_name('ike-policy')
+ body = (self.admin_client.
+ create_ikepolicy(name=name, ike_version="v1",
+ encryption_algorithm="aes-128",
+ auth_algorithm="sha1",
+ tenant_id=tenant_id))
+ ikepolicy = body['ikepolicy']
+ self.assertIsNotNone(ikepolicy['id'])
+ self.addCleanup(self.admin_client.delete_ikepolicy, ikepolicy['id'])
+
+ # Assert that created ike policy is found in API list call
+ body = self.client.list_ikepolicies()
+ ikepolicies = [ikp['id'] for ikp in body['ikepolicies']]
+ self.assertIn(ikepolicy['id'], ikepolicies)
+
+ @decorators.attr(type='smoke')
+ @decorators.idempotent_id('2997641b-3f2f-4fdf-9af6-bfb9997ecd3b')
+ def test_list_vpn_services(self):
+ # Verify the VPN service exists in the list of all VPN services
+ body = self.client.list_vpnservices()
+ vpnservices = body['vpnservices']
+ self.assertIn(self.vpnservice['id'], [v['id'] for v in vpnservices])
+
+ @decorators.attr(type='smoke')
+ @decorators.idempotent_id('8780a28a-deb2-40b0-8433-66f37005c058')
+ def test_create_update_delete_vpn_service(self):
+ # Creates a VPN service and sets up deletion
+ network1 = self.create_network()
+ subnet1 = self.create_subnet(network1)
+ router1 = self.create_router(data_utils.rand_name('router-'),
+ external_network_id=self.ext_net_id)
+ self.create_router_interface(router1['id'], subnet1['id'])
+ name = data_utils.rand_name('vpn-service1')
+ body = self.client.create_vpnservice(subnet_id=subnet1['id'],
+ router_id=router1['id'],
+ name=name,
+ admin_state_up=True)
+ vpnservice = body['vpnservice']
+ self.addCleanup(self._delete_vpn_service, vpnservice['id'])
+ # Assert if created vpnservices are not found in vpnservices list
+ body = self.client.list_vpnservices()
+ vpn_services = [vs['id'] for vs in body['vpnservices']]
+ self.assertIsNotNone(vpnservice['id'])
+ self.assertIn(vpnservice['id'], vpn_services)
+
+ # TODO(raies): implement logic to update vpnservice
+ # VPNaaS client function to update is implemented.
+ # But precondition is that current state of vpnservice
+ # should be "ACTIVE" not "PENDING*"
+
+ @decorators.attr(type='smoke')
+ @decorators.idempotent_id('631d33ec-8d34-49e1-9e53-576959ea2c57')
+ def test_show_vpn_service(self):
+ # Verifies the details of a vpn service
+ body = self.client.show_vpnservice(self.vpnservice['id'])
+ vpnservice = body['vpnservice']
+ self.assertEqual(self.vpnservice['id'], vpnservice['id'])
+ self.assertEqual(self.vpnservice['name'], vpnservice['name'])
+ self.assertEqual(self.vpnservice['description'],
+ vpnservice['description'])
+ self.assertEqual(self.vpnservice['router_id'], vpnservice['router_id'])
+ self.assertEqual(self.vpnservice['subnet_id'], vpnservice['subnet_id'])
+ self.assertEqual(self.vpnservice['tenant_id'], vpnservice['tenant_id'])
+ valid_status = ["ACTIVE", "DOWN", "BUILD", "ERROR", "PENDING_CREATE",
+ "PENDING_UPDATE", "PENDING_DELETE"]
+ self.assertIn(vpnservice['status'], valid_status)
+
+ @decorators.attr(type='smoke')
+ @decorators.idempotent_id('b6a665cf-d3df-417f-9477-bd0ef2c56c36')
+ def test_list_ike_policies(self):
+ # Verify the ike policy exists in the list of all IKE policies
+ body = self.client.list_ikepolicies()
+ ikepolicies = body['ikepolicies']
+ self.assertIn(self.ikepolicy['id'], [i['id'] for i in ikepolicies])
+
+ @decorators.attr(type='smoke')
+ @decorators.idempotent_id('d9ecb858-7136-4ef5-abba-325c84c57d78')
+ def test_create_update_delete_ike_policy(self):
+ # Creates a IKE policy
+ name = data_utils.rand_name('ike-policy')
+ body = (self.client.create_ikepolicy(
+ name=name,
+ ike_version="v1",
+ encryption_algorithm="aes-128",
+ auth_algorithm="sha1"))
+ ikepolicy = body['ikepolicy']
+ self.assertIsNotNone(ikepolicy['id'])
+ self.addCleanup(self._delete_ike_policy, ikepolicy['id'])
+
+ # Update IKE Policy
+ new_ike = {'name': data_utils.rand_name("New-IKE"),
+ 'description': "Updated ike policy",
+ 'encryption_algorithm': "aes-256",
+ 'ike_version': "v2",
+ 'pfs': "group14",
+ 'lifetime': {'units': "seconds", 'value': 2000}}
+ self.client.update_ikepolicy(ikepolicy['id'], **new_ike)
+ # Confirm that update was successful by verifying using 'show'
+ body = self.client.show_ikepolicy(ikepolicy['id'])
+ ike_policy = body['ikepolicy']
+ for key, value in new_ike.items():
+ self.assertIn(key, ike_policy)
+ self.assertEqual(value, ike_policy[key])
+
+ # Verification of ike policy delete
+ self.client.delete_ikepolicy(ikepolicy['id'])
+ body = self.client.list_ikepolicies()
+ ikepolicies = [ikp['id'] for ikp in body['ikepolicies']]
+ self.assertNotIn(ike_policy['id'], ikepolicies)
+
+ @decorators.attr(type='smoke')
+ @decorators.idempotent_id('5efd625b-41f2-4b80-90e0-81786f3e3309')
+ def test_show_ike_policy(self):
+ # Verifies the details of a ike policy
+ body = self.client.show_ikepolicy(self.ikepolicy['id'])
+ ikepolicy = body['ikepolicy']
+ self.assertEqual(self.ikepolicy['id'], ikepolicy['id'])
+ self.assertEqual(self.ikepolicy['name'], ikepolicy['name'])
+ self.assertEqual(self.ikepolicy['description'],
+ ikepolicy['description'])
+ self.assertEqual(self.ikepolicy['encryption_algorithm'],
+ ikepolicy['encryption_algorithm'])
+ self.assertEqual(self.ikepolicy['auth_algorithm'],
+ ikepolicy['auth_algorithm'])
+ self.assertEqual(self.ikepolicy['tenant_id'],
+ ikepolicy['tenant_id'])
+ self.assertEqual(self.ikepolicy['pfs'],
+ ikepolicy['pfs'])
+ self.assertEqual(self.ikepolicy['phase1_negotiation_mode'],
+ ikepolicy['phase1_negotiation_mode'])
+ self.assertEqual(self.ikepolicy['ike_version'],
+ ikepolicy['ike_version'])
+
+ @decorators.attr(type='smoke')
+ @decorators.idempotent_id('c3f7a459-3ccb-42b3-8fab-5ad7a5a791b1')
+ def test_list_ipsec_policies(self):
+ # Verify the ipsec policy exists in the list of all ipsec policies
+ body = self.client.list_ipsecpolicies()
+ ipsecpolicies = body['ipsecpolicies']
+ self.assertIn(self.ipsecpolicy['id'], [i['id'] for i in ipsecpolicies])
+
+ @decorators.attr(type='smoke')
+ @decorators.idempotent_id('e3952941-9b9e-48fc-9f36-fc41707a16b4')
+ def test_create_update_delete_ipsec_policy(self):
+ # Creates an ipsec policy
+ ipsec_policy_body = {'name': data_utils.rand_name('ipsec-policy'),
+ 'pfs': 'group5',
+ 'encryption_algorithm': "aes-128",
+ 'auth_algorithm': 'sha1'}
+ resp_body = self.client.create_ipsecpolicy(**ipsec_policy_body)
+ ipsecpolicy = resp_body['ipsecpolicy']
+ self.addCleanup(self._delete_ipsec_policy, ipsecpolicy['id'])
+ self._assertExpected(ipsec_policy_body, ipsecpolicy)
+ # Verification of ipsec policy update
+ new_ipsec = {'description': 'Updated ipsec policy',
+ 'pfs': 'group2',
+ 'name': data_utils.rand_name("New-IPSec"),
+ 'encryption_algorithm': "aes-256",
+ 'lifetime': {'units': "seconds", 'value': '2000'}}
+ body = self.client.update_ipsecpolicy(ipsecpolicy['id'],
+ **new_ipsec)
+ updated_ipsec_policy = body['ipsecpolicy']
+ self._assertExpected(new_ipsec, updated_ipsec_policy)
+ # Verification of ipsec policy delete
+ self.client.delete_ipsecpolicy(ipsecpolicy['id'])
+ self.assertRaises(lib_exc.NotFound,
+ self.client.delete_ipsecpolicy, ipsecpolicy['id'])
+
+ @decorators.attr(type='smoke')
+ @decorators.idempotent_id('86d68b8d-5935-46db-b096-235f70825678')
+ def test_show_ipsec_policy(self):
+ # Verifies the details of an ipsec policy
+ body = self.client.show_ipsecpolicy(self.ipsecpolicy['id'])
+ ipsecpolicy = body['ipsecpolicy']
+ self._assertExpected(self.ipsecpolicy, ipsecpolicy)
+
+ @decorators.attr(type=['negative', 'smoke'])
+ @decorators.idempotent_id('95a4a55a-3a10-4f53-9e8a-45be359a245e')
+ def test_create_vpnservice_long_name(self):
+ """Test excessively long name.
+
+ Without REST checks, this call would return 500 INTERNAL SERVER
+ error on internal db failure instead.
+ """
+ name = _LONG_NAME
+ self.assertRaises(
+ lib_exc.BadRequest, self.client.create_vpnservice,
+ subnet_id=self.subnet['id'], router_id=self.router['id'],
+ name=name, admin_state_up=True)
+
+ @decorators.attr(type=['negative', 'smoke'])
+ @decorators.idempotent_id('6e88b4fb-de5e-464e-97db-a04c68070589')
+ def test_create_vpnservice_long_description(self):
+ name = data_utils.rand_name('vpn-service1')
+ description = _LONG_DESCRIPTION
+ self.assertRaises(
+ lib_exc.BadRequest, self.client.create_vpnservice,
+ subnet_id=self.subnet['id'], router_id=self.router['id'],
+ name=name, description=description, admin_state_up=True)
+
+ @decorators.attr(type='smoke')
+ @decorators.idempotent_id('fdb5a215-0671-449b-91e4-dd58fb223947')
+ def test_list_vpn_connections(self):
+ # Verify the VPN service exists in the list of all VPN services
+ body = self.client.list_ipsec_site_connections()
+ ipsec_site_connections = body['ipsec_site_connections']
+ self.assertIn(self.ipsec_site_connection['id'],
+ [v['id'] for v in ipsec_site_connections])
+
+ @decorators.attr(type='smoke')
+ @decorators.idempotent_id('92cfbb15-4299-4fe9-a4e1-f11167e3b057')
+ def test_create_delete_vpn_connection_with_legacy_mode(self):
+ # Verify create VPN connection
+ name = data_utils.rand_name("ipsec_site_connection-")
+ body = self.client.create_ipsec_site_connection(
+ ipsecpolicy_id=self.ipsecpolicy['id'],
+ ikepolicy_id=self.ikepolicy['id'],
+ vpnservice_id=self.vpnservice['id'],
+ peer_address="172.24.4.233",
+ peer_id="172.24.4.233",
+ peer_cidrs=['10.1.1.0/24', '10.2.2.0/24'],
+ name=name,
+ mtu=1500,
+ admin_state_up=True,
+ initiator="bi-directional",
+ psk="secret")
+ ipsec_site_connection = body['ipsec_site_connection']
+ self.assertEqual(ipsec_site_connection['name'], name)
+ self.assertEqual(ipsec_site_connection['mtu'], 1500)
+ self.addCleanup(self._delete_ipsec_site_connection,
+ ipsec_site_connection['id'])
+
+ # Verification of IPsec connection delete
+ self.client.delete_ipsec_site_connection(ipsec_site_connection['id'])
+ body = self.client.list_ipsec_site_connections()
+ ipsec_site_connections = body['ipsec_site_connections']
+ self.assertNotIn(ipsec_site_connection['id'],
+ [v['id'] for v in ipsec_site_connections])
+
+ @decorators.attr(type=['negative', 'smoke'])
+ @decorators.idempotent_id('e9977b38-9cd8-4aa6-8bd5-ba15f41c368c')
+ def test_create_vpn_connection_missing_peer_cidr(self):
+ # Verify create VPN connection with JSON missing peer cidr
+ # in legacy mode
+ name = data_utils.rand_name("ipsec_site_connection-")
+ self.assertRaises(
+ lib_exc.BadRequest,
+ self.client.create_ipsec_site_connection,
+ ipsecpolicy_id=self.ipsecpolicy['id'],
+ ikepolicy_id=self.ikepolicy['id'],
+ vpnservice_id=self.vpnservice['id'],
+ peer_address="172.24.4.233",
+ peer_id="172.24.4.233",
+ name=name,
+ mtu=1500,
+ admin_state_up=True,
+ initiator="bi-directional",
+ psk="secret")
+
+ @decorators.attr(type=['negative', 'smoke'])
+ @decorators.idempotent_id('06a268fa-c53b-4cd1-9350-b83892e4a394')
+ def test_create_vpn_service_subnet_not_on_router(self):
+ # Verify create VPN service with a subnet not on router
+ tenant_id = self._get_tenant_id()
+
+ # Create vpn service for the newly created tenant
+ network2 = self.create_network()
+ subnet2 = self.create_subnet(network2)
+ router2 = self.create_router(data_utils.rand_name('router-'),
+ external_network_id=self.ext_net_id)
+ self.addCleanup(self.admin_client.delete_router, router2['id'])
+ self.addCleanup(self.admin_client.delete_network, network2['id'])
+ name = data_utils.rand_name('vpn-service')
+ self.assertRaises(
+ lib_exc.BadRequest,
+ self.admin_client.create_vpnservice,
+ subnet_id=subnet2['id'],
+ router_id=router2['id'],
+ name=name,
+ admin_state_up=True,
+ tenant_id=tenant_id)
+
+ @decorators.attr(type=['negative', 'smoke'])
+ @decorators.idempotent_id('7678798a-fc20-46cd-ad78-b6b3d599de18')
+ def test_create_vpn_connection_small_MTU(self):
+ # Verify create VPN connection with small MTU
+ name = data_utils.rand_name("ipsec_site_connection-")
+ self.assertRaises(
+ lib_exc.BadRequest,
+ self.client.create_ipsec_site_connection,
+ ipsecpolicy_id=self.ipsecpolicy['id'],
+ ikepolicy_id=self.ikepolicy['id'],
+ vpnservice_id=self.vpnservice['id'],
+ peer_address="172.24.4.233",
+ peer_id="172.24.4.233",
+ peer_cidrs=['10.1.1.0/24', '10.2.2.0/24'],
+ name=name,
+ mtu=63,
+ admin_state_up=True,
+ initiator="bi-directional",
+ psk="secret")
+
+ @decorators.attr(type=['negative', 'smoke'])
+ @decorators.idempotent_id('d9e1af94-1fbf-4183-b857-82328d4f4b97')
+ def test_create_vpn_connection_small_dpd(self):
+ # Verify create VPN connection with small dpd
+ name = data_utils.rand_name("ipsec_site_connection-")
+ self.assertRaises(
+ lib_exc.BadRequest,
+ self.client.create_ipsec_site_connection,
+ ipsecpolicy_id=self.ipsecpolicy['id'],
+ ikepolicy_id=self.ikepolicy['id'],
+ vpnservice_id=self.vpnservice['id'],
+ peer_address="172.24.4.233",
+ peer_id="172.24.4.233",
+ peer_cidrs=['10.1.1.0/24', '10.2.2.0/24'],
+ name=name,
+ dpd=59,
+ admin_state_up=True,
+ initiator="bi-directional",
+ psk="secret")
+
+ @decorators.attr(type=['negative', 'smoke'])
+ @decorators.idempotent_id('ea6ae270-9ea4-4446-9022-b7ef7d986762')
+ def test_create_vpn_connection_wrong_peer_cidr(self):
+ # Verify create VPN connection with wrong peer cidr
+ name = data_utils.rand_name("ipsec_site_connection-")
+ self.assertRaises(
+ lib_exc.BadRequest,
+ self.client.create_ipsec_site_connection,
+ ipsecpolicy_id=self.ipsecpolicy['id'],
+ ikepolicy_id=self.ikepolicy['id'],
+ vpnservice_id=self.vpnservice['id'],
+ peer_address="172.24.4.233",
+ peer_id="172.24.4.233",
+ peer_cidrs=['1.0.0.0/33'],
+ name=name,
+ mtu=1500,
+ admin_state_up=True,
+ initiator="bi-directional",
+ psk="secret")
+
+ @decorators.attr(type=['negative', 'smoke'])
+ @decorators.idempotent_id('68e17a9b-5b33-4d3e-90f2-01a6728d2c26')
+ def test_create_connection_with_cidr_and_endpoint_group(self):
+ tenant_id = self._get_tenant_id()
+ # Create endpoint group for the newly created tenant
+ name = data_utils.rand_name('endpoint_group')
+ subnet_id = self.subnet['id']
+ body = self.client.create_endpoint_group(
+ tenant_id=tenant_id,
+ name=name,
+ type='subnet',
+ endpoints=subnet_id)
+ endpoint_group_local = body['endpoint_group']
+ self.addCleanup(self._delete_endpoint_group,
+ endpoint_group_local['id'])
+ name = data_utils.rand_name('endpoint_group')
+ body = self.client.create_endpoint_group(
+ tenant_id=tenant_id,
+ name=name,
+ type='cidr',
+ endpoints=["10.103.0.0/24", "10.104.0.0/24"])
+ endpoint_group_remote = body['endpoint_group']
+ self.addCleanup(self._delete_endpoint_group,
+ endpoint_group_remote['id'])
+ # Create connections
+ name = data_utils.rand_name("ipsec_site_connection-")
+ self.assertRaises(
+ lib_exc.BadRequest,
+ self.client.create_ipsec_site_connection,
+ ipsecpolicy_id=self.ipsecpolicy['id'],
+ ikepolicy_id=self.ikepolicy['id'],
+ vpnservice_id=self.vpnservice_no_subnet['id'],
+ peer_address="172.24.4.233",
+ peer_id="172.24.4.233",
+ peer_cidr="10.1.0.0/24",
+ peer_ep_group_id=endpoint_group_local['id'],
+ local_ep_group_id=endpoint_group_remote['id'],
+ name=name,
+ admin_state_up=True,
+ initiator="bi-directional",
+ psk="secret")
+
+ @decorators.attr(type=['negative', 'smoke'])
+ @decorators.idempotent_id('d101a6a7-67d3-4418-9d9e-65c4507a9148')
+ def test_create_vpn_connection_with_missing_remote_endpoint_group(self):
+ # Verify create VPN connection without subnet in vpnservice
+ # and has only local endpoint group
+ tenant_id = self._get_tenant_id()
+ # Create endpoint group for the newly created tenant
+ tenant_id = self._get_tenant_id()
+ name = data_utils.rand_name('endpoint_group')
+ subnet_id = self.subnet['id']
+ body = self.client.create_endpoint_group(
+ tenant_id=tenant_id,
+ name=name,
+ type='subnet',
+ endpoints=subnet_id)
+ endpoint_group = body['endpoint_group']
+ self.addCleanup(self._delete_endpoint_group, endpoint_group['id'])
+ # Create connections
+ name = data_utils.rand_name("ipsec_site_connection-")
+ self.assertRaises(
+ lib_exc.BadRequest,
+ self.client.create_ipsec_site_connection,
+ ipsecpolicy_id=self.ipsecpolicy['id'],
+ ikepolicy_id=self.ikepolicy['id'],
+ vpnservice_id=self.vpnservice_no_subnet['id'],
+ peer_address="172.24.4.233",
+ peer_id="172.24.4.233",
+ local_ep_group_id=endpoint_group['id'],
+ name=name,
+ admin_state_up=True,
+ initiator="bi-directional",
+ psk="secret")
+
+ @decorators.attr(type=['negative', 'smoke'])
+ @decorators.idempotent_id('b5dd8d1e-587b-4e26-84f0-dcb814c65e57')
+ def test_create_vpn_connection_with_missing_local_endpoint_group(self):
+ # Verify create VPN connection without subnet in vpnservice
+ # and only have only local endpoint group
+ tenant_id = self._get_tenant_id()
+ # Create endpoint group for the newly created tenant
+ tenant_id = self._get_tenant_id()
+ name = data_utils.rand_name('endpoint_group')
+ body = self.client.create_endpoint_group(
+ tenant_id=tenant_id,
+ name=name,
+ type='cidr',
+ endpoints=["10.101.0.0/24", "10.102.0.0/24"])
+ endpoint_group = body['endpoint_group']
+ self.addCleanup(self._delete_endpoint_group, endpoint_group['id'])
+ # Create connections
+ name = data_utils.rand_name("ipsec_site_connection-")
+ self.assertRaises(
+ lib_exc.BadRequest,
+ self.client.create_ipsec_site_connection,
+ ipsecpolicy_id=self.ipsecpolicy['id'],
+ ikepolicy_id=self.ikepolicy['id'],
+ vpnservice_id=self.vpnservice_no_subnet['id'],
+ peer_address="172.24.4.233",
+ peer_id="172.24.4.233",
+ peer_ep_group_id=endpoint_group['id'],
+ name=name,
+ admin_state_up=True,
+ initiator="bi-directional",
+ psk="secret")
+
+ @decorators.attr(type=['negative', 'smoke'])
+ @decorators.idempotent_id('427bbc1b-4040-42e4-b661-6395a0bd8762')
+ def test_create_connection_with_mix_ip_endpoint_group(self):
+ tenant_id = self._get_tenant_id()
+ # Create endpoint group for the newly created tenant
+ name = data_utils.rand_name('endpoint_group')
+ subnet_id = self.subnet['id']
+ body = self.client.create_endpoint_group(
+ tenant_id=tenant_id,
+ name=name,
+ type='subnet',
+ endpoints=subnet_id)
+ endpoint_group_local = body['endpoint_group']
+ self.addCleanup(self._delete_endpoint_group,
+ endpoint_group_local['id'])
+ name_v6 = data_utils.rand_name('endpoint_group')
+ body_v6 = self.client.create_endpoint_group(
+ tenant_id=tenant_id,
+ name=name_v6,
+ type='cidr',
+ endpoints=["fec0:101::/64", "fec0:102::/64"])
+ endpoint_group_remote = body_v6['endpoint_group']
+ self.addCleanup(self._delete_endpoint_group,
+ endpoint_group_remote['id'])
+ # Create connections
+ name = data_utils.rand_name("ipsec_site_connection-")
+ self.assertEqual(endpoint_group_local['type'], 'subnet')
+ self.assertEqual(endpoint_group_remote['type'], 'cidr')
+ self.assertRaises(
+ lib_exc.BadRequest,
+ self.client.create_ipsec_site_connection,
+ ipsecpolicy_id=self.ipsecpolicy['id'],
+ ikepolicy_id=self.ikepolicy['id'],
+ vpnservice_id=self.vpnservice_no_subnet['id'],
+ peer_address="172.24.4.233",
+ peer_id="172.24.4.233",
+ peer_ep_group_id=endpoint_group_local['id'],
+ local_ep_group_id=endpoint_group_remote['id'],
+ name=name,
+ admin_state_up=True,
+ initiator="bi-directional",
+ psk="secret")
+
+ @decorators.attr(type=['negative', 'smoke'])
+ @decorators.idempotent_id('eac56769-ec2d-4817-ba45-ca101d676bfc')
+ def test_create_connection_with_subnet_and_remote_endpoint_group(self):
+ tenant_id = self._get_tenant_id()
+ # Create endpoint group for the newly created tenant
+ name = data_utils.rand_name('endpoint_group')
+ body = self.client.create_endpoint_group(
+ tenant_id=tenant_id,
+ name=name,
+ type='cidr',
+ endpoints=["10.101.0.0/24", "10.102.0.0/24"])
+ endpoint_group = body['endpoint_group']
+ self.addCleanup(self._delete_endpoint_group, endpoint_group['id'])
+ # Create connections
+ name = data_utils.rand_name("ipsec_site_connection-")
+ self.assertRaises(
+ lib_exc.BadRequest,
+ self.client.create_ipsec_site_connection,
+ ipsecpolicy_id=self.ipsecpolicy['id'],
+ ikepolicy_id=self.ikepolicy['id'],
+ vpnservice_id=self.vpnservice['id'],
+ peer_address="172.24.4.233",
+ peer_ep_group_id=endpoint_group['id'],
+ name=name,
+ admin_state_up=True,
+ initiator="bi-directional",
+ psk="secret")
+
+ @decorators.attr(type=['negative', 'smoke'])
+ @decorators.idempotent_id('92cd1221-3c7c-47ea-adad-8d8c2fc0f247')
+ def test_create_connection_with_subnet_and_local_endpoint_group(self):
+ tenant_id = self._get_tenant_id()
+ # Create endpoint group for the newly created tenant
+ name = data_utils.rand_name('endpoint_group')
+ subnet_id = self.subnet['id']
+ body = self.client.create_endpoint_group(
+ tenant_id=tenant_id,
+ name=name,
+ type='subnet',
+ endpoints=subnet_id)
+ endpoint_group = body['endpoint_group']
+ self.addCleanup(self._delete_endpoint_group, endpoint_group['id'])
+ # Create connections
+ name = data_utils.rand_name("ipsec_site_connection-")
+ self.assertRaises(
+ lib_exc.BadRequest,
+ self.client.create_ipsec_site_connection,
+ ipsecpolicy_id=self.ipsecpolicy['id'],
+ ikepolicy_id=self.ikepolicy['id'],
+ vpnservice_id=self.vpnservice['id'],
+ peer_address="172.24.4.233",
+ local_ep_group_id=endpoint_group['id'],
+ name=name,
+ admin_state_up=True,
+ initiator="bi-directional",
+ psk="secret")
+
+ @decorators.attr(type='smoke')
+ @decorators.idempotent_id('e0cb784d-ebd9-4ad1-84e1-0cba915784d0')
+ def test_create_update_delete_endpoint_group(self):
+ # Creates a endpoint-group
+ name = data_utils.rand_name('endpoint_group')
+ body = (self.client.create_endpoint_group(
+ name=name,
+ type='cidr',
+ endpoints=["10.2.0.0/24", "10.3.0.0/24"]))
+ endpoint_group = body['endpoint_group']
+ self.assertIsNotNone(endpoint_group['id'])
+ self.addCleanup(self._delete_endpoint_group, endpoint_group['id'])
+ # Update endpoint-group
+ body = {'name': data_utils.rand_name("new_endpoint_group")}
+ self.client.update_endpoint_group(endpoint_group['id'],
+ name=name)
+ # Confirm that update was successful by verifying using 'show'
+ body = self.client.show_endpoint_group(endpoint_group['id'])
+ endpoint_group = body['endpoint_group']
+ self.assertEqual(name, endpoint_group['name'])
+ # Verification of endpoint-group delete
+ endpoint_group_id = endpoint_group['id']
+ self.client.delete_endpoint_group(endpoint_group['id'])
+ body = self.client.list_endpoint_groups()
+ endpoint_group = [enp['id'] for enp in body['endpoint_groups']]
+ self.assertNotIn(endpoint_group_id, endpoint_group)
+
+ @decorators.attr(type='smoke')
+ @decorators.idempotent_id('69ee29e9-f72c-4d84-b6ab-c3f5576440fc')
+ def test_admin_create_endpoint_group_for_tenant(self):
+ # Create endpoint group for the newly created tenant
+ tenant_id = self._get_tenant_id()
+ name = data_utils.rand_name('endpoint_group')
+ body = (self.client.
+ create_endpoint_group(
+ name=name,
+ type='cidr',
+ endpoints=["10.2.0.0/24", "10.3.0.0/24"],
+ tenant_id=tenant_id))
+ endpoint_group = body['endpoint_group']
+ self.assertIsNotNone(endpoint_group['id'])
+ self.addCleanup(self._delete_endpoint_group, endpoint_group['id'])
+ # Assert that created endpoint group is found in API list call
+ endpoint_group_id = endpoint_group['id']
+ self.client.delete_endpoint_group(endpoint_group['id'])
+ body = self.client.list_endpoint_groups()
+ endpoint_group = [enp['id'] for enp in body['endpoint_groups']]
+ self.assertNotIn(endpoint_group_id, endpoint_group)
+
+ @decorators.attr(type='smoke')
+ @decorators.idempotent_id('7a584d9c-0f64-4d39-a66c-261bfa46e369')
+ def test_show_endpoint_group(self):
+ # Verifies the details of an endpoint group
+ body = self.client.show_endpoint_group(self.endpoint_group_local['id'])
+ endpoint_group = body['endpoint_group']
+ self.assertEqual(self.endpoint_group_local['id'], endpoint_group['id'])
+ self.assertEqual(self.endpoint_group_local['name'],
+ endpoint_group['name'])
+ self.assertEqual(self.endpoint_group_local['description'],
+ endpoint_group['description'])
+ self.assertEqual(self.endpoint_group_local['tenant_id'],
+ endpoint_group['tenant_id'])
+ self.assertEqual(self.endpoint_group_local['type'],
+ endpoint_group['type'])
+ self.assertEqual(self.endpoint_group_local['endpoints'],
+ endpoint_group['endpoints'])
+ # Verifies the details of an endpoint group
+ body = self.client.show_endpoint_group(
+ self.endpoint_group_remote['id'])
+ endpoint_group = body['endpoint_group']
+ # endpoint_group_remote = endpoint_group['id']
+ self.assertEqual(self.endpoint_group_remote['id'],
+ endpoint_group['id'])
+ self.assertEqual(self.endpoint_group_remote['name'],
+ endpoint_group['name'])
+ self.assertEqual(self.endpoint_group_remote['description'],
+ endpoint_group['description'])
+ self.assertEqual(self.endpoint_group_remote['tenant_id'],
+ endpoint_group['tenant_id'])
+ self.assertEqual(self.endpoint_group_remote['type'],
+ endpoint_group['type'])
+ self.assertEqual(self.endpoint_group_remote['endpoints'],
+ endpoint_group['endpoints'])
+
+ @decorators.attr(type='smoke')
+ @decorators.idempotent_id('386f703e-445e-4208-abd2-df66066fd876')
+ def test_create_delete_vpn_connection_with_ep_group(self):
+ # Creates a endpoint-group with type cidr
+ name = data_utils.rand_name('endpoint_group')
+ body = self.client.create_endpoint_group(
+ name=name,
+ type='cidr',
+ endpoints=["10.2.0.0/24", "10.3.0.0/24"])
+ endpoint_group_remote = body['endpoint_group']
+ self.addCleanup(self._delete_endpoint_group,
+ endpoint_group_remote['id'])
+ # Creates a endpoint-group with type subnet
+ name = data_utils.rand_name('endpoint_group')
+ subnet_id = self.subnet['id']
+ body2 = self.client.create_endpoint_group(
+ name=name,
+ type='subnet',
+ endpoints=subnet_id)
+ endpoint_group_local = body2['endpoint_group']
+ self.addCleanup(self._delete_endpoint_group,
+ endpoint_group_local['id'])
+ # Verify create VPN connection
+ name = data_utils.rand_name("ipsec_site_connection-")
+ body = self.client.create_ipsec_site_connection(
+ ipsecpolicy_id=self.ipsecpolicy['id'],
+ ikepolicy_id=self.ikepolicy['id'],
+ vpnservice_id=self.vpnservice_no_subnet['id'],
+ peer_ep_group_id=endpoint_group_remote['id'],
+ local_ep_group_id=endpoint_group_local['id'],
+ name=name,
+ mtu=1500,
+ admin_state_up=True,
+ initiator="bi-directional",
+ peer_address="172.24.4.233",
+ peer_id="172.24.4.233",
+ psk="secret")
+ ipsec_site_connection = body['ipsec_site_connection']
+ self.assertEqual(ipsec_site_connection['name'], name)
+ self.assertEqual(ipsec_site_connection['mtu'], 1500)
+ self.addCleanup(self._delete_ipsec_site_connection,
+ ipsec_site_connection['id'])
+
+ # Verification of IPsec connection delete
+ self.client.delete_ipsec_site_connection(ipsec_site_connection['id'])
+ body = self.client.list_ipsec_site_connections()
+ ipsec_site_connections = body['ipsec_site_connections']
+ self.assertNotIn(ipsec_site_connection['id'],
+ [v['id'] for v in ipsec_site_connections])
+
+ @decorators.attr(type=['negative', 'smoke'])
+ @decorators.idempotent_id('0ef1b2b0-9b4b-49f1-9473-cb4a0b625543')
+ def test_fail_create_endpoint_group_when_wrong_type(self):
+ # Creates a endpoint-group with wrong type
+ name = data_utils.rand_name('endpoint_group')
+ self.assertRaises(
+ lib_exc.BadRequest,
+ self.client.create_endpoint_group,
+ name=name,
+ type='subnet',
+ endpoints=["10.2.0.0/24", "10.3.0.0/24"])
+
+ @decorators.attr(type=['negative', 'smoke'])
+ @decorators.idempotent_id('670844d2-f7b0-466c-8fd7-7d2ad6e832ee')
+ def test_fail_create_endpoint_group_when_provide_subnet_id_with_cidr(self):
+ # Creates a endpoint-group when provide subnet id with type cidr
+ name = data_utils.rand_name('endpoint_group')
+ subnet_id = self.subnet['id']
+ self.assertRaises(
+ lib_exc.BadRequest,
+ self.client.create_endpoint_group,
+ name=name,
+ type='cidr',
+ endpoints=subnet_id)
+
+ @decorators.attr(type=['negative', 'smoke'])
+ @decorators.idempotent_id('d7516513-f2a2-42bd-8cea-baba73b93a22')
+ def test_fail_create_endpoint_group_with_mixed_IP_version(self):
+ # Creates a endpoint-group with mixed IP version
+ name = data_utils.rand_name('endpoint_group')
+ self.assertRaises(
+ lib_exc.BadRequest,
+ self.client.create_endpoint_group,
+ name=name,
+ type='cidr',
+ endpoints=["10.2.0.0/24", "2000::1"])
diff --git a/neutron_tempest_plugin/vpnaas/scenario/__init__.py b/neutron_tempest_plugin/vpnaas/scenario/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/neutron_tempest_plugin/vpnaas/scenario/__init__.py
diff --git a/neutron_tempest_plugin/vpnaas/scenario/base_vpnaas.py b/neutron_tempest_plugin/vpnaas/scenario/base_vpnaas.py
new file mode 100644
index 0000000..c09b40e
--- /dev/null
+++ b/neutron_tempest_plugin/vpnaas/scenario/base_vpnaas.py
@@ -0,0 +1,21 @@
+# Copyright (c) 2017 Midokura SARL
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+from neutron_tempest_plugin.scenario import base
+from neutron_tempest_plugin.vpnaas.api import base_vpnaas as base_api
+
+
+class BaseTempestTestCase(base_api.BaseNetworkTest, base.BaseTempestTestCase):
+ pass
diff --git a/neutron_tempest_plugin/vpnaas/scenario/test_vpnaas.py b/neutron_tempest_plugin/vpnaas/scenario/test_vpnaas.py
new file mode 100644
index 0000000..1e5c60a
--- /dev/null
+++ b/neutron_tempest_plugin/vpnaas/scenario/test_vpnaas.py
@@ -0,0 +1,297 @@
+# Copyright (c) 2017 Midokura SARL
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import netaddr
+from oslo_config import cfg
+import testtools
+
+from tempest.common import utils
+from tempest.common import waiters
+from tempest.lib.common import ssh
+from tempest.lib.common.utils import data_utils
+from tempest.lib import decorators
+
+from neutron_tempest_plugin import config
+from neutron_tempest_plugin.scenario import constants
+from neutron_tempest_plugin.vpnaas.scenario import base_vpnaas as base
+
+
+CONF = config.CONF
+
+# NOTE(huntxu): This is a workaround due to a upstream bug [1].
+# VPNaaS 4in6 and 6in4 is not working properly with LibreSwan 3.19+.
+# In OpenStack zuul checks the base CentOS 7 node is using Libreswan 3.20 on
+# CentOS 7.4. So we need to provide a way to skip the 4in6 and 6in4 test cases
+# for zuul.
+#
+# Once the upstream bug gets fixed and the base node uses a newer version of
+# Libreswan with that fix, we can remove this.
+#
+# [1] https://github.com/libreswan/libreswan/issues/175
+CONF.register_opt(
+ cfg.BoolOpt('skip_4in6_6in4_tests',
+ default=False,
+ help='Whether to skip 4in6 and 6in4 test cases.'),
+ 'neutron_vpnaas_plugin_options'
+)
+
+
+class Vpnaas(base.BaseTempestTestCase):
+ """Test the following topology
+
+ +-------------------+
+ | public |
+ | network |
+ | |
+ +-+---------------+-+
+ | |
+ | |
+ +-------+-+ +-+-------+
+ | LEFT | | RIGHT |
+ | router | <--VPN--> | router |
+ | | | |
+ +----+----+ +----+----+
+ | |
+ +----+----+ +----+----+
+ | LEFT | | RIGHT |
+ | network | | network |
+ | | | |
+ +---------+ +---------+
+ """
+
+ credentials = ['primary', 'admin']
+ inner_ipv6 = False
+ outer_ipv6 = False
+
+ @classmethod
+ @utils.requires_ext(extension="vpnaas", service="network")
+ def resource_setup(cls):
+ super(Vpnaas, cls).resource_setup()
+
+ # common
+ cls.keypair = cls.create_keypair()
+ cls.secgroup = cls.os_primary.network_client.create_security_group(
+ name=data_utils.rand_name('secgroup-'))['security_group']
+ cls.security_groups.append(cls.secgroup)
+ cls.create_loginable_secgroup_rule(secgroup_id=cls.secgroup['id'])
+ cls.create_pingable_secgroup_rule(secgroup_id=cls.secgroup['id'])
+ cls.ikepolicy = cls.create_ikepolicy(
+ data_utils.rand_name("ike-policy-"))
+ cls.ipsecpolicy = cls.create_ipsecpolicy(
+ data_utils.rand_name("ipsec-policy-"))
+
+ cls.extra_subnet_attributes = {}
+ if cls.inner_ipv6:
+ cls.create_v6_pingable_secgroup_rule(
+ secgroup_id=cls.secgroup['id'])
+ cls.extra_subnet_attributes['ipv6_address_mode'] = 'slaac'
+ cls.extra_subnet_attributes['ipv6_ra_mode'] = 'slaac'
+
+ # LEFT
+ cls.router = cls.create_router(
+ data_utils.rand_name('left-router'),
+ admin_state_up=True,
+ external_network_id=CONF.network.public_network_id)
+ cls.network = cls.create_network(network_name='left-network')
+ ip_version = 6 if cls.inner_ipv6 else 4
+ v4_cidr = netaddr.IPNetwork('10.20.0.0/24')
+ v6_cidr = netaddr.IPNetwork('2001:db8:0:2::/64')
+ cidr = v6_cidr if cls.inner_ipv6 else v4_cidr
+ cls.subnet = cls.create_subnet(
+ cls.network, ip_version=ip_version, cidr=cidr, name='left-subnet',
+ **cls.extra_subnet_attributes)
+ cls.create_router_interface(cls.router['id'], cls.subnet['id'])
+
+ # Gives an internal IPv4 subnet for floating IP to the left server,
+ # we use it to ssh into the left server.
+ if cls.inner_ipv6:
+ v4_subnet = cls.create_subnet(
+ cls.network, ip_version=4, name='left-v4-subnet')
+ cls.create_router_interface(cls.router['id'], v4_subnet['id'])
+
+ # RIGHT
+ cls._right_network, cls._right_subnet, cls._right_router = \
+ cls._create_right_network()
+
+ @classmethod
+ def create_v6_pingable_secgroup_rule(cls, secgroup_id=None, client=None):
+ # NOTE(huntxu): This method should be moved into the base class, along
+ # with the v4 version.
+ """This rule is intended to permit inbound ping6"""
+
+ rule_list = [{'protocol': 'ipv6-icmp',
+ 'direction': 'ingress',
+ 'port_range_min': 128, # type
+ 'port_range_max': 0, # code
+ 'ethertype': 'IPv6',
+ 'remote_ip_prefix': '::/0'}]
+ client = client or cls.os_primary.network_client
+ cls.create_secgroup_rules(rule_list, client=client,
+ secgroup_id=secgroup_id)
+
+ @classmethod
+ def _create_right_network(cls):
+ router = cls.create_router(
+ data_utils.rand_name('right-router'),
+ admin_state_up=True,
+ external_network_id=CONF.network.public_network_id)
+ network = cls.create_network(network_name='right-network')
+ v4_cidr = netaddr.IPNetwork('10.10.0.0/24')
+ v6_cidr = netaddr.IPNetwork('2001:db8:0:1::/64')
+ cidr = v6_cidr if cls.inner_ipv6 else v4_cidr
+ ip_version = 6 if cls.inner_ipv6 else 4
+ subnet = cls.create_subnet(
+ network, ip_version=ip_version, cidr=cidr, name='right-subnet',
+ **cls.extra_subnet_attributes)
+ cls.create_router_interface(router['id'], subnet['id'])
+
+ return network, subnet, router
+
+ def _create_server(self, create_floating_ip=True, network=None):
+ if network is None:
+ network = self.network
+ port = self.create_port(network, security_groups=[self.secgroup['id']])
+ if create_floating_ip:
+ fip = self.create_and_associate_floatingip(port['id'])
+ else:
+ fip = None
+ server = self.create_server(
+ flavor_ref=CONF.compute.flavor_ref,
+ image_ref=CONF.compute.image_ref,
+ key_name=self.keypair['name'],
+ networks=[{'port': port['id']}])['server']
+ waiters.wait_for_server_status(self.os_primary.servers_client,
+ server['id'],
+ constants.SERVER_STATUS_ACTIVE)
+ return {'port': port, 'fip': fip, 'server': server}
+
+ def _setup_vpn(self):
+ sites = [
+ dict(name="left", network=self.network, subnet=self.subnet,
+ router=self.router),
+ dict(name="right", network=self._right_network,
+ subnet=self._right_subnet, router=self._right_router),
+ ]
+ psk = data_utils.rand_name('mysecret')
+ for i in range(0, 2):
+ site = sites[i]
+ site['vpnservice'] = self.create_vpnservice(
+ site['subnet']['id'], site['router']['id'],
+ name=data_utils.rand_name('%s-vpnservice' % site['name']))
+ for i in range(0, 2):
+ site = sites[i]
+ vpnservice = site['vpnservice']
+ peer = sites[1 - i]
+ if self.outer_ipv6:
+ peer_address = peer['vpnservice']['external_v6_ip']
+ if not peer_address:
+ msg = "Public network must have an IPv6 subnet."
+ raise self.skipException(msg)
+ else:
+ peer_address = peer['vpnservice']['external_v4_ip']
+ self.create_ipsec_site_connection(
+ self.ikepolicy['id'],
+ self.ipsecpolicy['id'],
+ vpnservice['id'],
+ peer_address=peer_address,
+ peer_id=peer_address,
+ peer_cidrs=[peer['subnet']['cidr']],
+ psk=psk,
+ name=data_utils.rand_name(
+ '%s-ipsec-site-connection' % site['name']))
+
+ def _get_ip_on_subnet_for_port(self, port, subnet_id):
+ for fixed_ip in port['fixed_ips']:
+ if fixed_ip['subnet_id'] == subnet_id:
+ return fixed_ip['ip_address']
+ msg = "Cannot get IP address on specified subnet %s for port %r." % (
+ subnet_id, port)
+ raise self.fail(msg)
+
+ def _test_vpnaas(self):
+ # RIGHT
+ right_server = self._create_server(network=self._right_network,
+ create_floating_ip=False)
+ right_ip = self._get_ip_on_subnet_for_port(
+ right_server['port'], self._right_subnet['id'])
+
+ # LEFT
+ left_server = self._create_server()
+ ssh_client = ssh.Client(left_server['fip']['floating_ip_address'],
+ CONF.validation.image_ssh_user,
+ pkey=self.keypair['private_key'])
+
+ # check LEFT -> RIGHT connectivity via VPN
+ self.check_remote_connectivity(ssh_client, right_ip,
+ should_succeed=False)
+ self._setup_vpn()
+ self.check_remote_connectivity(ssh_client, right_ip)
+
+ # Test VPN traffic and floating IP traffic don't interfere each other.
+ if not self.inner_ipv6:
+ # Assign a floating-ip and check connectivity.
+ # This is NOT via VPN.
+ fip = self.create_and_associate_floatingip(
+ right_server['port']['id'])
+ self.check_remote_connectivity(ssh_client,
+ fip['floating_ip_address'])
+
+ # check LEFT -> RIGHT connectivity via VPN again, to ensure
+ # the above floating-ip doesn't interfere the traffic.
+ self.check_remote_connectivity(ssh_client, right_ip)
+
+
+class Vpnaas4in4(Vpnaas):
+
+ @decorators.idempotent_id('aa932ab2-63aa-49cf-a2a0-8ae71ac2bc24')
+ def test_vpnaas(self):
+ self._test_vpnaas()
+
+
+class Vpnaas4in6(Vpnaas):
+ outer_ipv6 = True
+
+ @decorators.idempotent_id('2d5f18dc-6186-4deb-842b-051325bd0466')
+ @testtools.skipUnless(CONF.network_feature_enabled.ipv6,
+ 'IPv6 tests are disabled.')
+ @testtools.skipIf(
+ CONF.neutron_vpnaas_plugin_options.skip_4in6_6in4_tests,
+ 'VPNaaS 4in6 test is skipped.')
+ def test_vpnaas_4in6(self):
+ self._test_vpnaas()
+
+
+class Vpnaas6in4(Vpnaas):
+ inner_ipv6 = True
+
+ @decorators.idempotent_id('10febf33-c5b7-48af-aa13-94b4fb585a55')
+ @testtools.skipUnless(CONF.network_feature_enabled.ipv6,
+ 'IPv6 tests are disabled.')
+ @testtools.skipIf(
+ CONF.neutron_vpnaas_plugin_options.skip_4in6_6in4_tests,
+ 'VPNaaS 6in4 test is skipped.')
+ def test_vpnaas_6in4(self):
+ self._test_vpnaas()
+
+
+class Vpnaas6in6(Vpnaas):
+ inner_ipv6 = True
+ outer_ipv6 = True
+
+ @decorators.idempotent_id('8b503ffc-aeb0-4938-8dba-73c7323e276d')
+ @testtools.skipUnless(CONF.network_feature_enabled.ipv6,
+ 'IPv6 tests are disabled.')
+ def test_vpnaas_6in6(self):
+ self._test_vpnaas()
diff --git a/neutron_tempest_plugin/vpnaas/services/__init__.py b/neutron_tempest_plugin/vpnaas/services/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/neutron_tempest_plugin/vpnaas/services/__init__.py
diff --git a/neutron_tempest_plugin/vpnaas/services/clients_vpnaas.py b/neutron_tempest_plugin/vpnaas/services/clients_vpnaas.py
new file mode 100644
index 0000000..06abd4f
--- /dev/null
+++ b/neutron_tempest_plugin/vpnaas/services/clients_vpnaas.py
@@ -0,0 +1,70 @@
+# Copyright 2012 OpenStack Foundation
+# Copyright 2016 Hewlett Packard Enterprise Development Company
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+from neutron_tempest_plugin.api import clients as manager
+from neutron_tempest_plugin import config
+from neutron_tempest_plugin.services.network.json import network_client
+
+
+CONF = config.CONF
+
+
+class NetworkClient(network_client.NetworkClientJSON):
+
+ def pluralize(self, resource_name):
+
+ resource_plural_map = {
+ 'ikepolicy': 'ikepolicies',
+ 'ipsecpolicy': 'ipsecpolicies'
+ }
+
+ if resource_name in resource_plural_map:
+ return resource_plural_map.get(resource_name)
+
+ return super(NetworkClient, self).pluralize(resource_name)
+
+ def get_uri(self, plural_name):
+ # get service prefix from resource name
+
+ service_resource_prefix_list = [
+ 'vpnservices',
+ 'ikepolicies',
+ 'ipsecpolicies',
+ 'ipsec_site_connections',
+ 'endpoint_groups',
+ ]
+
+ if plural_name in service_resource_prefix_list:
+ plural_name = plural_name.replace("_", "-")
+ service_prefix = 'vpn'
+ uri = '%s/%s/%s' % (self.uri_prefix, service_prefix,
+ plural_name)
+ return uri
+
+ return super(NetworkClient, self).get_uri(plural_name)
+
+
+class Manager(manager.Manager):
+ def __init__(self, credentials=None, service=None):
+ super(Manager, self).__init__(credentials=credentials)
+ self.network_client = NetworkClient(
+ self.auth_provider,
+ CONF.network.catalog_type,
+ CONF.network.region or CONF.identity.region,
+ endpoint_type=CONF.network.endpoint_type,
+ build_interval=CONF.network.build_interval,
+ build_timeout=CONF.network.build_timeout,
+ **self.default_params)