blob: 0374b12e46dd1646eb3e0aa8fac0c12c1c109bfc [file] [log] [blame]
lkuchlanec9aee22025-05-15 15:52:41 +03001# Copyright 2025 Red Hat, Inc.
2# All Rights Reserved.
3#
4# Licensed under the Apache License, Version 2.0 (the "License"); you may
5# not use this file except in compliance with the License. You may obtain
6# a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15
16import multiprocessing
17
18from tempest import config
19
20CONF = config.CONF
21
22
23def run_concurrent_tasks(target, **kwargs):
24 """Run a target function concurrently using multiprocessing."""
25 manager = multiprocessing.Manager()
26 resource_ids = manager.list()
27 # To capture exceptions
28 errors = manager.list()
29 resource_count = CONF.volume.concurrent_resource_count
30
31 def wrapped_target(index, resource_ids, **kwargs):
32 try:
33 target(index, resource_ids, **kwargs)
34 except Exception as e:
35 errors.append(f"Worker {index} failed: {str(e)}")
36
37 processes = []
38 for i in range(resource_count):
39 p = multiprocessing.Process(
40 target=wrapped_target,
41 args=(i, resource_ids),
42 kwargs=kwargs
43 )
44 processes.append(p)
45 p.start()
46
47 for p in processes:
48 p.join()
49
50 if errors:
51 error_msg = "\n".join(errors)
52 raise RuntimeError(
53 f"One or more concurrent tasks failed:\n{error_msg}")
54
55 return list(resource_ids)