|  | # Copyright 2012 OpenStack Foundation | 
|  | # Copyright 2013 IBM Corp. | 
|  | # All Rights Reserved. | 
|  | # | 
|  | #    Licensed under the Apache License, Version 2.0 (the "License"); you may | 
|  | #    not use this file except in compliance with the License. You may obtain | 
|  | #    a copy of the License at | 
|  | # | 
|  | #         http://www.apache.org/licenses/LICENSE-2.0 | 
|  | # | 
|  | #    Unless required by applicable law or agreed to in writing, software | 
|  | #    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT | 
|  | #    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the | 
|  | #    License for the specific language governing permissions and limitations | 
|  | #    under the License. | 
|  |  | 
|  | from oslo_log import log | 
|  |  | 
|  | from tempest.common import utils | 
|  | from tempest import config | 
|  | from tempest.lib.common.utils import data_utils | 
|  | from tempest.lib.common.utils import test_utils | 
|  | from tempest.lib import exceptions as lib_exc | 
|  | from tempest.scenario import manager | 
|  |  | 
|  | CONF = config.CONF | 
|  |  | 
|  | LOG = log.getLogger(__name__) | 
|  |  | 
|  |  | 
class ScenarioTest(manager.NetworkScenarioTest):
    """Base class for scenario tests.

    Builds on tempest's own scenario manager and clients, and declares only
    the ``primary`` credentials.
    """

    credentials = ['primary']
|  |  | 
|  |  | 
class NetworkScenarioTest(ScenarioTest):
    """Base class for network scenario tests.

    This class provides helpers for network scenario tests, using the neutron
    API. Helpers from ancestor which use the nova network API are overridden
    with the neutron API.

    This class also enforces using Neutron instead of novanetwork.
    Subclassed tests will be skipped if Neutron is not available or if the
    ``bgpvpn`` network extension is not enabled.

    """

    credentials = ['primary', 'admin']

    @classmethod
    def skip_checks(cls):
        """Skip the tests unless Neutron and the bgpvpn extension are usable.

        :raises cls.skipException: if Neutron is not available or if the
            ``bgpvpn`` network extension is not enabled
        """
        super(NetworkScenarioTest, cls).skip_checks()
        if not CONF.service_available.neutron:
            raise cls.skipException('Neutron not available')
        if not utils.is_extension_enabled('bgpvpn', 'network'):
            msg = "Bgpvpn extension not enabled."
            raise cls.skipException(msg)

    def _check_remote_connectivity(self, source, dest, should_succeed=True,
                                   nic=None):
        """Check ping to a server via a source ssh connection.

        The ping is attempted repeatedly, pausing one second between
        attempts, until its outcome matches ``should_succeed`` or
        ``CONF.validation.ping_timeout`` elapses.

        :param source: RemoteClient: an ssh connection from which to ping
        :param dest: an IP to ping against
        :param should_succeed: boolean, whether the ping is expected to
            succeed or not
        :param nic: specific network interface to ping from
        :returns: boolean -- True if the ping outcome matched
            ``should_succeed`` before the timeout, False otherwise
        """
        def ping_remote():
            # Report True only when the observed outcome (ping worked or
            # not) is the one the caller expects, so that call_until_true
            # keeps retrying until expectation and reality agree.
            try:
                source.ping_host(dest, nic=nic)
            except lib_exc.SSHExecCommandFailed:
                LOG.warning('Failed to ping IP: %s via a ssh connection '
                            'from: %s.', dest, source.ssh_client.host)
                return not should_succeed
            return should_succeed

        return test_utils.call_until_true(ping_remote,
                                          CONF.validation.ping_timeout,
                                          1)

    def create_loginable_secgroup_rule(self, security_group_rules_client=None,
                                       secgroup=None,
                                       security_groups_client=None):
        """Create loginable security group rules.

        This function will create:
        1. egress and ingress tcp port 22 allow rule in order to allow ssh
        access for ipv4.
        2. egress and ingress tcp port 80 allow rule in order to allow http
        access for ipv4.
        3. egress and ingress ipv6 icmp allow rule, in order to allow icmpv6.
        4. egress and ingress ipv4 icmp allow rule, in order to allow icmpv4.

        :param security_group_rules_client: client used to create the rules;
            defaults to ``self.security_group_rules_client``
        :param secgroup: security group passed through to
            ``create_security_group_rule``
        :param security_groups_client: security groups client passed through
            to ``create_security_group_rule``; defaults to
            ``self.security_groups_client``
        :returns: list of the rules that were created; rules rejected
            because they already exist are skipped
        :raises lib_exc.Conflict: for any conflict other than an already
            existing rule
        """

        if security_group_rules_client is None:
            security_group_rules_client = self.security_group_rules_client
        if security_groups_client is None:
            security_groups_client = self.security_groups_client
        rules = []
        rulesets = [
            dict(
                # ssh
                protocol='tcp',
                port_range_min=22,
                port_range_max=22,
            ),
            dict(
                # http
                protocol='tcp',
                port_range_min=80,
                port_range_max=80,
            ),
            dict(
                # ping
                protocol='icmp',
            ),
            dict(
                # ipv6-icmp for ping6
                protocol='icmp',
                ethertype='IPv6',
            )
        ]
        for ruleset in rulesets:
            for r_direction in ['ingress', 'egress']:
                try:
                    # The direction is passed explicitly rather than being
                    # written into the ruleset dict, so the rule templates
                    # are never mutated.
                    sg_rule = self.create_security_group_rule(
                        sec_group_rules_client=security_group_rules_client,
                        secgroup=secgroup,
                        security_groups_client=security_groups_client,
                        direction=r_direction,
                        **ruleset)
                except lib_exc.Conflict as ex:
                    # if rule already exist - skip rule and continue
                    msg = 'Security group rule already exists'
                    if msg not in ex._error_string:
                        # Any other conflict is unexpected: re-raise it with
                        # its original traceback.
                        raise
                else:
                    self.assertEqual(r_direction, sg_rule['direction'])
                    rules.append(sg_rule)

        return rules

    def _create_router(self, client=None, tenant_id=None,
                       namestart='router-smoke'):
        """Create a router and register its deletion as a test cleanup.

        :param client: routers client to use; defaults to
            ``self.admin_routers_client``
        :param tenant_id: owner of the router; defaults to
            ``client.project_id``
        :param namestart: prefix given to ``data_utils.rand_name`` to build
            the router name
        :returns: the ``router`` entry of the create response
        """
        if not client:
            client = self.admin_routers_client
        if not tenant_id:
            tenant_id = client.project_id
        name = data_utils.rand_name(namestart)
        result = client.create_router(name=name,
                                      admin_state_up=True,
                                      tenant_id=tenant_id)
        router = result['router']
        self.assertEqual(router['name'], name)
        # A router already removed by the test itself must not fail cleanup.
        self.addCleanup(test_utils.call_and_ignore_notfound_exc,
                        client.delete_router,
                        router['id'])
        return router