Merge "Prepare for enabling H302 rule (scenario)"
diff --git a/tempest/api/compute/floating_ips/test_floating_ips_actions.py b/tempest/api/compute/floating_ips/test_floating_ips_actions.py
index 56bd291..ea785b3 100644
--- a/tempest/api/compute/floating_ips/test_floating_ips_actions.py
+++ b/tempest/api/compute/floating_ips/test_floating_ips_actions.py
@@ -14,9 +14,9 @@
# under the License.
from tempest.api.compute.floating_ips import base
-from tempest.common.utils.data_utils import rand_name
+from tempest.common.utils import data_utils
from tempest import exceptions
-from tempest.test import attr
+from tempest import test
class FloatingIPsTestJSON(base.BaseFloatingIPsTest):
@@ -44,7 +44,7 @@
resp, body = cls.client.delete_floating_ip(cls.floating_ip_id)
super(FloatingIPsTestJSON, cls).tearDownClass()
- @attr(type='gate')
+ @test.attr(type='gate')
def test_allocate_floating_ip(self):
# Positive test:Allocation of a new floating IP to a project
# should be successful
@@ -59,7 +59,7 @@
resp, body = self.client.list_floating_ips()
self.assertIn(floating_ip_details, body)
- @attr(type='gate')
+ @test.attr(type='gate')
def test_delete_floating_ip(self):
# Positive test:Deletion of valid floating IP from project
# should be successful
@@ -74,7 +74,7 @@
# Check it was really deleted.
self.client.wait_for_resource_deletion(floating_ip_body['id'])
- @attr(type='gate')
+ @test.attr(type='gate')
def test_associate_disassociate_floating_ip(self):
# Positive test:Associate and disassociate the provided floating IP
# to a specific server should be successful
@@ -90,12 +90,12 @@
self.server_id)
self.assertEqual(202, resp.status)
- @attr(type='gate')
+ @test.attr(type='gate')
def test_associate_already_associated_floating_ip(self):
# positive test:Association of an already associated floating IP
# to specific server should change the association of the Floating IP
# Create server so as to use for Multiple association
- new_name = rand_name('floating_server')
+ new_name = data_utils.rand_name('floating_server')
resp, body = self.create_test_server(name=new_name)
self.servers_client.wait_for_server_status(body['id'], 'ACTIVE')
self.new_server_id = body['id']
diff --git a/tempest/api/compute/servers/test_create_server.py b/tempest/api/compute/servers/test_create_server.py
index 887608f..f705308 100644
--- a/tempest/api/compute/servers/test_create_server.py
+++ b/tempest/api/compute/servers/test_create_server.py
@@ -20,9 +20,9 @@
from tempest.api.compute import base
from tempest.common.utils import data_utils
-from tempest.common.utils.linux.remote_client import RemoteClient
+from tempest.common.utils.linux import remote_client
from tempest import config
-from tempest.test import attr
+from tempest import test
CONF = config.CONF
@@ -54,14 +54,14 @@
cls.client.wait_for_server_status(cls.server_initial['id'], 'ACTIVE')
resp, cls.server = cls.client.get_server(cls.server_initial['id'])
- @attr(type='smoke')
+ @test.attr(type='smoke')
def test_create_server_response(self):
# Check that the required fields are returned with values
self.assertEqual(202, self.resp.status)
self.assertTrue(self.server_initial['id'] is not None)
self.assertTrue(self.server_initial['adminPass'] is not None)
- @attr(type='smoke')
+ @test.attr(type='smoke')
def test_verify_server_details(self):
# Verify the specified server attributes are set correctly
self.assertEqual(self.accessIPv4, self.server['accessIPv4'])
@@ -74,7 +74,7 @@
self.assertEqual(self.flavor_ref, self.server['flavor']['id'])
self.assertEqual(self.meta, self.server['metadata'])
- @attr(type='smoke')
+ @test.attr(type='smoke')
def test_list_servers(self):
# The created server should be in the list of all servers
resp, body = self.client.list_servers()
@@ -82,7 +82,7 @@
found = any([i for i in servers if i['id'] == self.server['id']])
self.assertTrue(found)
- @attr(type='smoke')
+ @test.attr(type='smoke')
def test_list_servers_with_detail(self):
# The created server should be in the detailed list of all servers
resp, body = self.client.list_servers_with_detail()
@@ -91,19 +91,21 @@
self.assertTrue(found)
@testtools.skipIf(not run_ssh, 'Instance validation tests are disabled.')
- @attr(type='gate')
+ @test.attr(type='gate')
def test_verify_created_server_vcpus(self):
# Verify that the number of vcpus reported by the instance matches
# the amount stated by the flavor
resp, flavor = self.flavors_client.get_flavor_details(self.flavor_ref)
- linux_client = RemoteClient(self.server, self.ssh_user, self.password)
+ linux_client = remote_client.RemoteClient(self.server, self.ssh_user,
+ self.password)
self.assertEqual(flavor['vcpus'], linux_client.get_number_of_vcpus())
@testtools.skipIf(not run_ssh, 'Instance validation tests are disabled.')
- @attr(type='gate')
+ @test.attr(type='gate')
def test_host_name_is_same_as_server_name(self):
# Verify the instance host name is the same as the server name
- linux_client = RemoteClient(self.server, self.ssh_user, self.password)
+ linux_client = remote_client.RemoteClient(self.server, self.ssh_user,
+ self.password)
self.assertTrue(linux_client.hostname_equals_servername(self.name))
@@ -136,7 +138,7 @@
resp, cls.server = cls.client.get_server(cls.server_initial['id'])
@testtools.skipIf(not run_ssh, 'Instance validation tests are disabled.')
- @attr(type='gate')
+ @test.attr(type='gate')
def test_verify_created_server_ephemeral_disk(self):
# Verify that the ephemeral disk is created when creating server
@@ -196,12 +198,12 @@
adminPass=admin_pass,
flavor=flavor_with_eph_disk_id))
# Get partition number of server without extra specs.
- linux_client = RemoteClient(server_no_eph_disk,
- self.ssh_user, self.password)
+ linux_client = remote_client.RemoteClient(server_no_eph_disk,
+ self.ssh_user, self.password)
partition_num = len(linux_client.get_partitions())
- linux_client = RemoteClient(server_with_eph_disk,
- self.ssh_user, self.password)
+ linux_client = remote_client.RemoteClient(server_with_eph_disk,
+ self.ssh_user, self.password)
self.assertEqual(partition_num + 1, linux_client.get_partitions())
diff --git a/tempest/api/compute/servers/test_list_server_filters.py b/tempest/api/compute/servers/test_list_server_filters.py
index 15b7b9e..fc0bb9f 100644
--- a/tempest/api/compute/servers/test_list_server_filters.py
+++ b/tempest/api/compute/servers/test_list_server_filters.py
@@ -18,8 +18,7 @@
from tempest.common.utils import data_utils
from tempest import config
from tempest import exceptions
-from tempest.test import attr
-from tempest.test import skip_because
+from tempest import test
CONF = config.CONF
@@ -74,7 +73,7 @@
cls.fixed_network_name = CONF.compute.fixed_network_name
@utils.skip_unless_attr('multiple_images', 'Only one image found')
- @attr(type='gate')
+ @test.attr(type='gate')
def test_list_servers_filter_by_image(self):
# Filter the list of servers by image
params = {'image': self.image_ref}
@@ -85,7 +84,7 @@
self.assertNotIn(self.s2['id'], map(lambda x: x['id'], servers))
self.assertIn(self.s3['id'], map(lambda x: x['id'], servers))
- @attr(type='gate')
+ @test.attr(type='gate')
def test_list_servers_filter_by_flavor(self):
# Filter the list of servers by flavor
params = {'flavor': self.flavor_ref_alt}
@@ -96,7 +95,7 @@
self.assertNotIn(self.s2['id'], map(lambda x: x['id'], servers))
self.assertIn(self.s3['id'], map(lambda x: x['id'], servers))
- @attr(type='gate')
+ @test.attr(type='gate')
def test_list_servers_filter_by_server_name(self):
# Filter the list of servers by server name
params = {'name': self.s1_name}
@@ -107,7 +106,7 @@
self.assertNotIn(self.s2_name, map(lambda x: x['name'], servers))
self.assertNotIn(self.s3_name, map(lambda x: x['name'], servers))
- @attr(type='gate')
+ @test.attr(type='gate')
def test_list_servers_filter_by_server_status(self):
# Filter the list of servers by server status
params = {'status': 'active'}
@@ -118,7 +117,7 @@
self.assertIn(self.s2['id'], map(lambda x: x['id'], servers))
self.assertIn(self.s3['id'], map(lambda x: x['id'], servers))
- @attr(type='gate')
+ @test.attr(type='gate')
def test_list_servers_filter_by_shutoff_status(self):
# Filter the list of servers by server shutoff status
params = {'status': 'shutoff'}
@@ -135,7 +134,7 @@
self.assertNotIn(self.s2['id'], map(lambda x: x['id'], servers))
self.assertNotIn(self.s3['id'], map(lambda x: x['id'], servers))
- @attr(type='gate')
+ @test.attr(type='gate')
def test_list_servers_filter_by_limit(self):
# Verify only the expected number of servers are returned
params = {'limit': 1}
@@ -143,14 +142,14 @@
# when _interface='xml', one element for servers_links in servers
self.assertEqual(1, len([x for x in servers['servers'] if 'id' in x]))
- @attr(type='gate')
+ @test.attr(type='gate')
def test_list_servers_filter_by_zero_limit(self):
# Verify only the expected number of servers are returned
params = {'limit': 0}
resp, servers = self.client.list_servers(params)
self.assertEqual(0, len(servers['servers']))
- @attr(type='gate')
+ @test.attr(type='gate')
def test_list_servers_filter_by_exceed_limit(self):
# Verify only the expected number of servers are returned
params = {'limit': 100000}
@@ -160,7 +159,7 @@
len([x for x in servers['servers'] if 'id' in x]))
@utils.skip_unless_attr('multiple_images', 'Only one image found')
- @attr(type='gate')
+ @test.attr(type='gate')
def test_list_servers_detailed_filter_by_image(self):
# Filter the detailed list of servers by image
params = {'image': self.image_ref}
@@ -171,7 +170,7 @@
self.assertNotIn(self.s2['id'], map(lambda x: x['id'], servers))
self.assertIn(self.s3['id'], map(lambda x: x['id'], servers))
- @attr(type='gate')
+ @test.attr(type='gate')
def test_list_servers_detailed_filter_by_flavor(self):
# Filter the detailed list of servers by flavor
params = {'flavor': self.flavor_ref_alt}
@@ -182,7 +181,7 @@
self.assertNotIn(self.s2['id'], map(lambda x: x['id'], servers))
self.assertIn(self.s3['id'], map(lambda x: x['id'], servers))
- @attr(type='gate')
+ @test.attr(type='gate')
def test_list_servers_detailed_filter_by_server_name(self):
# Filter the detailed list of servers by server name
params = {'name': self.s1_name}
@@ -193,7 +192,7 @@
self.assertNotIn(self.s2_name, map(lambda x: x['name'], servers))
self.assertNotIn(self.s3_name, map(lambda x: x['name'], servers))
- @attr(type='gate')
+ @test.attr(type='gate')
def test_list_servers_detailed_filter_by_server_status(self):
# Filter the detailed list of servers by server status
params = {'status': 'active'}
@@ -205,7 +204,7 @@
self.assertIn(self.s3['id'], map(lambda x: x['id'], servers))
self.assertEqual(['ACTIVE'] * 3, [x['status'] for x in servers])
- @attr(type='gate')
+ @test.attr(type='gate')
def test_list_servers_filtered_by_name_wildcard(self):
# List all servers that contains '-instance' in name
params = {'name': '-instance'}
@@ -227,8 +226,8 @@
self.assertNotIn(self.s2_name, map(lambda x: x['name'], servers))
self.assertNotIn(self.s3_name, map(lambda x: x['name'], servers))
- @skip_because(bug="1170718")
- @attr(type='gate')
+ @test.skip_because(bug="1170718")
+ @test.attr(type='gate')
def test_list_servers_filtered_by_ip(self):
# Filter servers by ip
# Here should be listed 1 server
@@ -242,9 +241,9 @@
self.assertNotIn(self.s2_name, map(lambda x: x['name'], servers))
self.assertNotIn(self.s3_name, map(lambda x: x['name'], servers))
- @skip_because(bug="1182883",
- condition=CONF.service_available.neutron)
- @attr(type='gate')
+ @test.skip_because(bug="1182883",
+ condition=CONF.service_available.neutron)
+ @test.attr(type='gate')
def test_list_servers_filtered_by_ip_regex(self):
# Filter servers by regex ip
# List all servers filtered by part of ip address.
@@ -259,7 +258,7 @@
self.assertIn(self.s2_name, map(lambda x: x['name'], servers))
self.assertIn(self.s3_name, map(lambda x: x['name'], servers))
- @attr(type='gate')
+ @test.attr(type='gate')
def test_list_servers_detailed_limit_results(self):
# Verify only the expected number of detailed results are returned
params = {'limit': 1}
diff --git a/tempest/api/compute/servers/test_server_actions.py b/tempest/api/compute/servers/test_server_actions.py
index f113047..adf522b 100644
--- a/tempest/api/compute/servers/test_server_actions.py
+++ b/tempest/api/compute/servers/test_server_actions.py
@@ -20,11 +20,10 @@
from tempest.api.compute import base
from tempest.common.utils import data_utils
-from tempest.common.utils.linux.remote_client import RemoteClient
+from tempest.common.utils.linux import remote_client
from tempest import config
from tempest import exceptions
-from tempest.test import attr
-from tempest.test import skip_because
+from tempest import test
CONF = config.CONF
@@ -53,7 +52,7 @@
@testtools.skipUnless(CONF.compute_feature_enabled.change_password,
'Change password not available.')
- @attr(type='gate')
+ @test.attr(type='gate')
def test_change_server_password(self):
# The server's password should be set to the provided password
new_password = 'Newpass1234'
@@ -64,16 +63,18 @@
if self.run_ssh:
# Verify that the user can authenticate with the new password
resp, server = self.client.get_server(self.server_id)
- linux_client = RemoteClient(server, self.ssh_user, new_password)
+ linux_client = remote_client.RemoteClient(server, self.ssh_user,
+ new_password)
linux_client.validate_authentication()
- @attr(type='smoke')
+ @test.attr(type='smoke')
def test_reboot_server_hard(self):
# The server should be power cycled
if self.run_ssh:
# Get the time the server was last rebooted,
resp, server = self.client.get_server(self.server_id)
- linux_client = RemoteClient(server, self.ssh_user, self.password)
+ linux_client = remote_client.RemoteClient(server, self.ssh_user,
+ self.password)
boot_time = linux_client.get_boot_time()
resp, body = self.client.reboot(self.server_id, 'HARD')
@@ -82,19 +83,21 @@
if self.run_ssh:
# Log in and verify the boot time has changed
- linux_client = RemoteClient(server, self.ssh_user, self.password)
+ linux_client = remote_client.RemoteClient(server, self.ssh_user,
+ self.password)
new_boot_time = linux_client.get_boot_time()
self.assertTrue(new_boot_time > boot_time,
'%s > %s' % (new_boot_time, boot_time))
- @skip_because(bug="1014647")
- @attr(type='smoke')
+ @test.skip_because(bug="1014647")
+ @test.attr(type='smoke')
def test_reboot_server_soft(self):
# The server should be signaled to reboot gracefully
if self.run_ssh:
# Get the time the server was last rebooted,
resp, server = self.client.get_server(self.server_id)
- linux_client = RemoteClient(server, self.ssh_user, self.password)
+ linux_client = remote_client.RemoteClient(server, self.ssh_user,
+ self.password)
boot_time = linux_client.get_boot_time()
resp, body = self.client.reboot(self.server_id, 'SOFT')
@@ -103,12 +106,13 @@
if self.run_ssh:
# Log in and verify the boot time has changed
- linux_client = RemoteClient(server, self.ssh_user, self.password)
+ linux_client = remote_client.RemoteClient(server, self.ssh_user,
+ self.password)
new_boot_time = linux_client.get_boot_time()
self.assertTrue(new_boot_time > boot_time,
'%s > %s' % (new_boot_time, boot_time))
- @attr(type='smoke')
+ @test.attr(type='smoke')
def test_rebuild_server(self):
# The server should be rebuilt using the provided image and data
meta = {'rebuild': 'server'}
@@ -140,10 +144,11 @@
if self.run_ssh:
# Verify that the user can authenticate with the provided password
- linux_client = RemoteClient(server, self.ssh_user, password)
+ linux_client = remote_client.RemoteClient(server, self.ssh_user,
+ password)
linux_client.validate_authentication()
- @attr(type='gate')
+ @test.attr(type='gate')
def test_rebuild_server_in_stop_state(self):
# The server in stop state should be rebuilt using the provided
# image and remain in SHUTOFF state
@@ -181,7 +186,7 @@
return current_flavor, new_flavor_ref
@testtools.skipIf(not resize_available, 'Resize not available.')
- @attr(type='smoke')
+ @test.attr(type='smoke')
def test_resize_server_confirm(self):
# The server's RAM and disk space should be modified to that of
# the provided flavor
@@ -200,7 +205,7 @@
self.assertEqual(new_flavor_ref, server['flavor']['id'])
@testtools.skipIf(not resize_available, 'Resize not available.')
- @attr(type='gate')
+ @test.attr(type='gate')
def test_resize_server_revert(self):
# The server's RAM and disk space should return to its original
# values after a resize is reverted
@@ -228,7 +233,7 @@
required time (%s s).' % (self.server_id, self.build_timeout)
raise exceptions.TimeoutException(message)
- @attr(type='gate')
+ @test.attr(type='gate')
def test_create_backup(self):
# Positive test:create backup successfully and rotate backups correctly
# create the first and the second backup
@@ -313,7 +318,7 @@
lines = len(output.split('\n'))
self.assertEqual(lines, 10)
- @attr(type='gate')
+ @test.attr(type='gate')
def test_get_console_output(self):
# Positive test:Should be able to GET the console output
# for a given server_id and number of lines
@@ -329,7 +334,7 @@
self.wait_for(self._get_output)
- @attr(type='gate')
+ @test.attr(type='gate')
def test_get_console_output_server_id_in_shutoff_status(self):
# Positive test:Should be able to GET the console output
# for a given server_id in SHUTOFF status
@@ -346,7 +351,7 @@
self.wait_for(self._get_output)
- @attr(type='gate')
+ @test.attr(type='gate')
def test_pause_unpause_server(self):
resp, server = self.client.pause_server(self.server_id)
self.assertEqual(202, resp.status)
@@ -355,7 +360,7 @@
self.assertEqual(202, resp.status)
self.client.wait_for_server_status(self.server_id, 'ACTIVE')
- @attr(type='gate')
+ @test.attr(type='gate')
def test_suspend_resume_server(self):
resp, server = self.client.suspend_server(self.server_id)
self.assertEqual(202, resp.status)
@@ -364,7 +369,7 @@
self.assertEqual(202, resp.status)
self.client.wait_for_server_status(self.server_id, 'ACTIVE')
- @attr(type='gate')
+ @test.attr(type='gate')
def test_shelve_unshelve_server(self):
resp, server = self.client.shelve_server(self.server_id)
self.assertEqual(202, resp.status)
@@ -389,7 +394,7 @@
self.assertEqual(202, resp.status)
self.client.wait_for_server_status(self.server_id, 'ACTIVE')
- @attr(type='gate')
+ @test.attr(type='gate')
def test_stop_start_server(self):
resp, server = self.servers_client.stop(self.server_id)
self.assertEqual(202, resp.status)
@@ -398,7 +403,7 @@
self.assertEqual(202, resp.status)
self.servers_client.wait_for_server_status(self.server_id, 'ACTIVE')
- @attr(type='gate')
+ @test.attr(type='gate')
def test_lock_unlock_server(self):
# Lock the server,try server stop(exceptions throw),unlock it and retry
resp, server = self.servers_client.lock_server(self.server_id)
diff --git a/tempest/api/compute/v3/servers/test_attach_volume.py b/tempest/api/compute/v3/servers/test_attach_volume.py
index d693be5..2edf934 100644
--- a/tempest/api/compute/v3/servers/test_attach_volume.py
+++ b/tempest/api/compute/v3/servers/test_attach_volume.py
@@ -16,9 +16,9 @@
import testtools
from tempest.api.compute import base
-from tempest.common.utils.linux.remote_client import RemoteClient
+from tempest.common.utils.linux import remote_client
from tempest import config
-from tempest.test import attr
+from tempest import test
CONF = config.CONF
@@ -78,7 +78,7 @@
self.addCleanup(self._detach, server['id'], volume['id'])
@testtools.skipIf(not run_ssh, 'SSH required for this test')
- @attr(type='gate')
+ @test.attr(type='gate')
def test_attach_detach_volume(self):
# Stop and Start a server with an attached volume, ensuring that
# the volume remains attached.
@@ -92,9 +92,8 @@
self.servers_client.start(server['id'])
self.servers_client.wait_for_server_status(server['id'], 'ACTIVE')
- linux_client = RemoteClient(server,
- self.image_ssh_user,
- server['admin_password'])
+ linux_client = remote_client.RemoteClient(server, self.image_ssh_user,
+ server['admin_password'])
partitions = linux_client.get_partitions()
self.assertIn(self.device, partitions)
@@ -107,8 +106,7 @@
self.servers_client.start(server['id'])
self.servers_client.wait_for_server_status(server['id'], 'ACTIVE')
- linux_client = RemoteClient(server,
- self.image_ssh_user,
- server['admin_password'])
+ linux_client = remote_client.RemoteClient(server, self.image_ssh_user,
+ server['admin_password'])
partitions = linux_client.get_partitions()
self.assertNotIn(self.device, partitions)
diff --git a/tempest/api/compute/v3/servers/test_create_server.py b/tempest/api/compute/v3/servers/test_create_server.py
index 7a4c877..0825381 100644
--- a/tempest/api/compute/v3/servers/test_create_server.py
+++ b/tempest/api/compute/v3/servers/test_create_server.py
@@ -20,7 +20,7 @@
from tempest.api.compute import base
from tempest.common.utils import data_utils
-from tempest.common.utils.linux.remote_client import RemoteClient
+from tempest.common.utils.linux import remote_client
from tempest import config
from tempest import test
@@ -95,7 +95,8 @@
@test.attr(type='gate')
def test_can_log_into_created_server(self):
# Check that the user can authenticate with the generated password
- linux_client = RemoteClient(self.server, self.ssh_user, self.password)
+ linux_client = remote_client.RemoteClient(self.server,
+ self.ssh_user, self.password)
self.assertTrue(linux_client.can_authenticate())
@testtools.skipIf(not run_ssh, 'Instance validation tests are disabled.')
@@ -104,14 +105,16 @@
# Verify that the number of vcpus reported by the instance matches
# the amount stated by the flavor
resp, flavor = self.flavors_client.get_flavor_details(self.flavor_ref)
- linux_client = RemoteClient(self.server, self.ssh_user, self.password)
+ linux_client = remote_client.RemoteClient(self.server,
+ self.ssh_user, self.password)
self.assertEqual(flavor['vcpus'], linux_client.get_number_of_vcpus())
@testtools.skipIf(not run_ssh, 'Instance validation tests are disabled.')
@test.attr(type='gate')
def test_host_name_is_same_as_server_name(self):
# Verify the instance host name is the same as the server name
- linux_client = RemoteClient(self.server, self.ssh_user, self.password)
+ linux_client = remote_client.RemoteClient(self.server,
+ self.ssh_user, self.password)
self.assertTrue(linux_client.hostname_equals_servername(self.name))
@@ -204,12 +207,12 @@
adminPass=admin_pass,
flavor=flavor_with_eph_disk_id))
# Get partition number of server without extra specs.
- linux_client = RemoteClient(server_no_eph_disk,
- self.ssh_user, self.password)
+ linux_client = remote_client.RemoteClient(server_no_eph_disk,
+ self.ssh_user, self.password)
partition_num = len(linux_client.get_partitions())
- linux_client = RemoteClient(server_with_eph_disk,
- self.ssh_user, self.password)
+ linux_client = remote_client.RemoteClient(server_with_eph_disk,
+ self.ssh_user, self.password)
self.assertEqual(partition_num + 1, linux_client.get_partitions())
diff --git a/tempest/api/compute/v3/servers/test_list_server_filters.py b/tempest/api/compute/v3/servers/test_list_server_filters.py
index 9082eda..e08125b 100644
--- a/tempest/api/compute/v3/servers/test_list_server_filters.py
+++ b/tempest/api/compute/v3/servers/test_list_server_filters.py
@@ -18,8 +18,7 @@
from tempest.common.utils import data_utils
from tempest import config
from tempest import exceptions
-from tempest.test import attr
-from tempest.test import skip_because
+from tempest import test
CONF = config.CONF
@@ -74,7 +73,7 @@
cls.fixed_network_name = CONF.compute.fixed_network_name
@utils.skip_unless_attr('multiple_images', 'Only one image found')
- @attr(type='gate')
+ @test.attr(type='gate')
def test_list_servers_filter_by_image(self):
# Filter the list of servers by image
params = {'image': self.image_ref}
@@ -85,7 +84,7 @@
self.assertNotIn(self.s2['id'], map(lambda x: x['id'], servers))
self.assertIn(self.s3['id'], map(lambda x: x['id'], servers))
- @attr(type='gate')
+ @test.attr(type='gate')
def test_list_servers_filter_by_flavor(self):
# Filter the list of servers by flavor
params = {'flavor': self.flavor_ref_alt}
@@ -96,7 +95,7 @@
self.assertNotIn(self.s2['id'], map(lambda x: x['id'], servers))
self.assertIn(self.s3['id'], map(lambda x: x['id'], servers))
- @attr(type='gate')
+ @test.attr(type='gate')
def test_list_servers_filter_by_server_name(self):
# Filter the list of servers by server name
params = {'name': self.s1_name}
@@ -107,7 +106,7 @@
self.assertNotIn(self.s2_name, map(lambda x: x['name'], servers))
self.assertNotIn(self.s3_name, map(lambda x: x['name'], servers))
- @attr(type='gate')
+ @test.attr(type='gate')
def test_list_servers_filter_by_server_status(self):
# Filter the list of servers by server status
params = {'status': 'active'}
@@ -118,21 +117,21 @@
self.assertIn(self.s2['id'], map(lambda x: x['id'], servers))
self.assertIn(self.s3['id'], map(lambda x: x['id'], servers))
- @attr(type='gate')
+ @test.attr(type='gate')
def test_list_servers_filter_by_limit(self):
# Verify only the expected number of servers are returned
params = {'limit': 1}
resp, servers = self.client.list_servers(params)
self.assertEqual(1, len([x for x in servers['servers'] if 'id' in x]))
- @attr(type='gate')
+ @test.attr(type='gate')
def test_list_servers_filter_by_zero_limit(self):
# Verify only the expected number of servers are returned
params = {'limit': 0}
resp, servers = self.client.list_servers(params)
self.assertEqual(0, len(servers['servers']))
- @attr(type='gate')
+ @test.attr(type='gate')
def test_list_servers_filter_by_exceed_limit(self):
# Verify only the expected number of servers are returned
params = {'limit': 100000}
@@ -142,7 +141,7 @@
len([x for x in servers['servers'] if 'id' in x]))
@utils.skip_unless_attr('multiple_images', 'Only one image found')
- @attr(type='gate')
+ @test.attr(type='gate')
def test_list_servers_detailed_filter_by_image(self):
# Filter the detailed list of servers by image
params = {'image': self.image_ref}
@@ -153,7 +152,7 @@
self.assertNotIn(self.s2['id'], map(lambda x: x['id'], servers))
self.assertIn(self.s3['id'], map(lambda x: x['id'], servers))
- @attr(type='gate')
+ @test.attr(type='gate')
def test_list_servers_detailed_filter_by_flavor(self):
# Filter the detailed list of servers by flavor
params = {'flavor': self.flavor_ref_alt}
@@ -164,7 +163,7 @@
self.assertNotIn(self.s2['id'], map(lambda x: x['id'], servers))
self.assertIn(self.s3['id'], map(lambda x: x['id'], servers))
- @attr(type='gate')
+ @test.attr(type='gate')
def test_list_servers_detailed_filter_by_server_name(self):
# Filter the detailed list of servers by server name
params = {'name': self.s1_name}
@@ -175,7 +174,7 @@
self.assertNotIn(self.s2_name, map(lambda x: x['name'], servers))
self.assertNotIn(self.s3_name, map(lambda x: x['name'], servers))
- @attr(type='gate')
+ @test.attr(type='gate')
def test_list_servers_detailed_filter_by_server_status(self):
# Filter the detailed list of servers by server status
params = {'status': 'active'}
@@ -188,7 +187,7 @@
self.assertIn(self.s3['id'], map(lambda x: x['id'], servers))
self.assertEqual(['ACTIVE'] * 3, [x['status'] for x in servers])
- @attr(type='gate')
+ @test.attr(type='gate')
def test_list_servers_filter_by_shutoff_status(self):
# Filter the list of servers by server shutoff status
params = {'status': 'shutoff'}
@@ -205,7 +204,7 @@
self.assertNotIn(self.s2['id'], map(lambda x: x['id'], servers))
self.assertNotIn(self.s3['id'], map(lambda x: x['id'], servers))
- @attr(type='gate')
+ @test.attr(type='gate')
def test_list_servers_filtered_by_name_wildcard(self):
# List all servers that contains '-instance' in name
params = {'name': '-instance'}
@@ -227,8 +226,8 @@
self.assertNotIn(self.s2_name, map(lambda x: x['name'], servers))
self.assertNotIn(self.s3_name, map(lambda x: x['name'], servers))
- @skip_because(bug="1170718")
- @attr(type='gate')
+ @test.skip_because(bug="1170718")
+ @test.attr(type='gate')
def test_list_servers_filtered_by_ip(self):
# Filter servers by ip
# Here should be listed 1 server
@@ -242,9 +241,9 @@
self.assertNotIn(self.s2_name, map(lambda x: x['name'], servers))
self.assertNotIn(self.s3_name, map(lambda x: x['name'], servers))
- @skip_because(bug="1182883",
- condition=CONF.service_available.neutron)
- @attr(type='gate')
+ @test.skip_because(bug="1182883",
+ condition=CONF.service_available.neutron)
+ @test.attr(type='gate')
def test_list_servers_filtered_by_ip_regex(self):
# Filter servers by regex ip
# List all servers filtered by part of ip address.
@@ -259,7 +258,7 @@
self.assertIn(self.s2_name, map(lambda x: x['name'], servers))
self.assertIn(self.s3_name, map(lambda x: x['name'], servers))
- @attr(type='gate')
+ @test.attr(type='gate')
def test_list_servers_detailed_limit_results(self):
# Verify only the expected number of detailed results are returned
params = {'limit': 1}
diff --git a/tempest/api/compute/v3/servers/test_server_actions.py b/tempest/api/compute/v3/servers/test_server_actions.py
index 0dae796..6584b93 100644
--- a/tempest/api/compute/v3/servers/test_server_actions.py
+++ b/tempest/api/compute/v3/servers/test_server_actions.py
@@ -19,11 +19,10 @@
from tempest.api.compute import base
from tempest.common.utils import data_utils
-from tempest.common.utils.linux.remote_client import RemoteClient
+from tempest.common.utils.linux import remote_client
from tempest import config
from tempest import exceptions
-from tempest.test import attr
-from tempest.test import skip_because
+from tempest import test
CONF = config.CONF
@@ -52,7 +51,7 @@
@testtools.skipUnless(CONF.compute_feature_enabled.change_password,
'Change password not available.')
- @attr(type='gate')
+ @test.attr(type='gate')
def test_change_server_password(self):
# The server's password should be set to the provided password
new_password = 'Newpass1234'
@@ -63,16 +62,18 @@
if self.run_ssh:
# Verify that the user can authenticate with the new password
resp, server = self.client.get_server(self.server_id)
- linux_client = RemoteClient(server, self.ssh_user, new_password)
+ linux_client = remote_client.RemoteClient(server, self.ssh_user,
+ new_password)
linux_client.validate_authentication()
- @attr(type='smoke')
+ @test.attr(type='smoke')
def test_reboot_server_hard(self):
# The server should be power cycled
if self.run_ssh:
# Get the time the server was last rebooted,
resp, server = self.client.get_server(self.server_id)
- linux_client = RemoteClient(server, self.ssh_user, self.password)
+ linux_client = remote_client.RemoteClient(server, self.ssh_user,
+ self.password)
boot_time = linux_client.get_boot_time()
resp, body = self.client.reboot(self.server_id, 'HARD')
@@ -81,18 +82,20 @@
if self.run_ssh:
# Log in and verify the boot time has changed
- linux_client = RemoteClient(server, self.ssh_user, self.password)
+ linux_client = remote_client.RemoteClient(server, self.ssh_user,
+ self.password)
new_boot_time = linux_client.get_boot_time()
self.assertGreater(new_boot_time, boot_time)
- @skip_because(bug="1014647")
- @attr(type='smoke')
+ @test.skip_because(bug="1014647")
+ @test.attr(type='smoke')
def test_reboot_server_soft(self):
# The server should be signaled to reboot gracefully
if self.run_ssh:
# Get the time the server was last rebooted,
resp, server = self.client.get_server(self.server_id)
- linux_client = RemoteClient(server, self.ssh_user, self.password)
+ linux_client = remote_client.RemoteClient(server, self.ssh_user,
+ self.password)
boot_time = linux_client.get_boot_time()
resp, body = self.client.reboot(self.server_id, 'SOFT')
@@ -101,11 +104,12 @@
if self.run_ssh:
# Log in and verify the boot time has changed
- linux_client = RemoteClient(server, self.ssh_user, self.password)
+ linux_client = remote_client.RemoteClient(server, self.ssh_user,
+ self.password)
new_boot_time = linux_client.get_boot_time()
self.assertGreater(new_boot_time, boot_time)
- @attr(type='smoke')
+ @test.attr(type='smoke')
def test_rebuild_server(self):
# The server should be rebuilt using the provided image and data
meta = {'rebuild': 'server'}
@@ -133,10 +137,11 @@
if self.run_ssh:
# Verify that the user can authenticate with the provided password
- linux_client = RemoteClient(server, self.ssh_user, password)
+ linux_client = remote_client.RemoteClient(server, self.ssh_user,
+ password)
linux_client.validate_authentication()
- @attr(type='gate')
+ @test.attr(type='gate')
def test_rebuild_server_in_stop_state(self):
# The server in stop state should be rebuilt using the provided
# image and remain in SHUTOFF state
@@ -174,7 +179,7 @@
return current_flavor, new_flavor_ref
@testtools.skipIf(not resize_available, 'Resize not available.')
- @attr(type='smoke')
+ @test.attr(type='smoke')
def test_resize_server_confirm(self):
# The server's RAM and disk space should be modified to that of
# the provided flavor
@@ -193,7 +198,7 @@
self.assertEqual(new_flavor_ref, server['flavor']['id'])
@testtools.skipIf(not resize_available, 'Resize not available.')
- @attr(type='gate')
+ @test.attr(type='gate')
def test_resize_server_revert(self):
# The server's RAM and disk space should return to its original
# values after a resize is reverted
@@ -221,7 +226,7 @@
required time (%s s).' % (self.server_id, self.build_timeout)
raise exceptions.TimeoutException(message)
- @attr(type='gate')
+ @test.attr(type='gate')
def test_create_backup(self):
# Positive test:create backup successfully and rotate backups correctly
# create the first and the second backup
@@ -303,7 +308,7 @@
lines = len(output.split('\n'))
self.assertEqual(lines, 10)
- @attr(type='gate')
+ @test.attr(type='gate')
def test_get_console_output(self):
# Positive test:Should be able to GET the console output
# for a given server_id and number of lines
@@ -319,7 +324,7 @@
self.wait_for(self._get_output)
- @attr(type='gate')
+ @test.attr(type='gate')
def test_get_console_output_server_id_in_shutoff_status(self):
# Positive test:Should be able to GET the console output
# for a given server_id in SHUTOFF status
@@ -336,7 +341,7 @@
self.wait_for(self._get_output)
- @attr(type='gate')
+ @test.attr(type='gate')
def test_pause_unpause_server(self):
resp, server = self.client.pause_server(self.server_id)
self.assertEqual(202, resp.status)
@@ -345,7 +350,7 @@
self.assertEqual(202, resp.status)
self.client.wait_for_server_status(self.server_id, 'ACTIVE')
- @attr(type='gate')
+ @test.attr(type='gate')
def test_suspend_resume_server(self):
resp, server = self.client.suspend_server(self.server_id)
self.assertEqual(202, resp.status)
@@ -354,7 +359,7 @@
self.assertEqual(202, resp.status)
self.client.wait_for_server_status(self.server_id, 'ACTIVE')
- @attr(type='gate')
+ @test.attr(type='gate')
def test_shelve_unshelve_server(self):
resp, server = self.client.shelve_server(self.server_id)
self.assertEqual(202, resp.status)
@@ -378,7 +383,7 @@
self.assertEqual(202, resp.status)
self.client.wait_for_server_status(self.server_id, 'ACTIVE')
- @attr(type='gate')
+ @test.attr(type='gate')
def test_stop_start_server(self):
resp, server = self.servers_client.stop(self.server_id)
self.assertEqual(202, resp.status)
@@ -387,7 +392,7 @@
self.assertEqual(202, resp.status)
self.servers_client.wait_for_server_status(self.server_id, 'ACTIVE')
- @attr(type='gate')
+ @test.attr(type='gate')
def test_lock_unlock_server(self):
# Lock the server,try server stop(exceptions throw),unlock it and retry
resp, server = self.servers_client.lock_server(self.server_id)
diff --git a/tempest/api/compute/volumes/test_attach_volume.py b/tempest/api/compute/volumes/test_attach_volume.py
index 8d8e3ec..7a60196 100644
--- a/tempest/api/compute/volumes/test_attach_volume.py
+++ b/tempest/api/compute/volumes/test_attach_volume.py
@@ -16,9 +16,9 @@
import testtools
from tempest.api.compute import base
-from tempest.common.utils.linux.remote_client import RemoteClient
+from tempest.common.utils.linux import remote_client
from tempest import config
-from tempest.test import attr
+from tempest import test
CONF = config.CONF
@@ -78,7 +78,7 @@
self.addCleanup(self._detach, server['id'], volume['id'])
@testtools.skipIf(not run_ssh, 'SSH required for this test')
- @attr(type='gate')
+ @test.attr(type='gate')
def test_attach_detach_volume(self):
# Stop and Start a server with an attached volume, ensuring that
# the volume remains attached.
@@ -92,8 +92,8 @@
self.servers_client.start(server['id'])
self.servers_client.wait_for_server_status(server['id'], 'ACTIVE')
- linux_client = RemoteClient(server,
- self.image_ssh_user, server['adminPass'])
+ linux_client = remote_client.RemoteClient(server, self.image_ssh_user,
+ server['adminPass'])
partitions = linux_client.get_partitions()
self.assertIn(self.device, partitions)
@@ -106,8 +106,8 @@
self.servers_client.start(server['id'])
self.servers_client.wait_for_server_status(server['id'], 'ACTIVE')
- linux_client = RemoteClient(server,
- self.image_ssh_user, server['adminPass'])
+ linux_client = remote_client.RemoteClient(server, self.image_ssh_user,
+ server['adminPass'])
partitions = linux_client.get_partitions()
self.assertNotIn(self.device, partitions)
diff --git a/tempest/api/compute/volumes/test_volumes_get.py b/tempest/api/compute/volumes/test_volumes_get.py
index bcab891..73e3b3a 100644
--- a/tempest/api/compute/volumes/test_volumes_get.py
+++ b/tempest/api/compute/volumes/test_volumes_get.py
@@ -16,8 +16,8 @@
from tempest.api.compute import base
from tempest.common.utils import data_utils
from tempest import config
-from tempest.test import attr
-from testtools.matchers import ContainsAll
+from tempest import test
+from testtools import matchers
CONF = config.CONF
@@ -34,7 +34,7 @@
skip_msg = ("%s skipped as Cinder is not available" % cls.__name__)
raise cls.skipException(skip_msg)
- @attr(type='smoke')
+ @test.attr(type='smoke')
def test_volume_create_get_delete(self):
# CREATE, GET, DELETE Volume
volume = None
@@ -68,7 +68,7 @@
'The fetched Volume is different '
'from the created Volume')
self.assertThat(fetched_volume['metadata'].items(),
- ContainsAll(metadata.items()),
+ matchers.ContainsAll(metadata.items()),
'The fetched Volume metadata misses data '
'from the created Volume')
diff --git a/tempest/common/utils/linux/remote_client.py b/tempest/common/utils/linux/remote_client.py
index bb2fcfb..94fc23c 100644
--- a/tempest/common/utils/linux/remote_client.py
+++ b/tempest/common/utils/linux/remote_client.py
@@ -13,9 +13,9 @@
import re
import time
-from tempest.common.ssh import Client
+from tempest.common import ssh
from tempest import config
-from tempest.exceptions import ServerUnreachable
+from tempest import exceptions
CONF = config.CONF
@@ -37,10 +37,10 @@
ip_address = address['addr']
break
else:
- raise ServerUnreachable()
- self.ssh_client = Client(ip_address, username, password, ssh_timeout,
- pkey=pkey,
- channel_timeout=ssh_channel_timeout)
+ raise exceptions.ServerUnreachable()
+ self.ssh_client = ssh.Client(ip_address, username, password,
+ ssh_timeout, pkey=pkey,
+ channel_timeout=ssh_channel_timeout)
def validate_authentication(self):
"""Validate ssh connection and authentication
diff --git a/tempest/services/network/xml/network_client.py b/tempest/services/network/xml/network_client.py
index f6ae718..25e6edb 100644
--- a/tempest/services/network/xml/network_client.py
+++ b/tempest/services/network/xml/network_client.py
@@ -14,11 +14,7 @@
import xml.etree.ElementTree as ET
from tempest.common import rest_client
-from tempest.services.compute.xml.common import deep_dict_to_xml
-from tempest.services.compute.xml.common import Document
-from tempest.services.compute.xml.common import Element
-from tempest.services.compute.xml.common import parse_array
-from tempest.services.compute.xml.common import xml_to_json
+from tempest.services.compute.xml import common
from tempest.services.network import network_client_base as client_base
@@ -37,11 +33,11 @@
def _parse_array(self, node):
array = []
for child in node.getchildren():
- array.append(xml_to_json(child))
+ array.append(common.xml_to_json(child))
return array
def deserialize_list(self, body):
- return parse_array(etree.fromstring(body), self.PLURALS)
+ return common.parse_array(etree.fromstring(body), self.PLURALS)
def deserialize_single(self, body):
return _root_tag_fetcher_and_xml_to_json_parse(body)
@@ -50,91 +46,91 @@
#TODO(enikanorov): implement better json to xml conversion
# expecting the dict with single key
root = body.keys()[0]
- post_body = Element(root)
+ post_body = common.Element(root)
post_body.add_attr('xmlns:xsi',
'http://www.w3.org/2001/XMLSchema-instance')
for name, attr in body[root].items():
elt = self._get_element(name, attr)
post_body.append(elt)
- return str(Document(post_body))
+ return str(common.Document(post_body))
def serialize_list(self, body, root_name=None, item_name=None):
# expecting dict in form
# body = {'resources': [res_dict1, res_dict2, ...]
- post_body = Element(root_name)
+ post_body = common.Element(root_name)
post_body.add_attr('xmlns:xsi',
'http://www.w3.org/2001/XMLSchema-instance')
for item in body[body.keys()[0]]:
- elt = Element(item_name)
+ elt = common.Element(item_name)
for name, attr in item.items():
elt_content = self._get_element(name, attr)
elt.append(elt_content)
post_body.append(elt)
- return str(Document(post_body))
+ return str(common.Document(post_body))
def _get_element(self, name, value):
if value is None:
- xml_elem = Element(name)
+ xml_elem = common.Element(name)
xml_elem.add_attr("xsi:nil", "true")
return xml_elem
elif isinstance(value, dict):
- dict_element = Element(name)
+ dict_element = common.Element(name)
for key, value in value.iteritems():
elem = self._get_element(key, value)
dict_element.append(elem)
return dict_element
elif isinstance(value, list):
- list_element = Element(name)
+ list_element = common.Element(name)
for element in value:
elem = self._get_element(name[:-1], element)
list_element.append(elem)
return list_element
else:
- return Element(name, value)
+ return common.Element(name, value)
def create_security_group(self, name):
uri = '%s/security-groups' % (self.uri_prefix)
- post_body = Element("security_group")
- p2 = Element("name", name)
+ post_body = common.Element("security_group")
+ p2 = common.Element("name", name)
post_body.append(p2)
- resp, body = self.post(uri, str(Document(post_body)))
+ resp, body = self.post(uri, str(common.Document(post_body)))
body = _root_tag_fetcher_and_xml_to_json_parse(body)
return resp, body
def create_security_group_rule(self, secgroup_id,
direction='ingress', **kwargs):
uri = '%s/security-group-rules' % (self.uri_prefix)
- rule = Element("security_group_rule")
- p1 = Element('security_group_id', secgroup_id)
- p2 = Element('direction', direction)
+ rule = common.Element("security_group_rule")
+ p1 = common.Element('security_group_id', secgroup_id)
+ p2 = common.Element('direction', direction)
rule.append(p1)
rule.append(p2)
for key, val in kwargs.items():
- key = Element(key, val)
+ key = common.Element(key, val)
rule.append(key)
- resp, body = self.post(uri, str(Document(rule)))
+ resp, body = self.post(uri, str(common.Document(rule)))
body = _root_tag_fetcher_and_xml_to_json_parse(body)
return resp, body
def create_member(self, address, protocol_port, pool_id):
uri = '%s/lb/members' % (self.uri_prefix)
- post_body = Element("member")
- p1 = Element("address", address)
- p2 = Element("protocol_port", protocol_port)
- p3 = Element("pool_id", pool_id)
+ post_body = common.Element("member")
+ p1 = common.Element("address", address)
+ p2 = common.Element("protocol_port", protocol_port)
+ p3 = common.Element("pool_id", pool_id)
post_body.append(p1)
post_body.append(p2)
post_body.append(p3)
- resp, body = self.post(uri, str(Document(post_body)))
+ resp, body = self.post(uri, str(common.Document(post_body)))
body = _root_tag_fetcher_and_xml_to_json_parse(body)
return resp, body
def update_member(self, admin_state_up, member_id):
uri = '%s/lb/members/%s' % (self.uri_prefix, str(member_id))
- put_body = Element("member")
- p2 = Element("admin_state_up", admin_state_up)
+ put_body = common.Element("member")
+ p2 = common.Element("admin_state_up", admin_state_up)
put_body.append(p2)
- resp, body = self.put(uri, str(Document(put_body)))
+ resp, body = self.put(uri, str(common.Document(put_body)))
body = _root_tag_fetcher_and_xml_to_json_parse(body)
return resp, body
@@ -142,10 +138,10 @@
pool_id):
uri = '%s/lb/pools/%s/health_monitors' % (self.uri_prefix,
pool_id)
- post_body = Element("health_monitor")
- p1 = Element("id", health_monitor_id,)
+ post_body = common.Element("health_monitor")
+ p1 = common.Element("id", health_monitor_id,)
post_body.append(p1)
- resp, body = self.post(uri, str(Document(post_body)))
+ resp, body = self.post(uri, str(common.Document(post_body)))
body = _root_tag_fetcher_and_xml_to_json_parse(body)
return resp, body
@@ -163,101 +159,102 @@
def create_router(self, name, **kwargs):
uri = '%s/routers' % (self.uri_prefix)
- router = Element("router")
- router.append(Element("name", name))
- deep_dict_to_xml(router, kwargs)
- resp, body = self.post(uri, str(Document(router)))
+ router = common.Element("router")
+ router.append(common.Element("name", name))
+ common.deep_dict_to_xml(router, kwargs)
+ resp, body = self.post(uri, str(common.Document(router)))
body = _root_tag_fetcher_and_xml_to_json_parse(body)
return resp, body
def update_router(self, router_id, **kwargs):
uri = '%s/routers/%s' % (self.uri_prefix, router_id)
- router = Element("router")
+ router = common.Element("router")
for element, content in kwargs.iteritems():
- router.append(Element(element, content))
- resp, body = self.put(uri, str(Document(router)))
+ router.append(common.Element(element, content))
+ resp, body = self.put(uri, str(common.Document(router)))
body = _root_tag_fetcher_and_xml_to_json_parse(body)
return resp, body
def add_router_interface_with_subnet_id(self, router_id, subnet_id):
uri = '%s/routers/%s/add_router_interface' % (self.uri_prefix,
router_id)
- subnet = Element("subnet_id", subnet_id)
- resp, body = self.put(uri, str(Document(subnet)))
+ subnet = common.Element("subnet_id", subnet_id)
+ resp, body = self.put(uri, str(common.Document(subnet)))
body = _root_tag_fetcher_and_xml_to_json_parse(body)
return resp, body
def add_router_interface_with_port_id(self, router_id, port_id):
uri = '%s/routers/%s/add_router_interface' % (self.uri_prefix,
router_id)
- port = Element("port_id", port_id)
- resp, body = self.put(uri, str(Document(port)))
+ port = common.Element("port_id", port_id)
+ resp, body = self.put(uri, str(common.Document(port)))
body = _root_tag_fetcher_and_xml_to_json_parse(body)
return resp, body
def remove_router_interface_with_subnet_id(self, router_id, subnet_id):
uri = '%s/routers/%s/remove_router_interface' % (self.uri_prefix,
router_id)
- subnet = Element("subnet_id", subnet_id)
- resp, body = self.put(uri, str(Document(subnet)))
+ subnet = common.Element("subnet_id", subnet_id)
+ resp, body = self.put(uri, str(common.Document(subnet)))
body = _root_tag_fetcher_and_xml_to_json_parse(body)
return resp, body
def remove_router_interface_with_port_id(self, router_id, port_id):
uri = '%s/routers/%s/remove_router_interface' % (self.uri_prefix,
router_id)
- port = Element("port_id", port_id)
- resp, body = self.put(uri, str(Document(port)))
+ port = common.Element("port_id", port_id)
+ resp, body = self.put(uri, str(common.Document(port)))
body = _root_tag_fetcher_and_xml_to_json_parse(body)
return resp, body
def create_floating_ip(self, ext_network_id, **kwargs):
uri = '%s/floatingips' % (self.uri_prefix)
- floatingip = Element('floatingip')
- floatingip.append(Element("floating_network_id", ext_network_id))
+ floatingip = common.Element('floatingip')
+ floatingip.append(common.Element("floating_network_id",
+ ext_network_id))
for element, content in kwargs.iteritems():
- floatingip.append(Element(element, content))
- resp, body = self.post(uri, str(Document(floatingip)))
+ floatingip.append(common.Element(element, content))
+ resp, body = self.post(uri, str(common.Document(floatingip)))
body = _root_tag_fetcher_and_xml_to_json_parse(body)
return resp, body
def update_floating_ip(self, floating_ip_id, **kwargs):
uri = '%s/floatingips/%s' % (self.uri_prefix, floating_ip_id)
- floatingip = Element('floatingip')
+ floatingip = common.Element('floatingip')
floatingip.add_attr('xmlns:xsi',
'http://www.w3.org/2001/XMLSchema-instance')
for element, content in kwargs.iteritems():
if content is None:
- xml_elem = Element(element)
+ xml_elem = common.Element(element)
xml_elem.add_attr("xsi:nil", "true")
floatingip.append(xml_elem)
else:
- floatingip.append(Element(element, content))
- resp, body = self.put(uri, str(Document(floatingip)))
+ floatingip.append(common.Element(element, content))
+ resp, body = self.put(uri, str(common.Document(floatingip)))
body = _root_tag_fetcher_and_xml_to_json_parse(body)
return resp, body
def list_router_interfaces(self, uuid):
uri = '%s/ports?device_id=%s' % (self.uri_prefix, uuid)
resp, body = self.get(uri)
- ports = parse_array(etree.fromstring(body), self.PLURALS)
+ ports = common.parse_array(etree.fromstring(body), self.PLURALS)
ports = {"ports": ports}
return resp, ports
def update_agent(self, agent_id, agent_info):
uri = '%s/agents/%s' % (self.uri_prefix, agent_id)
- agent = Element('agent')
+ agent = common.Element('agent')
for (key, value) in agent_info.items():
- p = Element(key, value)
+ p = common.Element(key, value)
agent.append(p)
- resp, body = self.put(uri, str(Document(agent)))
+ resp, body = self.put(uri, str(common.Document(agent)))
body = _root_tag_fetcher_and_xml_to_json_parse(body)
return resp, body
def list_pools_hosted_by_one_lbaas_agent(self, agent_id):
uri = '%s/agents/%s/loadbalancer-pools' % (self.uri_prefix, agent_id)
resp, body = self.get(uri)
- pools = parse_array(etree.fromstring(body))
+ pools = common.parse_array(etree.fromstring(body))
body = {'pools': pools}
return resp, body
@@ -283,14 +280,14 @@
def list_dhcp_agent_hosting_network(self, network_id):
uri = '%s/networks/%s/dhcp-agents' % (self.uri_prefix, network_id)
resp, body = self.get(uri)
- agents = parse_array(etree.fromstring(body))
+ agents = common.parse_array(etree.fromstring(body))
body = {'agents': agents}
return resp, body
def list_networks_hosted_by_one_dhcp_agent(self, agent_id):
uri = '%s/agents/%s/dhcp-networks' % (self.uri_prefix, agent_id)
resp, body = self.get(uri)
- networks = parse_array(etree.fromstring(body))
+ networks = common.parse_array(etree.fromstring(body))
body = {'networks': networks}
return resp, body
@@ -312,8 +309,8 @@
root_tag = body.tag
if root_tag.startswith("{"):
ns, root_tag = root_tag.split("}", 1)
- body = xml_to_json(etree.fromstring(xml_returned_body),
- NetworkClientXML.PLURALS)
+ body = common.xml_to_json(etree.fromstring(xml_returned_body),
+ NetworkClientXML.PLURALS)
nil = '{http://www.w3.org/2001/XMLSchema-instance}nil'
for key, val in body.iteritems():
if isinstance(val, dict):
diff --git a/tempest/stress/run_stress.py b/tempest/stress/run_stress.py
index a6c2b77..c7c17c0 100755
--- a/tempest/stress/run_stress.py
+++ b/tempest/stress/run_stress.py
@@ -18,7 +18,7 @@
import inspect
import json
import sys
-from testtools.testsuite import iterate_tests
+from testtools import testsuite
try:
from unittest import loader
except ImportError:
@@ -38,7 +38,7 @@
tests = []
testloader = loader.TestLoader()
list = testloader.discover(path)
- for func in (iterate_tests(list)):
+ for func in (testsuite.iterate_tests(list)):
attrs = []
try:
method_name = getattr(func, '_testMethodName')