API tests for neutron router gateway

This patch adds tests for validating the Neutron API for managing
router gateways.

To run these tests successfully, the neutron client has been augmented
with a method to send the enable_snat attribute to the server; this
method should be invoked only when the client has admin credentials;

Also, this patch adds the capability of specifying filter when
querying the API for ports, as this is needed by gateway tests.

Change-Id: I4e92384559bb2093c0eddf78f5dbb526af1acc38
Closes-Bug: #1228091
diff --git a/tempest/api/network/base.py b/tempest/api/network/base.py
index b6c2679..8eefb1a 100644
--- a/tempest/api/network/base.py
+++ b/tempest/api/network/base.py
@@ -251,3 +251,19 @@
         vpnservice = body['vpnservice']
         cls.vpnservices.append(vpnservice)
         return vpnservice
+
+
+class BaseAdminNetworkTest(BaseNetworkTest):
+
+    @classmethod
+    def setUpClass(cls):
+        super(BaseAdminNetworkTest, cls).setUpClass()
+        admin_username = cls.config.compute_admin.username
+        admin_password = cls.config.compute_admin.password
+        admin_tenant = cls.config.compute_admin.tenant_name
+        if not (admin_username and admin_password and admin_tenant):
+            msg = ("Missing Administrative Network API credentials "
+                   "in configuration.")
+            raise cls.skipException(msg)
+        cls.admin_manager = clients.AdminManager(interface=cls._interface)
+        cls.admin_client = cls.admin_manager.network_client
diff --git a/tempest/api/network/test_routers.py b/tempest/api/network/test_routers.py
index 2cfbf61..512d065 100644
--- a/tempest/api/network/test_routers.py
+++ b/tempest/api/network/test_routers.py
@@ -20,7 +20,10 @@
 from tempest.test import attr
 
 
-class RoutersTest(base.BaseNetworkTest):
+class RoutersTest(base.BaseAdminNetworkTest):
+    # NOTE(salv-orlando): This class inherits from BaseAdminNetworkTest
+    # as some router operations, such as enabling or disabling SNAT
+    # require admin credentials by default
     _interface = 'json'
 
     @classmethod
@@ -130,3 +133,99 @@
             interface['port_id'])
         self.assertEqual(show_port_body['port']['device_id'],
                          router['id'])
+
+    def _verify_router_gateway(self, router_id, exp_ext_gw_info=None):
+        resp, show_body = self.client.show_router(router_id)
+        self.assertEqual('200', resp['status'])
+        actual_ext_gw_info = show_body['router']['external_gateway_info']
+        if exp_ext_gw_info is None:
+            self.assertIsNone(actual_ext_gw_info)
+            return
+        # Verify only keys passed in exp_ext_gw_info
+        for k, v in exp_ext_gw_info.iteritems():
+            self.assertEqual(v, actual_ext_gw_info[k])
+
+    def _verify_gateway_port(self, router_id):
+        resp, list_body = self.admin_client.list_ports(
+            network_id=self.network_cfg.public_network_id,
+            device_id=router_id)
+        self.assertEqual(len(list_body['ports']), 1)
+        gw_port = list_body['ports'][0]
+        fixed_ips = gw_port['fixed_ips']
+        self.assertEqual(len(fixed_ips), 1)
+        resp, public_net_body = self.admin_client.show_network(
+            self.network_cfg.public_network_id)
+        public_subnet_id = public_net_body['network']['subnets'][0]
+        self.assertEqual(fixed_ips[0]['subnet_id'], public_subnet_id)
+
+    @attr(type='smoke')
+    def test_update_router_set_gateway(self):
+        router = self.create_router(rand_name('router-'))
+        self.client.update_router(
+            router['id'],
+            external_gateway_info={
+                'network_id': self.network_cfg.public_network_id})
+        # Verify operation - router
+        resp, show_body = self.client.show_router(router['id'])
+        self.assertEqual('200', resp['status'])
+        self._verify_router_gateway(
+            router['id'],
+            {'network_id': self.network_cfg.public_network_id})
+        self._verify_gateway_port(router['id'])
+
+    @attr(type='smoke')
+    def test_update_router_set_gateway_with_snat_explicit(self):
+        router = self.create_router(rand_name('router-'))
+        self.admin_client.update_router_with_snat_gw_info(
+            router['id'],
+            external_gateway_info={
+                'network_id': self.network_cfg.public_network_id,
+                'enable_snat': True})
+        self._verify_router_gateway(
+            router['id'],
+            {'network_id': self.network_cfg.public_network_id,
+             'enable_snat': True})
+        self._verify_gateway_port(router['id'])
+
+    @attr(type='smoke')
+    def test_update_router_set_gateway_without_snat(self):
+        router = self.create_router(rand_name('router-'))
+        self.admin_client.update_router_with_snat_gw_info(
+            router['id'],
+            external_gateway_info={
+                'network_id': self.network_cfg.public_network_id,
+                'enable_snat': False})
+        self._verify_router_gateway(
+            router['id'],
+            {'network_id': self.network_cfg.public_network_id,
+             'enable_snat': False})
+        self._verify_gateway_port(router['id'])
+
+    @attr(type='smoke')
+    def test_update_router_unset_gateway(self):
+        router = self.create_router(
+            rand_name('router-'),
+            external_network_id=self.network_cfg.public_network_id)
+        self.client.update_router(router['id'], external_gateway_info={})
+        self._verify_router_gateway(router['id'])
+        # No gateway port expected
+        resp, list_body = self.admin_client.list_ports(
+            network_id=self.network_cfg.public_network_id,
+            device_id=router['id'])
+        self.assertFalse(list_body['ports'])
+
+    @attr(type='smoke')
+    def test_update_router_reset_gateway_without_snat(self):
+        router = self.create_router(
+            rand_name('router-'),
+            external_network_id=self.network_cfg.public_network_id)
+        self.admin_client.update_router_with_snat_gw_info(
+            router['id'],
+            external_gateway_info={
+                'network_id': self.network_cfg.public_network_id,
+                'enable_snat': False})
+        self._verify_router_gateway(
+            router['id'],
+            {'network_id': self.network_cfg.public_network_id,
+             'enable_snat': False})
+        self._verify_gateway_port(router['id'])
diff --git a/tempest/services/network/json/network_client.py b/tempest/services/network/json/network_client.py
index 92c1faf..e7cd33f 100644
--- a/tempest/services/network/json/network_client.py
+++ b/tempest/services/network/json/network_client.py
@@ -123,8 +123,12 @@
         resp, body = self.delete(uri, self.headers)
         return resp, body
 
-    def list_ports(self):
+    def list_ports(self, **filters):
         uri = '%s/ports' % (self.uri_prefix)
+        filter_items = ["%s=%s" % (k, v) for (k, v) in filters.iteritems()]
+        querystring = "&".join(filter_items)
+        if querystring:
+            uri = "%s?%s" % (uri, querystring)
         resp, body = self.get(uri, self.headers)
         body = json.loads(body)
         return resp, body
@@ -223,7 +227,7 @@
         body = json.loads(body)
         return resp, body
 
-    def update_router(self, router_id, **kwargs):
+    def _update_router(self, router_id, set_enable_snat, **kwargs):
         uri = '%s/routers/%s' % (self.uri_prefix, router_id)
         resp, body = self.get(uri, self.headers)
         body = json.loads(body)
@@ -231,15 +235,34 @@
         update_body['name'] = kwargs.get('name', body['router']['name'])
         update_body['admin_state_up'] = kwargs.get(
             'admin_state_up', body['router']['admin_state_up'])
-        # Must uncomment/modify these lines once LP question#233187 is solved
-        # update_body['external_gateway_info'] = kwargs.get(
-        # 'external_gateway_info', body['router']['external_gateway_info'])
+        cur_gw_info = body['router']['external_gateway_info']
+        if cur_gw_info and not set_enable_snat:
+            cur_gw_info.pop('enable_snat', None)
+        update_body['external_gateway_info'] = kwargs.get(
+            'external_gateway_info', body['router']['external_gateway_info'])
         update_body = dict(router=update_body)
         update_body = json.dumps(update_body)
         resp, body = self.put(uri, update_body, self.headers)
         body = json.loads(body)
         return resp, body
 
+    def update_router(self, router_id, **kwargs):
+        """Update a router leaving enable_snat to its default value."""
+        # If external_gateway_info contains enable_snat the request will fail
+        # with 404 unless executed with admin client, and therefore we instruct
+        # _update_router to not set this attribute
+        # NOTE(salv-orlando): The above applies as long as Neutron's default
+        # policy is to restrict enable_snat usage to admins only.
+        return self._update_router(router_id, set_enable_snat=False, **kwargs)
+
+    def update_router_with_snat_gw_info(self, router_id, **kwargs):
+        """Update a router passing also the enable_snat attribute.
+
+        This method must be execute with admin credentials, otherwise the API
+        call will return a 404 error.
+        """
+        return self._update_router(router_id, set_enable_snat=True, **kwargs)
+
     def add_router_interface_with_subnet_id(self, router_id, subnet_id):
         uri = '%s/routers/%s/add_router_interface' % (self.uri_prefix,
               router_id)