Add global statistic for stress tests

Adds shared memory between all stress action processes and the driver in
order to generate a global statistic. A new command line option is
introduced; with it, an action can be terminated once the specified number
of executions has been reached.

Implements: blueprint stress-tests

Conflicts:
tempest/stress/actions/create_destroy_server.py
tempest/stress/actions/volume_create_delete.py
tempest/stress/driver.py
tempest/stress/run_stress.py

Change-Id: I34765932ba93cd6d7f0df23ab97d9483eb459978
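
With the change applied, the driver's log ends with a per-process and a
global summary along these lines (the action path and the numbers are made
up for illustration):

    Statistics (per process):
     Process 0 (some.module.SomeAction): Run 20 actions (0 failed)
     Process 1 (some.module.SomeAction): Run 20 actions (1 failed)
    Summary:
    Run 40 actions (1 failed)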
diff --git a/tempest/stress/driver.py b/tempest/stress/driver.py
index 785da7d..b04a93a 100644
--- a/tempest/stress/driver.py
+++ b/tempest/stress/driver.py
@@ -98,7 +98,7 @@
     return getattr(importlib.import_module(module_part), obj_name)
 
 
-def stress_openstack(tests, duration):
+def stress_openstack(tests, duration, max_runs=None):
     """
     Workload driver. Executes an action function against a nova-cluster.
 
@@ -116,7 +116,7 @@
             manager = admin_manager
         else:
             manager = clients.Manager()
-        for _ in xrange(test.get('threads', 1)):
+        for p_number in xrange(test.get('threads', 1)):
             if test.get('use_isolated_tenants', False):
                 username = rand_name("stress_user")
                 tenant_name = rand_name("stress_tenant")
@@ -132,24 +132,46 @@
                                           tenant_name=tenant_name)
 
             test_obj = get_action_object(test['action'])
-            test_run = test_obj(manager, logger)
+            test_run = test_obj(manager, logger, max_runs)
 
             kwargs = test.get('kwargs', {})
             test_run.setUp(**dict(kwargs.iteritems()))
 
             logger.debug("calling Target Object %s" %
                          test_run.__class__.__name__)
-            p = multiprocessing.Process(target=test_run.execute,
-                                        args=())
 
-            processes.append(p)
+            mp_manager = multiprocessing.Manager()
+            shared_statistic = mp_manager.dict()
+            shared_statistic['runs'] = 0
+            shared_statistic['fails'] = 0
+
+            p = multiprocessing.Process(target=test_run.execute,
+                                        args=(shared_statistic,))
+
+            process = {'process': p,
+                       'p_number': p_number,
+                       'action': test['action'],
+                       'statistic': shared_statistic}
+
+            processes.append(process)
             p.start()
     end_time = time.time() + duration
     had_errors = False
     while True:
-        remaining = end_time - time.time()
-        if remaining <= 0:
-            break
+        if max_runs is None:
+            remaining = end_time - time.time()
+            if remaining <= 0:
+                break
+        else:
+            remaining = log_check_interval
+            all_proc_term = True
+            for process in processes:
+                if process['process'].is_alive():
+                    all_proc_term = False
+                    break
+            if all_proc_term:
+                break
+
         time.sleep(min(remaining, log_check_interval))
         if not logfiles:
             continue
@@ -158,9 +180,28 @@
             had_errors = True
             break
 
-    for p in processes:
-        p.terminate()
-        p.join()
+    for process in processes:
+        if process['process'].is_alive():
+            process['process'].terminate()
+        process['process'].join()
+
+    sum_fails = 0
+    sum_runs = 0
+
+    logger.info("Statistics (per process):")
+    for process in processes:
+        if process['statistic']['fails'] > 0:
+            had_errors = True
+        sum_runs += process['statistic']['runs']
+        sum_fails += process['statistic']['fails']
+        logger.info(" Process %d (%s): Run %d actions (%d failed)" %
+                    (process['p_number'],
+                     process['action'],
+                     process['statistic']['runs'],
+                     process['statistic']['fails']))
+    logger.info("Summary:")
+    logger.info("Run %d actions (%d failed)" %
+                (sum_runs, sum_fails))
 
     if not had_errors:
         logger.info("cleaning up")
diff --git a/tempest/stress/run_stress.py b/tempest/stress/run_stress.py
index 109f334..9ec1527 100755
--- a/tempest/stress/run_stress.py
+++ b/tempest/stress/run_stress.py
@@ -26,9 +26,9 @@
     tests = json.load(open(ns.tests, 'r'))
     if ns.serial:
         for test in tests:
-            driver.stress_openstack([test], ns.duration)
+            driver.stress_openstack([test], ns.duration, ns.number)
     else:
-        driver.stress_openstack(tests, ns.duration)
+        driver.stress_openstack(tests, ns.duration, ns.number)
 
 
 parser = argparse.ArgumentParser(description='Run stress tests. ')
@@ -36,5 +36,7 @@
                     help="Duration of test in secs.")
 parser.add_argument('-s', '--serial', action='store_true',
                     help="Trigger running tests serially.")
+parser.add_argument('-n', '--number', type=int,
+                    help="How many times each action is executed per process.")
 parser.add_argument('tests', help="Name of the file with test description.")
 main(parser.parse_args())
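
A test description file is a JSON list of actions; the keys the driver reads
are 'action', 'threads', 'use_isolated_tenants' and 'kwargs'. A minimal,
hypothetical entry could look roughly like this (the action path is a
placeholder):

    [{"action": "some.module.SomeAction",
      "threads": 2,
      "kwargs": {}}]

Bounded by iterations rather than only by time, an invocation would then be
something like:

    ./tempest/stress/run_stress.py -d 300 -n 20 my-tests.json

(my-tests.json standing in for whatever description file is used).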
diff --git a/tempest/stress/stressaction.py b/tempest/stress/stressaction.py
index f45ef17..77ddd1c 100644
--- a/tempest/stress/stressaction.py
+++ b/tempest/stress/stressaction.py
@@ -20,10 +20,10 @@
 
 class StressAction(object):
 
-    def __init__(self, manager, logger):
+    def __init__(self, manager, logger, max_runs=None):
         self.manager = manager
         self.logger = logger
-        self.runs = 0
+        self.max_runs = max_runs
 
     def _shutdown_handler(self, signal, frame):
         self.tearDown()
@@ -45,7 +45,7 @@
         """
         self.logger.debug("tearDown")
 
-    def execute(self):
+    def execute(self, shared_statistic):
         """This is the main execution entry point called
         by the driver.   We register a signal handler to
         allow us to gracefull tearDown, and then exit.
@@ -53,9 +53,16 @@
         """
         signal.signal(signal.SIGHUP, self._shutdown_handler)
         signal.signal(signal.SIGTERM, self._shutdown_handler)
-        while True:
-            self.run()
-            self.runs = self.runs + 1
+
+        while self.max_runs is None or (shared_statistic['runs'] <
+                                        self.max_runs):
+            try:
+                self.run()
+            except Exception:
+                shared_statistic['fails'] += 1
+                self.logger.exception("Failure in run")
+            finally:
+                shared_statistic['runs'] += 1
 
     def run(self):
         """This method is where the stress test code runs."""