| # Copyright 2012 OpenStack Foundation |
| # Copyright 2013 IBM Corp. |
| # All Rights Reserved. |
| # |
| # Licensed under the Apache License, Version 2.0 (the "License"); you may |
| # not use this file except in compliance with the License. You may obtain |
| # a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT |
| # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the |
| # License for the specific language governing permissions and limitations |
| # under the License. |
| |
| from oslo_log import log |
| |
| from tempest.common import utils |
| from tempest import config |
| from tempest.lib.common.utils import data_utils |
| from tempest.lib.common.utils import test_utils |
| from tempest.lib import exceptions as lib_exc |
| from tempest.scenario import manager |
| |
| CONF = config.CONF |
| |
| LOG = log.getLogger(__name__) |
| |
| |
class ScenarioTest(manager.NetworkScenarioTest):
    """Base class for the scenario tests of this module.

    Uses tempest own clients and requests only the 'primary' credentials.
    """

    credentials = ['primary']
| |
| |
| class NetworkScenarioTest(ScenarioTest): |
| """Base class for network scenario tests. |
| |
| This class provides helpers for network scenario tests, using the neutron |
| API. Helpers from ancestor which use the nova network API are overridden |
| with the neutron API. |
| |
| This Class also enforces using Neutron instead of novanetwork. |
| Subclassed tests will be skipped if Neutron is not enabled |
| |
| """ |
| |
| credentials = ['primary', 'admin'] |
| |
@classmethod
def skip_checks(cls):
    """Skip the tests unless Neutron and the bgpvpn extension are usable.

    The parent class checks run first. After that ``cls.skipException``
    is raised when ``CONF.service_available.neutron`` is false, or when
    the 'bgpvpn' extension is not reported as enabled for the 'network'
    service.
    """
    super(NetworkScenarioTest, cls).skip_checks()
    neutron_available = CONF.service_available.neutron
    if not neutron_available:
        raise cls.skipException('Neutron not available')
    if utils.is_extension_enabled('bgpvpn', 'network'):
        return
    raise cls.skipException("Bgpvpn extension not enabled.")
| |
def _check_remote_connectivity(self, source, dest, should_succeed=True,
                               nic=None):
    """Ping ``dest`` over the ``source`` ssh connection until it behaves.

    :param source: RemoteClient: an ssh connection from which to ping
    :param dest: an IP to ping against
    :param should_succeed: boolean, whether the ping is expected to work
    :param nic: specific network interface to ping from
    :returns: whatever ``test_utils.call_until_true`` returns for the
        ping attempt -- presumably True as soon as one attempt matches
        ``should_succeed`` and False when
        ``CONF.validation.ping_timeout`` runs out first (confirm against
        the tempest helper)
    """
    def ping_remote():
        # A single attempt: truthy only when the observed outcome is the
        # expected one, so an expected failure also counts as "done".
        try:
            source.ping_host(dest, nic=nic)
        except lib_exc.SSHExecCommandFailed:
            LOG.warning('Failed to ping IP: %s via a ssh connection '
                        'from: %s.', dest, source.ssh_client.host)
            outcome = not should_succeed
        else:
            outcome = should_succeed
        return outcome

    timeout = CONF.validation.ping_timeout
    return test_utils.call_until_true(ping_remote, timeout, 1)
| |
def create_loginable_secgroup_rule(self, security_group_rules_client=None,
                                   secgroup=None,
                                   security_groups_client=None):
    """Create the rules that make instances reachable for the tests.

    For both the ingress and the egress direction this requests:
        1. a tcp port 22 allow rule (ssh),
        2. a tcp port 80 allow rule (http),
        3. an icmp allow rule (ping),
        4. an icmp allow rule with ethertype IPv6 (ping6).

    :param security_group_rules_client: client forwarded to
        ``self.create_security_group_rule``; defaults to
        ``self.security_group_rules_client`` when None
    :param secgroup: security group forwarded unchanged to
        ``self.create_security_group_rule``
    :param security_groups_client: client forwarded to
        ``self.create_security_group_rule``; defaults to
        ``self.security_groups_client`` when None
    :returns: list of the rules that were actually created; a rule
        rejected with a 'Security group rule already exists' conflict
        is left out
    :raises lib_exc.Conflict: for any other conflict
    """
    rules_client = (self.security_group_rules_client
                    if security_group_rules_client is None
                    else security_group_rules_client)
    groups_client = (self.security_groups_client
                     if security_groups_client is None
                     else security_groups_client)

    base_rules = (
        # ssh
        {'protocol': 'tcp', 'port_range_min': 22, 'port_range_max': 22},
        # http
        {'protocol': 'tcp', 'port_range_min': 80, 'port_range_max': 80},
        # ping
        {'protocol': 'icmp'},
        # ipv6-icmp for ping6
        {'protocol': 'icmp', 'ethertype': 'IPv6'},
    )
    # Every base rule is requested twice: ingress first, then egress.
    wanted = [(base, direction)
              for base in base_rules
              for direction in ('ingress', 'egress')]

    created = []
    for base, direction in wanted:
        rule_kwargs = dict(base, direction=direction)
        try:
            sg_rule = self.create_security_group_rule(
                sec_group_rules_client=rules_client,
                secgroup=secgroup,
                security_groups_client=groups_client,
                **rule_kwargs)
        except lib_exc.Conflict as ex:
            # An already existing rule is fine: skip it and carry on.
            if 'Security group rule already exists' not in ex._error_string:
                raise ex
            continue
        self.assertEqual(direction, sg_rule['direction'])
        created.append(sg_rule)

    return created
| |
def _create_router(self, client=None, tenant_id=None,
                   namestart='router-smoke'):
    """Create a router with admin state up and schedule its deletion.

    :param client: routers client used for the calls; when falsy,
        ``self.admin_routers_client`` is used instead
    :param tenant_id: tenant the router is created for; when falsy,
        ``client.tenant_id`` is used instead
    :param namestart: prefix handed to ``data_utils.rand_name`` to build
        the router name
    :returns: the 'router' entry of the ``create_router`` response
    """
    client = client or self.admin_routers_client
    tenant_id = tenant_id or client.tenant_id
    router_name = data_utils.rand_name(namestart)
    router_args = {
        'name': router_name,
        'admin_state_up': True,
        'tenant_id': tenant_id,
    }
    router = client.create_router(**router_args)['router']
    self.assertEqual(router['name'], router_name)
    # Register the deletion right away; the wrapper is expected to
    # tolerate a router that is already gone when cleanup runs.
    self.addCleanup(test_utils.call_and_ignore_notfound_exc,
                    client.delete_router, router['id'])
    return router