Add multithreading while installing salt-minions
PROD-37096
Change-Id: Ia6126fb0cb3cc9a286ecf802516b49550175b759
diff --git a/tcp_tests/helpers/utils.py b/tcp_tests/helpers/utils.py
index 46fc90d..e0ef723 100644
--- a/tcp_tests/helpers/utils.py
+++ b/tcp_tests/helpers/utils.py
@@ -23,6 +23,8 @@
import jinja2
import paramiko
import yaml
+import logging
+from multiprocessing import Process, BoundedSemaphore
from devops.helpers import ssh_client
from tcp_tests import logger
@@ -485,3 +487,73 @@
class TimeoutException(Exception):
pass
+
+
+# Processes started through Worker.start(); a single list shared module-wide.
+pool = []
+# Record layout for the formatter that Worker._worker attaches to its handler.
+LOG_FORMAT = (
+    '%(asctime)s - %(levelname)s %(filename)s:%(lineno)d '
+    '/%(processName)s/ -- %(message)s'
+)
+
+
+class Worker(object):
+    """Run callables in child processes, at most ``limit`` at a time.
+
+    Although the log message speaks of threads, every task is started as
+    a ``multiprocessing.Process``. Started processes are recorded in the
+    module-level ``pool`` list, so all Worker instances share one pool.
+    """
+
+    def __init__(self, limit=4, timeout=None):
+        """
+        :param limit: maximum number of tasks allowed to run at once
+        :param timeout: seconds to wait for each task in are_completed();
+            None waits without a time limit
+        """
+        LOG.debug("Created multithreading Worker limited by {} "
+                  "threads".format(limit))
+        self._sema = BoundedSemaphore(limit)
+        self.timeout = timeout
+
+    @property
+    def pool(self):
+        """The module-level list of started processes (not a copy)."""
+        return pool
+
+    def _worker(self, func, args):
+        """Child-process entry point: call ``func(*args)`` and free a slot.
+
+        An exception raised by ``func`` is not caught here; it propagates
+        after the cleanup below has run.
+        """
+        # ``import logging`` alone does not guarantee that the ``handlers``
+        # submodule is loaded, so import it explicitly before using it.
+        import logging.handlers
+
+        try:
+            memory_handler = logging.handlers.MemoryHandler(
+                50,
+                target=logger.console)
+            try:
+                memory_handler.setFormatter(
+                    logging.Formatter(fmt=LOG_FORMAT))
+                # NOTE(review): only records sent to this per-task logger
+                # reach memory_handler; whether func logs through it is not
+                # visible here (the original carried a
+                # "FIXME: logging doesn't work" marker at this point).
+                task_log = logging.getLogger("{}{}".format(func, args))
+                task_log.setLevel(logging.DEBUG)
+                task_log.addHandler(memory_handler)
+                func(*args)
+            finally:
+                # close() flushes buffered records to the target, so do it
+                # even when func raised.
+                memory_handler.close()
+        finally:
+            # allow a new process to be started now that this one is exiting
+            self._sema.release()
+
+    def start(self, func, args, name=None):
+        """Start ``func(*args)`` in a new process once a slot is free.
+
+        Blocks while ``limit`` tasks are still running.
+
+        :param func: callable to execute in the child process
+        :param args: sequence of positional arguments for ``func``
+        :param name: optional process name
+        """
+        self._sema.acquire()  # wait until another process has finished
+        try:
+            p = Process(target=self._worker,
+                        args=(func, args),
+                        name=name)
+            p.start()
+        except Exception:
+            # The child never ran, so it cannot release the slot itself.
+            self._sema.release()
+            raise
+        # Record only processes that really started; join() rejects a
+        # process that was never started.
+        self.pool.append(p)
+
+    def are_completed(self):
+        """Join every pooled process and report whether all have exited.
+
+        ``self.timeout`` is applied to each join separately.
+
+        :return: True if no process in the pool is alive afterwards
+        """
+        for task in self.pool:
+            LOG.info("Joining {}....".format(task))
+            task.join(timeout=self.timeout)
+        return all(not task.is_alive() for task in self.pool)
+
+    def clean_pool(self):
+        """Empty the shared pool in place; processes are not stopped."""
+        del self.pool[:]
+
+    def all_tasks_successfully_completed(self):
+        """Return True if every pooled process has exit code 0.
+
+        A process that has not exited yet has exit code None and therefore
+        counts as not successful.
+        """
+        return all(task.exitcode == 0 for task in self.pool)
+
+    def print_failed_tasks(self):
+        """Return (not print) one line per process whose exit code is not 0.
+
+        :return: newline-joined ``str()`` of each such process, or an empty
+            string when there is none
+        """
+        return "\n".join(str(task)
+                         for task in self.pool
+                         if task.exitcode != 0)