Update fio helpers

- Create / delete fio VMs in parallel. VMs are created in batches; the size
of each batch (the parallelism) is controlled by a new config parameter
- A server group with 'soft-anti-affinity' policy is added to fio setup.
This way VMs will be automatically scheduled on different hosts

Related-PROD: PROD-37187
Change-Id: I0a22b1a1fe279966e8370605d84783c5c49fea50
diff --git a/fio/clouds.yaml b/fio/clouds.yaml
index 7964385..d373e5a 100644
--- a/fio/clouds.yaml
+++ b/fio/clouds.yaml
@@ -31,6 +31,7 @@
       keypair_name: "fio-key"
       keypair_file_location: "."
       fio_client_name_mask: "fio-vm"
+      fio_aa_group_name: "fio-anti-affinity-group"
       fio_flavor_name: "fio-flavor"
       fio_flavor_ram: 2048
       fio_flavor_cpus: 10
@@ -42,4 +43,5 @@
       fio_vol_mountpoint: "/dev/vdc"
       mtu_size: 8000
       hv_suffix: "kaas-kubernetes-XXX"
-      cloud_name: "cloud-XXX"
+      cloud_name: "cloud-lon-XXX"
+      concurrency: 5
diff --git a/fio/connection.py b/fio/connection.py
index 4007ebe..6d82179 100644
--- a/fio/connection.py
+++ b/fio/connection.py
@@ -27,7 +27,7 @@
 UBUNTU_IMAGE_NAME: Final[Any] = get_resource_value(
     'ubuntu_image_name', 'Ubuntu-18.04')
 FIO_SG_NAME: Final[Any] = get_resource_value('sg_name', 'fio-sg')
-FIO_KEYPAIR_NAME: Final[Union[int, str]] = "-".join(
+FIO_KEYPAIR_NAME: Final[str] = "-".join(
     [get_resource_value('keypair_name', 'fio-key'), CLOUD_NAME])
 PRIVATE_KEYPAIR_FILE: Final[str] = "{}/{}.pem".format(
     get_resource_value('keypair_file_location', '.'),
@@ -49,7 +49,7 @@
 FIO_FLAVOR_RAM: Final[Any] = get_resource_value('fio_flavor_ram', 2048)
 FIO_FLAVOR_CPUS: Final[Any] = get_resource_value('fio_flavor_cpus', 10)
 FIO_FLAVOR_DISK: Final[Any] = get_resource_value('fio_flavor_disk', 20)
-FIO_CLIENTS_COUNT: Final[Any] = int(
+FIO_CLIENTS_COUNT: Final[int] = int(
     get_resource_value('fio_clients_count', 10))
 FIO_VOL_NAME_MASK: Final[Any] = get_resource_value(
     'fio_vol_name_mask', 'fio-vol')
@@ -60,8 +60,11 @@
     'fio_vol_mountpoint', '/dev/vdc')
 FIO_CLIENT_NAME_MASK: Final[Any] = get_resource_value(
     'fio_client_name_mask', 'fio-vm')
+FIO_AA_SERVER_GROUP_NAME: Final[Any] = get_resource_value(
+    'fio_aa_group_name', 'fio-anti-affinity-group')
 
 HV_SUFFIX: Final[Any] = get_resource_value('hv_suffix', '')
+CONCURRENCY: Final[int] = int(get_resource_value('concurrency', 5))
 
 
 def delete_server(srv: openstack.compute.v2.server.Server) -> None:
@@ -102,6 +105,7 @@
     print(FIO_FLAVOR_DISK)
     print(FIO_CLIENTS_COUNT)
     print(FIO_CLIENT_NAME_MASK)
+    print(FIO_AA_SERVER_GROUP_NAME)
     print(FIO_VOL_NAME_MASK)
     print(FIO_VOL_SIZE)
     print(FIO_VOL_TYPE)
@@ -109,3 +113,4 @@
 
     print(HV_SUFFIX)
     print(CLOUD_NAME)
+    print(CONCURRENCY)
diff --git a/fio/fio_cleanup.py b/fio/fio_cleanup.py
index ccbe38e..753df87 100644
--- a/fio/fio_cleanup.py
+++ b/fio/fio_cleanup.py
@@ -1,3 +1,5 @@
+import multiprocessing as mp
+
 import connection as conn
 from openstack.exceptions import ResourceFailure
 from typing import Final
@@ -8,40 +10,50 @@
 volume = conn.cloud.volume
 
 CLIENT_NAME_MASK: Final[str] = conn.FIO_CLIENT_NAME_MASK
+AA_SERVER_GROUP_NAME: Final[str] = conn.FIO_AA_SERVER_GROUP_NAME
 FLAVOR_NAME: Final[str] = conn.FIO_FLAVOR_NAME
 KEYPAIR_NAME: Final[str] = conn.FIO_KEYPAIR_NAME
 SG_NAME: Final[str] = conn.FIO_SG_NAME
 
 ROUTER_NAME: Final[str] = conn.FIO_ROUTER_NAME
 NET_NAME: Final[str] = conn.FIO_NET_NAME
+CONCURRENCY: Final[int] = conn.CONCURRENCY
+
+
+def delete_fio_client(vm_id: str) -> None:
+    vm = compute.get_server(vm_id)
+    attachments = compute.volume_attachments(vm)
+    # Delete fio volume attachment (and any other attachments
+    # that the VM could have)
+    # Delete the volume and the server
+    for att in attachments:
+        vol_id = att.volume_id
+        vol = volume.get_volume(vol_id)
+        try:
+            conn.detach_volume(vm, vol)
+            print(
+                f"'{vol.id}' volume has been detached from fio '{vm.name}'"
+                " server.")
+            conn.delete_volume(vol)
+            print(f"'{vol.id}' volume has been deleted.")
+            conn.delete_server(vm)
+            print(f"'{vm.name}' server has been deleted.")
+        except ResourceFailure as e:
+            print(
+                f"Cleanup of '{vm.id}' with volume '{vol.id}' attached "
+                f"failed with '{e.message}' error.")
+            conn.delete_volume(vol)
 
 
 if __name__ == "__main__":
-    # Find fio clients and server
-    vms = compute.servers(name=CLIENT_NAME_MASK)
-    for vm in vms:
-        attachments = compute.volume_attachments(vm)
-        # Delete fio volume attachment (and any other attachments
-        # that the VM could have)
-        # Delete the volume and the server
-        for att in attachments:
-            vol_id = att.volume_id
-            vol = volume.get_volume(vol_id)
-            try:
-                conn.detach_volume(vm, vol)
-                print(
-                    f"'{vol.id}' volume has been detached from fio '{vm.name}'"
-                    " server.")
-                conn.delete_volume(vol)
-                print(f"'{vol.id}' volume has been deleted.")
-                conn.delete_server(vm)
-                print(f"'{vm.name}' server has been deleted.")
-            except ResourceFailure as e:
-                print(
-                    f"Cleanup of '{vm.id}' with volume '{vol.id}' attached "
-                    f"failed with '{e.message}' error.")
-                conn.delete_volume(vol)
-                continue
+    # Find fio VMs
+    vms = list(compute.servers(name=CLIENT_NAME_MASK, details=False))
+
+    # Delete fio VMs in parallel in batches of CONCURRENCY size
+    with mp.Pool(processes=CONCURRENCY) as pool:
+        results = [pool.apply_async(delete_fio_client, (vm.id,)) for vm in vms]
+        # Wait for the batch of fio VM deletions to complete
+        _ = [r.get() for r in results]
 
     # Remove ports from fio router (including external GW)
     router = network.find_router(ROUTER_NAME)
@@ -79,3 +91,10 @@
     if sg:
         network.delete_security_group(sg)
         print(f"fio '{sg.id}' security group has been deleted.")
+
+    # Delete fio server group
+    server_group = compute.find_server_group(
+        AA_SERVER_GROUP_NAME, all_projects=True)
+    if server_group:
+        compute.delete_server_group(server_group)
+        print(f"fio '{server_group.name}' server group has been deleted.")
diff --git a/fio/fio_setup.py b/fio/fio_setup.py
index d04b67b..79f1d69 100644
--- a/fio/fio_setup.py
+++ b/fio/fio_setup.py
@@ -1,9 +1,10 @@
+import multiprocessing as mp
 import os
+import random
 import sys
 from typing import Dict, Final, List
 
 import connection as conn
-import openstack
 from openstack.exceptions import ResourceFailure
 
 
@@ -13,6 +14,7 @@
 
 CLIENTS_COUNT: Final[int] = conn.FIO_CLIENTS_COUNT
 CLIENT_NAME_MASK: Final[str] = conn.FIO_CLIENT_NAME_MASK
+AA_SERVER_GROUP_NAME: Final[str] = conn.FIO_AA_SERVER_GROUP_NAME
 UBUNTU_IMAGE_NAME: Final[str] = conn.UBUNTU_IMAGE_NAME
 
 VOL_NAME_MASK: Final[str] = conn.FIO_VOL_NAME_MASK
@@ -39,6 +41,7 @@
 SG_NAME: Final[str] = conn.FIO_SG_NAME
 HV_SUFFIX: Final[str] = conn.HV_SUFFIX
 CLOUD_NAME: Final[str] = conn.CLOUD_NAME
+CONCURRENCY: Final[int] = conn.CONCURRENCY
 
 NODES: Final[List[str]] = []
 SKIP_NODES: Final[List[str]] = []
@@ -69,15 +72,59 @@
 ]
 
 
-def create_server(
-        name, image_id, flavor_id, networks,
-        key_name, security_groups, availability_zone
-) -> openstack.connection.Connection:
-    srv = compute.create_server(
-        name=name, image_id=image_id, flavor_id=flavor_id, networks=networks,
-        key_name=key_name, security_groups=security_groups,
-        availability_zone=availability_zone)
-    return srv
+def create_fio_client(
+        image_id: str, flavor_id: str, networks: List,
+        key_name: str, security_groups: List, server_group_id: str
+) -> None:
+    rand_name = str(random.randint(1, 0x7fffffff))
+    vm_name = f"{CLIENT_NAME_MASK}-{rand_name}"
+
+    vm = compute.create_server(
+        name=vm_name,
+        image_id=image_id,
+        flavor_id=flavor_id,
+        networks=networks,
+        key_name=key_name,
+        security_groups=security_groups,
+        scheduler_hints={'group': server_group_id})
+    try:
+        vm = compute.wait_for_server(vm, wait=180)
+        print(f"Fio client '{vm.name}' is created on '{vm.compute_host}' node")
+    # Stop and exit if the server creation failed (for any reason)
+    except ResourceFailure as e:
+        print(
+            f"Fio client '{vm.name}' creation failed with '{e.message}'"
+            " error.")
+        conn.delete_server(vm)
+        sys.exit(0)
+
+    # Create a volume of the given type
+    vol_name = f"{VOL_NAME_MASK}-{rand_name}"
+    vol = volume.create_volume(
+        name=vol_name, size=VOL_SIZE, volume_type=VOL_TYPE)
+    try:
+        vol = volume.wait_for_status(vol, status='available')
+        print(f"Volume '{vol.name}' is created")
+    # Delete the volume if its creation failed; NOTE(review): there is no
+    # early return here, so the attach step below is still attempted
+    except ResourceFailure as e:
+        print(
+            f"Volume '{vol.name}' creation failed with '{e.message}' "
+            "error.")
+        conn.delete_volume(vol)
+
+    # Attach the volume to the fio client
+    compute.create_volume_attachment(vm, volume=vol)
+    try:
+        vol = volume.wait_for_status(vol, status='in-use')
+        print(f"Volume '{vol.name}' is attached to '{vm.name}' fio client")
+    # Delete the volume if the attachment failed (each worker handles a
+    # single VM, so just clean up and return)
+    except ResourceFailure as e:
+        print(
+            f"Volume '{vol.name}' attachment failed with '{e.message}' "
+            "error.")
+        conn.delete_volume(vol)
 
 
 if __name__ == "__main__":
@@ -91,7 +138,8 @@
         sys.exit(0)
 
     # Create fio sg if needed
-    sg = network.find_security_group(SG_NAME)
+    project_id = conn.cloud.auth['project_id']
+    sg = network.find_security_group(SG_NAME, project_id=project_id)
     if not sg:
         sg = network.create_security_group(name=SG_NAME)
         # Add 'allow-all' kind of rules to the security group
@@ -146,73 +194,25 @@
         fio_net_port = network.add_interface_to_router(
             router.id, subnet_id=fio_subnet.id)
 
-    # Get list of running computes with enabled 'nova-compute' service
-    cmp_services = compute.services(binary='nova-compute')
-    computes = [s for s in cmp_services if
-                s.host in NODES and
-                s.host not in SKIP_NODES and
-                s.state == 'up' and s.status == 'enabled']
+    # Create fio server group with anti-affinity scheduling policy
+    server_group = compute.find_server_group(
+        AA_SERVER_GROUP_NAME, all_projects=True)
+    if not server_group:
+        server_group = compute.create_server_group(
+            name=AA_SERVER_GROUP_NAME, policies=['soft-anti-affinity'])
 
-    # Prepare list of hypervisors to be used for running fio servers
-    hypervisors = []
-    computes_num = len(computes)
-    for i in range(CLIENTS_COUNT):
-        hypervisors.append(
-            ".".join([computes[i % computes_num].host, HV_SUFFIX]))
+    vm_kwargs = dict(
+        image_id=img.id,
+        flavor_id=flavor.id,
+        networks=[{'uuid': fio_net.id}],
+        key_name=KEYPAIR_NAME,
+        security_groups=[{'name': SG_NAME}],
+        server_group_id=server_group.id)
 
-    # Create <CLIENTS_COUNT> clients, attached to fio private network
-    vms = []
-    for i in range(CLIENTS_COUNT):
-        name = f"{CLIENT_NAME_MASK}{i}"
-        az = f"::{hypervisors[i]}"
-        flavor_id = flavor.id
-        vm = create_server(
-            name=name,
-            image_id=img.id,
-            flavor_id=flavor_id,
-            networks=[{'uuid': fio_net.id}],
-            key_name=KEYPAIR_NAME,
-            security_groups=[{'name': SG_NAME}],
-            availability_zone=az)
-        try:
-            vm = compute.wait_for_server(vm, wait=180)
-            node = hypervisors[i].split('.')[0]
-            print(f"Fio client VM '{vm.name}' is created on '{node}' node")
-        # Stop and exit if any of the servers creation failed (for any reason)
-        except ResourceFailure as e:
-            print(
-                f"Fio client VM '{vm.name}' creation failed with '{e.message}'"
-                " error.")
-            conn.delete_server(vm)
-            sys.exit(0)
-        vms.append(vm)
-
-        # Create a volume of the given type
-        vol_name = f"{VOL_NAME_MASK}{i}"
-        vol = volume.create_volume(
-            name=vol_name, size=VOL_SIZE, volume_type=VOL_TYPE)
-        try:
-            vol = volume.wait_for_status(vol, status='available')
-            print(f"Volume '{vol.name}' is created")
-        # Delete a volume if its creation failed and switch to next
-        # fio client VM
-        except ResourceFailure as e:
-            print(
-                f"Volume '{vol.name}' creation failed with '{e.message}' "
-                "error.")
-            conn.delete_volume(vol)
-            continue
-
-        # Attach the volume to the fio client
-        compute.create_volume_attachment(vm, volume=vol)
-        try:
-            vol = volume.wait_for_status(vol, status='in-use')
-            print(f"Volume '{vol.name}' is attached to '{vm.name}' fio client")
-        # Delete a volume if attachment failed and switch to next
-        # fio client VM
-        except ResourceFailure as e:
-            print(
-                f"Volume '{vol.name}' attachment failed with '{e.message}' "
-                "error.")
-            conn.delete_volume(vol)
-            continue
+    # Create fio client VMs in parallel in batches of CONCURRENCY size
+    with mp.Pool(processes=CONCURRENCY) as pool:
+        results = [
+            pool.apply_async(create_fio_client, kwds=vm_kwargs)
+            for _ in range(CLIENTS_COUNT)]
+        # Wait for batch of fio client VMs to be created
+        _ = [r.get() for r in results]