Merge "API tests for neutron router gateway"
diff --git a/tempest/api/network/base.py b/tempest/api/network/base.py
index 159c4f5..ed915c1 100644
--- a/tempest/api/network/base.py
+++ b/tempest/api/network/base.py
@@ -215,3 +215,19 @@
vpnservice = body['vpnservice']
cls.vpnservices.append(vpnservice)
return vpnservice
+
+
+class BaseAdminNetworkTest(BaseNetworkTest):
+
+ @classmethod
+ def setUpClass(cls):
+ super(BaseAdminNetworkTest, cls).setUpClass()
+ admin_username = cls.config.compute_admin.username
+ admin_password = cls.config.compute_admin.password
+ admin_tenant = cls.config.compute_admin.tenant_name
+ if not (admin_username and admin_password and admin_tenant):
+ msg = ("Missing Administrative Network API credentials "
+ "in configuration.")
+ raise cls.skipException(msg)
+ cls.admin_manager = clients.AdminManager(interface=cls._interface)
+ cls.admin_client = cls.admin_manager.network_client
diff --git a/tempest/api/network/test_routers.py b/tempest/api/network/test_routers.py
index 2cfbf61..512d065 100644
--- a/tempest/api/network/test_routers.py
+++ b/tempest/api/network/test_routers.py
@@ -20,7 +20,10 @@
from tempest.test import attr
-class RoutersTest(base.BaseNetworkTest):
+class RoutersTest(base.BaseAdminNetworkTest):
+ # NOTE(salv-orlando): This class inherits from BaseAdminNetworkTest
+ # because some router operations, such as enabling or disabling SNAT,
+ # require admin credentials by default.
_interface = 'json'
@classmethod
@@ -130,3 +133,99 @@
interface['port_id'])
self.assertEqual(show_port_body['port']['device_id'],
router['id'])
+
+ def _verify_router_gateway(self, router_id, exp_ext_gw_info=None):
+ resp, show_body = self.client.show_router(router_id)
+ self.assertEqual('200', resp['status'])
+ actual_ext_gw_info = show_body['router']['external_gateway_info']
+ if exp_ext_gw_info is None:
+ self.assertIsNone(actual_ext_gw_info)
+ return
+ # Verify only keys passed in exp_ext_gw_info
+ for k, v in exp_ext_gw_info.iteritems():
+ self.assertEqual(v, actual_ext_gw_info[k])
+
+ def _verify_gateway_port(self, router_id):
+ resp, list_body = self.admin_client.list_ports(
+ network_id=self.network_cfg.public_network_id,
+ device_id=router_id)
+ self.assertEqual(len(list_body['ports']), 1)
+ gw_port = list_body['ports'][0]
+ fixed_ips = gw_port['fixed_ips']
+ self.assertEqual(len(fixed_ips), 1)
+ resp, public_net_body = self.admin_client.show_network(
+ self.network_cfg.public_network_id)
+ public_subnet_id = public_net_body['network']['subnets'][0]
+ self.assertEqual(fixed_ips[0]['subnet_id'], public_subnet_id)
+
+ @attr(type='smoke')
+ def test_update_router_set_gateway(self):
+ router = self.create_router(rand_name('router-'))
+ self.client.update_router(
+ router['id'],
+ external_gateway_info={
+ 'network_id': self.network_cfg.public_network_id})
+ # Verify operation - router
+ resp, show_body = self.client.show_router(router['id'])
+ self.assertEqual('200', resp['status'])
+ self._verify_router_gateway(
+ router['id'],
+ {'network_id': self.network_cfg.public_network_id})
+ self._verify_gateway_port(router['id'])
+
+ @attr(type='smoke')
+ def test_update_router_set_gateway_with_snat_explicit(self):
+ router = self.create_router(rand_name('router-'))
+ self.admin_client.update_router_with_snat_gw_info(
+ router['id'],
+ external_gateway_info={
+ 'network_id': self.network_cfg.public_network_id,
+ 'enable_snat': True})
+ self._verify_router_gateway(
+ router['id'],
+ {'network_id': self.network_cfg.public_network_id,
+ 'enable_snat': True})
+ self._verify_gateway_port(router['id'])
+
+ @attr(type='smoke')
+ def test_update_router_set_gateway_without_snat(self):
+ router = self.create_router(rand_name('router-'))
+ self.admin_client.update_router_with_snat_gw_info(
+ router['id'],
+ external_gateway_info={
+ 'network_id': self.network_cfg.public_network_id,
+ 'enable_snat': False})
+ self._verify_router_gateway(
+ router['id'],
+ {'network_id': self.network_cfg.public_network_id,
+ 'enable_snat': False})
+ self._verify_gateway_port(router['id'])
+
+ @attr(type='smoke')
+ def test_update_router_unset_gateway(self):
+ router = self.create_router(
+ rand_name('router-'),
+ external_network_id=self.network_cfg.public_network_id)
+ self.client.update_router(router['id'], external_gateway_info={})
+ self._verify_router_gateway(router['id'])
+ # No gateway port expected
+ resp, list_body = self.admin_client.list_ports(
+ network_id=self.network_cfg.public_network_id,
+ device_id=router['id'])
+ self.assertFalse(list_body['ports'])
+
+ @attr(type='smoke')
+ def test_update_router_reset_gateway_without_snat(self):
+ router = self.create_router(
+ rand_name('router-'),
+ external_network_id=self.network_cfg.public_network_id)
+ self.admin_client.update_router_with_snat_gw_info(
+ router['id'],
+ external_gateway_info={
+ 'network_id': self.network_cfg.public_network_id,
+ 'enable_snat': False})
+ self._verify_router_gateway(
+ router['id'],
+ {'network_id': self.network_cfg.public_network_id,
+ 'enable_snat': False})
+ self._verify_gateway_port(router['id'])
diff --git a/tempest/services/network/json/network_client.py b/tempest/services/network/json/network_client.py
index 92c1faf..e7cd33f 100644
--- a/tempest/services/network/json/network_client.py
+++ b/tempest/services/network/json/network_client.py
@@ -123,8 +123,12 @@
resp, body = self.delete(uri, self.headers)
return resp, body
- def list_ports(self):
+ def list_ports(self, **filters):
uri = '%s/ports' % (self.uri_prefix)
+ filter_items = ["%s=%s" % (k, v) for (k, v) in filters.iteritems()]
+ querystring = "&".join(filter_items)
+ if querystring:
+ uri = "%s?%s" % (uri, querystring)
resp, body = self.get(uri, self.headers)
body = json.loads(body)
return resp, body
@@ -223,7 +227,7 @@
body = json.loads(body)
return resp, body
- def update_router(self, router_id, **kwargs):
+ def _update_router(self, router_id, set_enable_snat, **kwargs):
uri = '%s/routers/%s' % (self.uri_prefix, router_id)
resp, body = self.get(uri, self.headers)
body = json.loads(body)
@@ -231,15 +235,34 @@
update_body['name'] = kwargs.get('name', body['router']['name'])
update_body['admin_state_up'] = kwargs.get(
'admin_state_up', body['router']['admin_state_up'])
- # Must uncomment/modify these lines once LP question#233187 is solved
- # update_body['external_gateway_info'] = kwargs.get(
- # 'external_gateway_info', body['router']['external_gateway_info'])
+ cur_gw_info = body['router']['external_gateway_info']
+ if cur_gw_info and not set_enable_snat:
+ cur_gw_info.pop('enable_snat', None)
+ update_body['external_gateway_info'] = kwargs.get(
+ 'external_gateway_info', body['router']['external_gateway_info'])
update_body = dict(router=update_body)
update_body = json.dumps(update_body)
resp, body = self.put(uri, update_body, self.headers)
body = json.loads(body)
return resp, body
+ def update_router(self, router_id, **kwargs):
+ """Update a router leaving enable_snat to its default value."""
+ # If external_gateway_info contains enable_snat, the request fails
+ # with 404 unless executed with an admin client, so we instruct
+ # _update_router not to set this attribute.
+ # NOTE(salv-orlando): The above applies as long as Neutron's default
+ # policy is to restrict enable_snat usage to admins only.
+ return self._update_router(router_id, set_enable_snat=False, **kwargs)
+
+ def update_router_with_snat_gw_info(self, router_id, **kwargs):
+ """Update a router passing also the enable_snat attribute.
+
+ This method must be executed with admin credentials, otherwise the API
+ call will return a 404 error.
+ """
+ return self._update_router(router_id, set_enable_snat=True, **kwargs)
+
def add_router_interface_with_subnet_id(self, router_id, subnet_id):
uri = '%s/routers/%s/add_router_interface' % (self.uri_prefix,
router_id)