Revert "Add multithreading while installing salt-minions" PROD-37096
This reverts commit b8869ae0517448c33ba5553d8f945276a4152603.
Reason for revert: the change needs more testing.
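
In practical terms, this removes the multiprocessing Worker helper from tcp_tests/helpers/utils.py, the parallel command dispatch in tcp_tests/managers/execute_commands.py, the THREADS setting from tcp_tests/settings.py, and the parallel: True flag on the salt-minion configuration step in tcp_tests/templates/shared-salt.yaml.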
Change-Id: I5ea9d2351bc998299e3da90ba723ed4dc1674f0d
diff --git a/tcp_tests/helpers/utils.py b/tcp_tests/helpers/utils.py
index e0ef723..46fc90d 100644
--- a/tcp_tests/helpers/utils.py
+++ b/tcp_tests/helpers/utils.py
@@ -23,8 +23,6 @@
import jinja2
import paramiko
import yaml
-import logging
-from multiprocessing import Process, BoundedSemaphore
from devops.helpers import ssh_client
from tcp_tests import logger
@@ -487,73 +485,3 @@
class TimeoutException(Exception):
pass
-
-
-pool = list()
-LOG_FORMAT = '%(asctime)s - %(levelname)s %(filename)s:%(lineno)d ' \
- '/%(processName)s/ -- %(message)s'
-
-
-class Worker:
- def __init__(self, limit=4, timeout=None):
- """
- limit: number of parallel threads to execute
- timeout: seconds to wait for threads to finish
- """
- LOG.debug("Created multithreading Worker limited by {} "
- "threads".format(limit))
- self._sema = BoundedSemaphore(limit)
- self.timeout = timeout
- pass
-
- @property
- def pool(self):
- global pool
- return pool
-
- def _worker(self, func, args):
- try:
- # FIXME: logging doesn't work
- memory_handler = logging.handlers.MemoryHandler(
- 50,
- target=logger.console)
- formatter = logging.Formatter(fmt=LOG_FORMAT)
-
- LOG = logging.getLogger("{}{}".format(func, args))
- LOG.setLevel(logging.DEBUG)
- memory_handler.setFormatter(formatter)
- LOG.addHandler(memory_handler)
- # #######
- func(*args)
- # #######
- memory_handler.close()
- finally:
- # allow a new process to be started now that this one is exiting
- self._sema.release()
-
- def start(self, func, args, name=None):
- self._sema.acquire() # wait to start until another process is finished
- p = Process(target=self._worker,
- args=(func, args),
- name=name
- )
- self.pool.append(p)
- p.start()
-
- def are_completed(self):
- for t in self.pool:
- LOG.info("Joining {}....".format(t))
- t.join(timeout=self.timeout)
- return all([not (task.is_alive()) for task in self.pool])
-
- def clean_pool(self):
- for i in range(self.pool.__len__()):
- del self.pool[0]
-
- def all_tasks_successfully_completed(self):
- return all([task.exitcode == 0 for task in self.pool])
-
- def print_failed_tasks(self):
- return "\n".join([str(task)
- for task in self.pool
- if task.exitcode != 0])
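
For reference, the Worker class deleted above implemented a semaphore-bounded multiprocessing pool: acquire a slot before spawning each process, release it when the task function returns, then join everything before checking exit codes. Below is a minimal sketch of that pattern; the names (BoundedWorker, _run_and_release) are purely illustrative and not part of tcp_tests.

    from multiprocessing import BoundedSemaphore, Process


    def _run_and_release(sema, func, args):
        # Run the task, then free the slot even if func raised,
        # so the next queued task can start.
        try:
            func(*args)
        finally:
            sema.release()


    class BoundedWorker(object):
        def __init__(self, limit=4, timeout=None):
            self._sema = BoundedSemaphore(limit)  # at most `limit` live processes
            self.timeout = timeout                # per-process join timeout, seconds
            self.pool = []

        def start(self, func, args, name=None):
            # Block until a slot is free, then fan the task out to a new process.
            # func must be picklable (e.g. a module-level function).
            self._sema.acquire()
            proc = Process(target=_run_and_release,
                           args=(self._sema, func, args),
                           name=name)
            self.pool.append(proc)
            proc.start()

        def are_completed(self):
            for proc in self.pool:
                proc.join(timeout=self.timeout)
            return all(not proc.is_alive() for proc in self.pool)

        def all_tasks_successfully_completed(self):
            return all(proc.exitcode == 0 for proc in self.pool)
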
diff --git a/tcp_tests/managers/execute_commands.py b/tcp_tests/managers/execute_commands.py
index 314c641..6dcf615 100644
--- a/tcp_tests/managers/execute_commands.py
+++ b/tcp_tests/managers/execute_commands.py
@@ -1,9 +1,8 @@
import time
-from tcp_tests import logger, settings
+from tcp_tests import logger
from tcp_tests.helpers.log_helpers import pretty_repr
-from tcp_tests.helpers.utils import Worker
LOG = logger.logger
@@ -37,8 +36,6 @@
'node_name': 'name of the node to run the command(s)',
# Optional:
'description': 'string with a readable command description',
- 'parallel': 'bool (True or False) to enable executing this
- type of command in multithreading'
'retry': {
'count': int, # How many times should be run the command
# until success
@@ -52,7 +49,6 @@
...
]
"""
- worker = Worker(limit=settings.THREADS, timeout=3*60)
for n, step in enumerate(commands):
# Required fields
action_cmd = step.get('cmd')
@@ -71,19 +67,7 @@
log_msg = "\n\n{0}\n{1}".format(msg, '=' * len(msg))
if action_cmd:
- if step.get('parallel'):
- name = description + " on " + step.get("node_name")
- worker.start(func=self.execute_command,
- args=(step, msg),
- name=name
- )
- else:
- while not worker.are_completed():
- LOG.info("Waiting {}".format(worker.pool))
- if worker.all_tasks_successfully_completed():
- worker.clean_pool()
- self.execute_command(step, msg)
-
+ self.execute_command(step, msg)
elif action_do:
self.command2(step, msg)
elif action_upload:
@@ -93,12 +77,6 @@
LOG.info(log_msg)
self.action_download(step)
- while not worker.are_completed():
- LOG.info("Waiting {}".format(worker.pool))
-
- assert worker.all_tasks_successfully_completed(), \
- worker.print_failed_tasks()
-
def execute_command(self, step, msg, return_res=None):
# Required fields
cmd = step.get('cmd')
@@ -112,6 +90,7 @@
timeout = step.get('timeout', None)
with self.__underlay.remote(node_name=node_name) as remote:
+
for x in range(retry_count, 0, -1):
time.sleep(3)
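
For context, the dispatch logic reverted above followed a fan-out/barrier pattern: steps flagged as parallel were handed to the worker, any serial step first drained the pool, and the end of the run joined every task and asserted success. A condensed sketch of that flow, assuming the illustrative BoundedWorker above is defined in the same module; the step data and run_step below are made-up placeholders, not the tcp_tests API.

    import time


    def run_step(step):
        time.sleep(1)                              # stand-in for executing one command
        print("ran {}".format(step['name']))


    if __name__ == '__main__':
        steps = [
            {'name': 'cfg-minion-node1', 'parallel': True},
            {'name': 'cfg-minion-node2', 'parallel': True},
            {'name': 'sync-all'},                  # a serial step acts as a barrier
        ]
        worker = BoundedWorker(limit=4, timeout=3 * 60)
        for step in steps:
            if step.get('parallel'):
                worker.start(func=run_step, args=(step,), name=step['name'])
            else:
                while not worker.are_completed():  # drain parallel work first
                    pass
                run_step(step)

        while not worker.are_completed():          # final join before checking results
            pass
        assert worker.all_tasks_successfully_completed()
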
diff --git a/tcp_tests/settings.py b/tcp_tests/settings.py
index f4ff7e0..18548fb 100644
--- a/tcp_tests/settings.py
+++ b/tcp_tests/settings.py
@@ -72,7 +72,6 @@
DOCKER_NAME = os.environ.get('DOCKER_NAME',
'mirantis/oscore/rally-tempest:latest')
DOCKER_IMAGES_SL_TAG = os.environ.get('DOCKER_IMAGES_SL_TAG', 'latest')
-THREADS = os.environ.get("THREADS", 10)
PATTERN = os.environ.get('PATTERN', None)
RUN_TEMPEST = get_var_as_bool('RUN_TEMPEST', False)
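
One detail worth testing before a reintroduction: os.environ.get returns a string whenever the variable is actually set, so the removed THREADS default was an int only when the variable was absent. A sketch of the usual way to read such a setting (an assumption, not current tcp_tests code):

    import os

    # Cast the override so a caller such as Worker(limit=THREADS) always
    # receives an int, whether or not THREADS is set in the environment.
    THREADS = int(os.environ.get('THREADS', 10))
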
diff --git a/tcp_tests/templates/shared-salt.yaml b/tcp_tests/templates/shared-salt.yaml
index cc38df3..1f43386 100644
--- a/tcp_tests/templates/shared-salt.yaml
+++ b/tcp_tests/templates/shared-salt.yaml
@@ -808,7 +808,6 @@
{%- if salt_roles %}
- description: Configure salt-minion on {{ ssh['node_name'] }}
- parallel: True
cmd: |
set -ex;
[ ! -d /etc/salt/minion.d ] && mkdir -p /etc/salt/minion.d;
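
With the parallel flag dropped from this step, execute_commands.py configures the salt-minion on each node sequentially again, which is the behaviour this revert restores.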