Merge "Add concurrency tests for Cinder operations"
diff --git a/.zuul.yaml b/.zuul.yaml
index c7cc678..c544d2a 100644
--- a/.zuul.yaml
+++ b/.zuul.yaml
@@ -5,6 +5,7 @@
     check:
       jobs:
         - cinder-tempest-plugin-lvm-multiattach
+        - cinder-tempest-plugin-lvm-concurrency-tests
         - cinder-tempest-plugin-lvm-lio-barbican
         - cinder-tempest-plugin-lvm-lio-barbican-centos-9-stream:
             voting: false
@@ -92,6 +93,32 @@
     timeout: 10800
 
 - job:
+    name: cinder-tempest-plugin-lvm-concurrency-tests
+    description: |
+      This job runs Cinder concurrency scenario tests from the cinder-tempest-plugin.
+      These tests involve parallel operations on volumes (e.g., backup creation, attachment),
+      which can put stress on system resources.
+
+      To avoid hitting resource limits, `tempest_concurrency` is set to 1, so the test
+      cases themselves run serially even though each test performs concurrent actions internally.
+    parent: devstack-tempest
+    required-projects:
+      - opendev.org/openstack/tempest
+      - opendev.org/openstack/cinder-tempest-plugin
+      - opendev.org/openstack/cinder
+    vars:
+      tempest_concurrency: 1
+      tox_envlist: all
+      tempest_test_regex: 'cinder_tempest_plugin.scenario.test_volume_concurrency'
+      tempest_plugins:
+        - cinder-tempest-plugin
+      devstack_local_conf:
+        test-config:
+          $TEMPEST_CONFIG:
+            volume-feature-enabled:
+              concurrency_tests: True
+
+- job:
     name: cinder-tempest-plugin-lvm-barbican-base-abstract
     description: |
       This is a base job for lvm with lio & tgt targets
diff --git a/cinder_tempest_plugin/common/__init__.py b/cinder_tempest_plugin/common/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/cinder_tempest_plugin/common/__init__.py
diff --git a/cinder_tempest_plugin/common/concurrency.py b/cinder_tempest_plugin/common/concurrency.py
new file mode 100644
index 0000000..0374b12
--- /dev/null
+++ b/cinder_tempest_plugin/common/concurrency.py
@@ -0,0 +1,65 @@
+# Copyright 2025 Red Hat, Inc.
+# All Rights Reserved.
+#
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS,
+#    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#    See the License for the specific language governing permissions and
+#    limitations under the License.
+
+import multiprocessing
+
+from tempest import config
+
+CONF = config.CONF
+
+
+def run_concurrent_tasks(target, **kwargs):
+    """Run a target function concurrently using multiprocessing."""
+    manager = multiprocessing.Manager()
+    resource_ids = manager.list()
+    # To capture exceptions
+    errors = manager.list()
+    resource_count = CONF.volume.concurrent_resource_count
+
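+    # ``errors`` is captured from the enclosing scope; the wrapper records
+    # worker exceptions so the parent can fail with the collected details.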
+    def wrapped_target(index, resource_ids, **kwargs):
+        try:
+            target(index, resource_ids, **kwargs)
+        except Exception as e:
+            errors.append(f"Worker {index} failed: {str(e)}")
+
+    processes = []
+    for i in range(resource_count):
+        p = multiprocessing.Process(
+            target=wrapped_target,
+            args=(i, resource_ids),
+            kwargs=kwargs
+        )
+        processes.append(p)
+        p.start()
+
+    for p in processes:
+        p.join()
+
+    if errors:
+        error_msg = "\n".join(errors)
+        raise RuntimeError(
+            f"One or more concurrent tasks failed:\n{error_msg}")
+
+    return list(resource_ids)
diff --git a/cinder_tempest_plugin/config.py b/cinder_tempest_plugin/config.py
index 3d5eb0e..969451e 100644
--- a/cinder_tempest_plugin/config.py
+++ b/cinder_tempest_plugin/config.py
@@ -30,7 +30,10 @@
                 '`volume_image_dep_tests` '
                 'in cinder-tempest-plugin is deprecated.Alternatively '
                 '`CONF.volume_feature_enabled.enable_volume_image_dep_tests` '
-                'can be used for dependency tests.')
+                'can be used for dependency tests.'),
+    cfg.BoolOpt('concurrency_tests',
+                default=False,
+                help='Enable or disable running concurrency tests.'),
 ]
 
 # The barbican service is discovered by config_tempest [1], and will appear
@@ -44,3 +47,11 @@
                 default=False,
                 help="Whether or not barbican is expected to be available"),
 ]
+
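+# Registered under the existing [volume] options group (see plugin.py), so
+# the value is read as CONF.volume.concurrent_resource_count.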
+concurrency_option = [
+    cfg.IntOpt('concurrent_resource_count',
+               default=5,
+               help='Number of resources to create concurrently.'),
+]
diff --git a/cinder_tempest_plugin/plugin.py b/cinder_tempest_plugin/plugin.py
index 79c835c..e9583cd 100644
--- a/cinder_tempest_plugin/plugin.py
+++ b/cinder_tempest_plugin/plugin.py
@@ -47,6 +47,9 @@
         config.register_opt_group(conf, config.volume_feature_group,
                                   project_config.cinder_option)
 
+        config.register_opt_group(conf, config.volume_group,
+                                  project_config.concurrency_option)
+
         # Define the 'barbican' service_available option, but only if the
         # barbican_tempest_plugin isn't present. It also defines the option,
         # and we need to avoid a duplicate option registration.
@@ -62,6 +65,7 @@
         """
         opt_lists = [
             (config.volume_feature_group.name, project_config.cinder_option),
+            (config.volume_group.name, project_config.concurrency_option),
         ]
 
         if 'barbican_tempest_plugin' not in sys.modules:
diff --git a/cinder_tempest_plugin/scenario/test_volume_concurrency.py b/cinder_tempest_plugin/scenario/test_volume_concurrency.py
new file mode 100644
index 0000000..3a27174
--- /dev/null
+++ b/cinder_tempest_plugin/scenario/test_volume_concurrency.py
@@ -0,0 +1,180 @@
+# Copyright 2025 Red Hat, Inc.
+# All Rights Reserved.
+#
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS,
+#    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#    See the License for the specific language governing permissions and
+#    limitations under the License.
+
+from tempest.common import utils
+from tempest.common import waiters
+from tempest import config
+from tempest.lib import decorators
+
+from cinder_tempest_plugin.common import concurrency
+from cinder_tempest_plugin.scenario import manager
+
+CONF = config.CONF
+
+
+class ConcurrentVolumeActionsTest(manager.ScenarioTest):
+
+    @classmethod
+    def skip_checks(cls):
+        super(ConcurrentVolumeActionsTest, cls).skip_checks()
+        if not CONF.volume_feature_enabled.concurrency_tests:
+            raise cls.skipException(
+                "Concurrency tests are disabled.")
+
+    def _resource_create(self, index, resource_ids, create_func,
+                         resource_id_key='id', **kwargs):
+        """Generic resource creation logic.
+
+        Handles both single and indexed resource creation.
+        If any list-type arguments are passed (e.g., volume_ids),
+        they are indexed using `index`.
+        """
+
+        # Prepare arguments, indexing into lists if necessary
+        adjusted_kwargs = {}
+        for key, value in kwargs.items():
+            if isinstance(value, list):
+                # For list arguments, pick the value by index
+                adjusted_kwargs[key] = value[index]
+            else:
+                adjusted_kwargs[key] = value
+
+        resource = create_func(**adjusted_kwargs)
+        resource_ids.append(resource[resource_id_key])
+
+    def _attach_volume_action(self, index, resource_ids, server_id,
+                              volume_ids):
+        """Attach the given volume to the server."""
+        volume_id = volume_ids[index]
+        self.servers_client.attach_volume(
+            server_id, volumeId=volume_id, device=None)
+        waiters.wait_for_volume_resource_status(
+            self.volumes_client, volume_id, 'in-use')
+        resource_ids.append((server_id, volume_id))
+
+    def _cleanup_resources(self, resource_ids, delete_func, wait_func):
+        """Delete and wait for resource cleanup."""
+        for res_id in resource_ids:
+            delete_func(res_id)
+            wait_func(res_id)
+
+    @utils.services('volume')
+    @decorators.idempotent_id('ceb4f3c2-b2a4-48f9-82a8-3d32cdb5b375')
+    def test_create_volumes(self):
+        """Test parallel volume creation."""
+        volume_ids = concurrency.run_concurrent_tasks(
+            self._resource_create,
+            create_func=self.create_volume,
+        )
+
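+        # Cleanups registered inside worker processes are not visible to
+        # the parent test, so the volumes are deleted explicitly here.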
+        self._cleanup_resources(volume_ids,
+                                self.volumes_client.delete_volume,
+                                self.volumes_client.wait_for_resource_deletion)
+
+    @utils.services('volume')
+    @decorators.idempotent_id('6aa893a6-dfd0-4a0b-ae15-2fb24342e48d')
+    def test_create_snapshots(self):
+        """Test parallel snapshot creation from a single volume."""
+        volume = self.create_volume()
+
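+        # Every worker snapshots the same source volume, exercising
+        # concurrent snapshot creation against a single resource.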
+        snapshot_ids = concurrency.run_concurrent_tasks(
+            self._resource_create,
+            create_func=self.create_volume_snapshot,
+            volume_id=volume['id']
+        )
+
+        self._cleanup_resources(
+            snapshot_ids,
+            self.snapshots_client.delete_snapshot,
+            self.snapshots_client.wait_for_resource_deletion)
+
+    @utils.services('compute', 'volume')
+    @decorators.idempotent_id('4c038386-00b0-4a6d-a612-48a4e0a96fa6')
+    def test_attach_volumes_to_server(self):
+        """Test parallel volume attachment to a server."""
+        server = self.create_server(wait_until='ACTIVE')
+        server_id = server['id']
+
+        volume_ids = concurrency.run_concurrent_tasks(
+            self._resource_create,
+            create_func=self.create_volume
+        )
+
+        attach_ids = concurrency.run_concurrent_tasks(
+            self._attach_volume_action,
+            server_id=server_id,
+            volume_ids=volume_ids
+        )
+
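+        # Detach serially so each volume is back to 'available' before it
+        # is deleted below.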
+        for server_id, volume_id in attach_ids:
+            self.servers_client.detach_volume(server_id, volume_id)
+            waiters.wait_for_volume_resource_status(self.volumes_client,
+                                                    volume_id, 'available')
+
+        self._cleanup_resources(volume_ids,
+                                self.volumes_client.delete_volume,
+                                self.volumes_client.wait_for_resource_deletion)
+
+    @utils.services('volume')
+    @decorators.idempotent_id('01f66de8-b217-4588-ab7f-e707d1931156')
+    def test_create_backups_and_restores(self):
+        """Test parallel backup creation and restore from multiple volumes."""
+
+        # Step 1: Create volumes concurrently
+        volume_ids = concurrency.run_concurrent_tasks(
+            self._resource_create,
+            create_func=self.create_volume
+        )
+
+        # Step 2: Create backups concurrently
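+        # volume_ids is a list, so _resource_create gives each worker its
+        # own source volume to back up.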
+        backup_ids = concurrency.run_concurrent_tasks(
+            self._resource_create,
+            create_func=self.create_backup,
+            volume_id=volume_ids
+        )
+
+        # Step 3: Restore the backups concurrently
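+        # resource_id_key='volume_id' collects the ID of each restored
+        # volume from the restore response.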
+        restored_vol_ids = concurrency.run_concurrent_tasks(
+            self._resource_create,
+            create_func=self.restore_backup,
+            resource_id_key='volume_id',
+            backup_id=backup_ids
+        )
+
+        # Step 4: Clean up all resources
+        self._cleanup_resources(
+            backup_ids,
+            self.backups_client.delete_backup,
+            self.backups_client.wait_for_resource_deletion)
+
+        self._cleanup_resources(
+            volume_ids,
+            self.volumes_client.delete_volume,
+            self.volumes_client.wait_for_resource_deletion)
+
+        self._cleanup_resources(
+            restored_vol_ids,
+            self.volumes_client.delete_volume,
+            self.volumes_client.wait_for_resource_deletion)