Add multithreading while installing salt-minions
PROD-37096

Change-Id: Ia6126fb0cb3cc9a286ecf802516b49550175b759
diff --git a/tcp_tests/helpers/utils.py b/tcp_tests/helpers/utils.py
index 46fc90d..e0ef723 100644
--- a/tcp_tests/helpers/utils.py
+++ b/tcp_tests/helpers/utils.py
@@ -23,6 +23,8 @@
 import jinja2
 import paramiko
 import yaml
+import logging
+from multiprocessing import Process, BoundedSemaphore
 from devops.helpers import ssh_client
 
 from tcp_tests import logger
@@ -485,3 +487,73 @@
 
 class TimeoutException(Exception):
     pass
+
+
+pool = list()
+LOG_FORMAT = '%(asctime)s - %(levelname)s %(filename)s:%(lineno)d ' \
+             '/%(processName)s/ -- %(message)s'
+
+
+class Worker:
+    def __init__(self, limit=4, timeout=None):
+        """
+        limit of parallel thread to execute
+        timeout of waiting threads in seconds
+        """
+        LOG.debug("Created multithreading Worker limited by {} "
+                  "threads".format(limit))
+        self._sema = BoundedSemaphore(limit)
+        self.timeout = timeout
+        pass
+
+    @property
+    def pool(self):
+        global pool
+        return pool
+
+    def _worker(self, func, args):
+        try:
+            # FIXME: logging doesn't work
+            memory_handler = logging.handlers.MemoryHandler(
+                50,
+                target=logger.console)
+            formatter = logging.Formatter(fmt=LOG_FORMAT)
+
+            LOG = logging.getLogger("{}{}".format(func, args))
+            LOG.setLevel(logging.DEBUG)
+            memory_handler.setFormatter(formatter)
+            LOG.addHandler(memory_handler)
+            # #######
+            func(*args)
+            # #######
+            memory_handler.close()
+        finally:
+            # allow a new process to be started now that this one is exiting
+            self._sema.release()
+
+    def start(self, func, args, name=None):
+        self._sema.acquire()  # wait to start until another process is finished
+        p = Process(target=self._worker,
+                    args=(func, args),
+                    name=name
+                    )
+        self.pool.append(p)
+        p.start()
+
+    def are_completed(self):
+        for t in self.pool:
+            LOG.info("Joining {}....".format(t))
+            t.join(timeout=self.timeout)
+        return all([not (task.is_alive()) for task in self.pool])
+
+    def clean_pool(self):
+        for i in range(self.pool.__len__()):
+            del self.pool[0]
+
+    def all_tasks_successfully_completed(self):
+        return all([task.exitcode == 0 for task in self.pool])
+
+    def print_failed_tasks(self):
+        return "\n".join([str(task)
+                          for task in self.pool
+                          if task.exitcode != 0])