Merge "Add tests for binding extended attributes for ports"
diff --git a/tempest/api/network/test_networks.py b/tempest/api/network/test_networks.py
index 88e7238..70fb00a 100644
--- a/tempest/api/network/test_networks.py
+++ b/tempest/api/network/test_networks.py
@@ -67,7 +67,6 @@
         cls.name = cls.network['name']
         cls.subnet = cls.create_subnet(cls.network)
         cls.cidr = cls.subnet['cidr']
-        cls.port = cls.create_port(cls.network)
 
     @attr(type='smoke')
     def test_create_update_delete_network_subnet(self):
@@ -184,90 +183,6 @@
             self.assertEqual(len(subnet), 1)
             self.assertIn('id', subnet)
 
-    @attr(type='smoke')
-    def test_create_update_delete_port(self):
-        # Verify port creation
-        resp, body = self.client.create_port(network_id=self.network['id'])
-        self.assertEqual('201', resp['status'])
-        port = body['port']
-        self.assertTrue(port['admin_state_up'])
-        # Verify port update
-        new_name = "New_Port"
-        resp, body = self.client.update_port(
-            port['id'],
-            name=new_name,
-            admin_state_up=False)
-        self.assertEqual('200', resp['status'])
-        updated_port = body['port']
-        self.assertEqual(updated_port['name'], new_name)
-        self.assertFalse(updated_port['admin_state_up'])
-        # Verify port deletion
-        resp, body = self.client.delete_port(port['id'])
-        self.assertEqual('204', resp['status'])
-
-    @attr(type='smoke')
-    def test_show_port(self):
-        # Verify the details of port
-        resp, body = self.client.show_port(self.port['id'])
-        self.assertEqual('200', resp['status'])
-        port = body['port']
-        self.assertIn('id', port)
-        self.assertEqual(port['id'], self.port['id'])
-
-    @attr(type='smoke')
-    def test_show_port_fields(self):
-        # Verify specific fields of a port
-        field_list = [('fields', 'id'), ]
-        resp, body = self.client.show_port(self.port['id'],
-                                           field_list=field_list)
-        self.assertEqual('200', resp['status'])
-        port = body['port']
-        self.assertEqual(len(port), len(field_list))
-        for label, field_name in field_list:
-            self.assertEqual(port[field_name], self.port[field_name])
-
-    @attr(type='smoke')
-    def test_list_ports(self):
-        # Verify the port exists in the list of all ports
-        resp, body = self.client.list_ports()
-        self.assertEqual('200', resp['status'])
-        ports = [port['id'] for port in body['ports']
-                 if port['id'] == self.port['id']]
-        self.assertNotEmpty(ports, "Created port not found in the list")
-
-    @attr(type='smoke')
-    def test_port_list_filter_by_router_id(self):
-        # Create a router
-        network = self.create_network()
-        self.create_subnet(network)
-        router = self.create_router(data_utils.rand_name('router-'))
-        resp, port = self.client.create_port(network_id=network['id'])
-        # Add router interface to port created above
-        resp, interface = self.client.add_router_interface_with_port_id(
-            router['id'], port['port']['id'])
-        self.addCleanup(self.client.remove_router_interface_with_port_id,
-                        router['id'], port['port']['id'])
-        # List ports filtered by router_id
-        resp, port_list = self.client.list_ports(
-            device_id=router['id'])
-        self.assertEqual('200', resp['status'])
-        ports = port_list['ports']
-        self.assertEqual(len(ports), 1)
-        self.assertEqual(ports[0]['id'], port['port']['id'])
-        self.assertEqual(ports[0]['device_id'], router['id'])
-
-    @attr(type='smoke')
-    def test_list_ports_fields(self):
-        # Verify specific fields of ports
-        resp, body = self.client.list_ports(fields='id')
-        self.assertEqual('200', resp['status'])
-        ports = body['ports']
-        self.assertNotEmpty(ports, "Port list returned is empty")
-        # Asserting the fields returned are correct
-        for port in ports:
-            self.assertEqual(len(port), 1)
-            self.assertIn('id', port)
-
 
 class NetworksTestXML(NetworksTestJSON):
     _interface = 'xml'
diff --git a/tempest/api/network/test_ports.py b/tempest/api/network/test_ports.py
new file mode 100644
index 0000000..fbb25a8
--- /dev/null
+++ b/tempest/api/network/test_ports.py
@@ -0,0 +1,249 @@
+# Copyright 2014 OpenStack Foundation
+# All Rights Reserved.
+#
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+
+import socket
+
+from tempest.api.network import base
+from tempest.common.utils import data_utils
+from tempest import config
+from tempest import test
+
+CONF = config.CONF
+
+
+class PortsTestJSON(base.BaseNetworkTest):
+    _interface = 'json'
+
+    @classmethod
+    def setUpClass(cls):
+        super(PortsTestJSON, cls).setUpClass()
+        cls.network = cls.create_network()
+        cls.port = cls.create_port(cls.network)
+
+    def _delete_port(self, port_id):
+        # Delete the port, then confirm it is gone from the port listing.
+        resp, body = self.client.delete_port(port_id)
+        self.assertEqual('204', resp['status'])
+        resp, body = self.client.list_ports()
+        self.assertEqual('200', resp['status'])
+        self.assertNotIn(port_id, [p['id'] for p in body['ports']])
+
+    @test.attr(type='smoke')
+    def test_create_update_delete_port(self):
+        # Verify port creation
+        resp, body = self.client.create_port(network_id=self.network['id'])
+        self.assertEqual('201', resp['status'])
+        port = body['port']
+        # Deletion is verified by the cleanup, which also runs when one of
+        # the assertions below fails, so the port is never left behind.
+        self.addCleanup(self._delete_port, port['id'])
+        self.assertTrue(port['admin_state_up'])
+        # Verify port update
+        new_name = "New_Port"
+        resp, body = self.client.update_port(
+            port['id'],
+            name=new_name,
+            admin_state_up=False)
+        self.assertEqual('200', resp['status'])
+        updated_port = body['port']
+        self.assertEqual(updated_port['name'], new_name)
+        self.assertFalse(updated_port['admin_state_up'])
+
+    @test.attr(type='smoke')
+    def test_show_port(self):
+        # Verify the details of port
+        resp, body = self.client.show_port(self.port['id'])
+        self.assertEqual('200', resp['status'])
+        port = body['port']
+        self.assertIn('id', port)
+        self.assertEqual(port['id'], self.port['id'])
+        self.assertEqual(self.port['admin_state_up'], port['admin_state_up'])
+        self.assertEqual(self.port['device_id'], port['device_id'])
+        self.assertEqual(self.port['device_owner'], port['device_owner'])
+        self.assertEqual(self.port['mac_address'], port['mac_address'])
+        self.assertEqual(self.port['name'], port['name'])
+        self.assertEqual(self.port['security_groups'],
+                         port['security_groups'])
+        self.assertEqual(self.port['network_id'], port['network_id'])
+        self.assertEqual(self.port['tenant_id'],
+                         port['tenant_id'])
+
+    @test.attr(type='smoke')
+    def test_show_port_fields(self):
+        # Verify specific fields of a port
+        field_list = [('fields', 'id'), ]
+        resp, body = self.client.show_port(self.port['id'],
+                                           field_list=field_list)
+        self.assertEqual('200', resp['status'])
+        port = body['port']
+        self.assertEqual(len(port), len(field_list))
+        for label, field_name in field_list:
+            self.assertEqual(port[field_name], self.port[field_name])
+
+    @test.attr(type='smoke')
+    def test_list_ports(self):
+        # Verify the port exists in the list of all ports
+        resp, body = self.client.list_ports()
+        self.assertEqual('200', resp['status'])
+        ports = [port['id'] for port in body['ports']
+                 if port['id'] == self.port['id']]
+        self.assertNotEmpty(ports, "Created port not found in the list")
+
+    @test.attr(type='smoke')
+    def test_port_list_filter_by_router_id(self):
+        # Create a router
+        network = self.create_network()
+        self.create_subnet(network)
+        router = self.create_router(data_utils.rand_name('router-'))
+        resp, port = self.client.create_port(network_id=network['id'])
+        # Add router interface to port created above
+        resp, interface = self.client.add_router_interface_with_port_id(
+            router['id'], port['port']['id'])
+        self.addCleanup(self.client.remove_router_interface_with_port_id,
+                        router['id'], port['port']['id'])
+        # List ports filtered by router_id
+        resp, port_list = self.client.list_ports(
+            device_id=router['id'])
+        self.assertEqual('200', resp['status'])
+        ports = port_list['ports']
+        self.assertEqual(len(ports), 1)
+        self.assertEqual(ports[0]['id'], port['port']['id'])
+        self.assertEqual(ports[0]['device_id'], router['id'])
+
+    @test.attr(type='smoke')
+    def test_list_ports_fields(self):
+        # Verify specific fields of ports
+        resp, body = self.client.list_ports(fields='id')
+        self.assertEqual('200', resp['status'])
+        ports = body['ports']
+        self.assertNotEmpty(ports, "Port list returned is empty")
+        # Asserting the fields returned are correct
+        for port in ports:
+            self.assertEqual(len(port), 1)
+            self.assertIn('id', port)
+
+
+class PortsTestXML(PortsTestJSON):
+    _interface = 'xml'
+
+
+class PortsAdminExtendedAttrsTestJSON(base.BaseAdminNetworkTest):
+    _interface = 'json'
+
+    @classmethod
+    def setUpClass(cls):
+        super(PortsAdminExtendedAttrsTestJSON, cls).setUpClass()
+        cls.identity_client = cls._get_identity_admin_client()
+        tenant_name = CONF.identity.tenant_name
+        cls.tenant = cls.identity_client.get_tenant_by_name(tenant_name)
+        cls.network = cls.create_network()
+        cls.host_id = socket.gethostname()
+
+    @test.attr(type='smoke')
+    def test_create_port_binding_ext_attr(self):
+        # Create a port with binding:host_id set and check it is echoed back
+        resp, body = self.admin_client.create_port(
+            **{"network_id": self.network['id'],
+               "binding:host_id": self.host_id})
+        self.assertEqual('201', resp['status'])
+        created = body['port']
+        self.addCleanup(self.admin_client.delete_port, created['id'])
+        self.assertIsNotNone(created['binding:host_id'])
+        self.assertEqual(self.host_id, created['binding:host_id'])
+
+    @test.attr(type='smoke')
+    def test_update_port_binding_ext_attr(self):
+        # Set binding:host_id on an existing port via the admin client
+        resp, body = self.admin_client.create_port(
+            network_id=self.network['id'])
+        self.assertEqual('201', resp['status'])
+        port_id = body['port']['id']
+        self.addCleanup(self.admin_client.delete_port, port_id)
+        binding = {"binding:host_id": self.host_id}
+        resp, body = self.admin_client.update_port(port_id, **binding)
+        self.assertEqual('200', resp['status'])
+        bound_host = body['port']['binding:host_id']
+        self.assertIsNotNone(bound_host)
+        self.assertEqual(self.host_id, bound_host)
+
+    @test.attr(type='smoke')
+    def test_list_ports_binding_ext_attr(self):
+        # Every listed port must carry vif_type and vif_details.port_filter
+        resp, body = self.admin_client.list_ports(
+            tenant_id=self.tenant['id'])
+        self.assertEqual('200', resp['status'])
+        # NOTE(review): no port is created here, so this loop may be empty
+        for listed in body['ports']:
+            self.assertIsNotNone(listed['binding:vif_type'])
+            port_filter = listed['binding:vif_details']['port_filter']
+            self.assertIsNotNone(port_filter)
+
+    @test.attr(type='smoke')
+    def test_show_port_binding_ext_attr(self):
+        # The binding attributes returned by show must match the ones
+        # returned when the port was created
+        resp, body = self.admin_client.create_port(
+            network_id=self.network['id'])
+        self.assertEqual('201', resp['status'])
+        created = body['port']
+        self.addCleanup(self.admin_client.delete_port, created['id'])
+        resp, body = self.admin_client.show_port(created['id'])
+        self.assertEqual('200', resp['status'])
+        shown = body['port']
+        binding_keys = ('binding:host_id', 'binding:vif_type',
+                        'binding:vif_details')
+        for key in binding_keys:
+            self.assertEqual(created[key], shown[key])
+
+
+class PortsAdminExtendedAttrsTestXML(PortsAdminExtendedAttrsTestJSON):
+    _interface = 'xml'
+
+
+class PortsIpV6TestJSON(PortsTestJSON):
+    _ip_version = 6
+    _tenant_network_cidr = CONF.network.tenant_network_v6_cidr
+    _tenant_network_mask_bits = CONF.network.tenant_network_v6_mask_bits
+
+    @classmethod
+    def setUpClass(cls):
+        # Skip before the parent creates any network resources, so there is
+        # nothing to tear down when IPv6 testing is disabled.
+        if not CONF.network_feature_enabled.ipv6:
+            raise cls.skipException("IPv6 Tests are disabled.")
+        super(PortsIpV6TestJSON, cls).setUpClass()
+
+
+class PortsIpV6TestXML(PortsIpV6TestJSON):
+    _interface = 'xml'
+
+
+class PortsAdminExtendedAttrsIpV6TestJSON(PortsAdminExtendedAttrsTestJSON):
+    _ip_version = 6
+    _tenant_network_cidr = CONF.network.tenant_network_v6_cidr
+    _tenant_network_mask_bits = CONF.network.tenant_network_v6_mask_bits
+
+    @classmethod
+    def setUpClass(cls):
+        # Skip before the parent creates any network resources, so there is
+        # nothing to tear down when IPv6 testing is disabled.
+        if not CONF.network_feature_enabled.ipv6:
+            raise cls.skipException("IPv6 Tests are disabled.")
+        super(PortsAdminExtendedAttrsIpV6TestJSON, cls).setUpClass()
+
+
+class PortsAdminExtendedAttrsIpV6TestXML(
+    PortsAdminExtendedAttrsIpV6TestJSON):
+    _interface = 'xml'
diff --git a/tempest/services/compute/xml/common.py b/tempest/services/compute/xml/common.py
index b29b932..b1bf789 100644
--- a/tempest/services/compute/xml/common.py
+++ b/tempest/services/compute/xml/common.py
@@ -19,6 +19,7 @@
 XMLNS_V3 = "http://docs.openstack.org/compute/api/v1.1"
 
 NEUTRON_NAMESPACES = {
+    'binding': "http://docs.openstack.org/ext/binding/api/v1.0",
     'router': "http://docs.openstack.org/ext/neutron/router/api/v1.0",
     'provider': 'http://docs.openstack.org/ext/provider/api/v1.0',
 }