Fix race in ZaqarEventSinkTest.test_events

Change-Id: I46378492a21c186de89b5af04e2d973432e38bff
Closes-Bug: #1679969
diff --git a/functional/test_event_sinks.py b/functional/test_event_sinks.py
index 5d9f566..a7e7579 100644
--- a/functional/test_event_sinks.py
+++ b/functional/test_event_sinks.py
@@ -14,6 +14,7 @@
 
 from zaqarclient.queues.v1 import client as zaqarclient
 
+from heat_integrationtests.common import test
 from heat_integrationtests.functional import functional_base
 
 
@@ -54,17 +55,25 @@
 
         zaqar = zaqarclient.Client(conf=conf, version=1.1)
         queue = zaqar.queue(queue_id)
-        messages = list(queue.messages())
-        self.assertEqual(4, len(messages))
-        types = [m.body['type'] for m in messages]
-        self.assertEqual(['os.heat.event'] * 4, types)
-        resources = set([m.body['payload']['resource_name'] for m in messages])
-        self.assertEqual(set([stack_name, 'test_resource']), resources)
-        stack_ids = [m.body['payload']['stack_id'] for m in messages]
-        self.assertEqual([stack_id] * 4, stack_ids)
-        statuses = [m.body['payload']['resource_status'] for m in messages]
-        statuses.sort()
-        self.assertEqual(
-            ['COMPLETE', 'COMPLETE', 'IN_PROGRESS', 'IN_PROGRESS'], statuses)
-        actions = [m.body['payload']['resource_action'] for m in messages]
-        self.assertEqual(['CREATE'] * 4, actions)
+
+        def validate_messages():
+            messages = list(queue.messages())
+            if len(messages) < 4:
+                return False
+
+            types = [m.body['type'] for m in messages]
+            self.assertEqual(['os.heat.event'] * 4, types)
+            resources = set([m.body['payload'][
+                'resource_name'] for m in messages])
+            self.assertEqual(set([stack_name, 'test_resource']), resources)
+            stack_ids = [m.body['payload']['stack_id'] for m in messages]
+            self.assertEqual([stack_id] * 4, stack_ids)
+            statuses = [m.body['payload']['resource_status'] for m in messages]
+            statuses.sort()
+            self.assertEqual(['COMPLETE', 'COMPLETE',
+                              'IN_PROGRESS', 'IN_PROGRESS'], statuses)
+            actions = [m.body['payload']['resource_action'] for m in messages]
+            self.assertEqual(['CREATE'] * 4, actions)
+            return True
+
+        self.assertTrue(test.call_until_true(20, 0, validate_messages))