Move test_notifications to functional

Change-Id: I8fa39fa0ba65edeb2b4c7d0b22eb8206f47d3083
diff --git a/common/test.py b/common/test.py
index 2392b8e..8ffbea3 100644
--- a/common/test.py
+++ b/common/test.py
@@ -346,3 +346,13 @@
             self.skipTest('Testing Stack abandon disabled in conf, skipping')
         info = self.client.stacks.abandon(stack_id=stack_id)
         return info
+
+    def stack_suspend(self, stack_identifier):
+        stack_name = stack_identifier.split('/')[0]
+        self.client.actions.suspend(stack_name)
+        self._wait_for_stack_status(stack_identifier, 'SUSPEND_COMPLETE')
+
+    def stack_resume(self, stack_identifier):
+        stack_name = stack_identifier.split('/')[0]
+        self.client.actions.resume(stack_name)
+        self._wait_for_stack_status(stack_identifier, 'RESUME_COMPLETE')
diff --git a/functional/test_notifications.py b/functional/test_notifications.py
new file mode 100644
index 0000000..a9b6cf1
--- /dev/null
+++ b/functional/test_notifications.py
@@ -0,0 +1,202 @@
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+
+import kombu
+from oslo_config import cfg
+from oslo_messaging._drivers import common
+from oslo_messaging import transport
+import requests
+
+from heat_integrationtests.common import test
+
+
+BASIC_NOTIFICATIONS = [
+    'orchestration.stack.create.start',
+    'orchestration.stack.create.end',
+    'orchestration.stack.update.start',
+    'orchestration.stack.update.end',
+    'orchestration.stack.suspend.start',
+    'orchestration.stack.suspend.end',
+    'orchestration.stack.resume.start',
+    'orchestration.stack.resume.end',
+    'orchestration.stack.delete.start',
+    'orchestration.stack.delete.end'
+]
+
+ASG_NOTIFICATIONS = [
+    'orchestration.autoscaling.start',
+    'orchestration.autoscaling.end'
+]
+
+
+def get_url(conf):
+    conf = conf.oslo_messaging_rabbit
+    return 'amqp://%s:%s@%s:%s/' % (conf.rabbit_userid,
+                                    conf.rabbit_password,
+                                    conf.rabbit_host,
+                                    conf.rabbit_port)
+
+
+class NotificationHandler(object):
+    def __init__(self, stack_id, events=None):
+        self._notifications = []
+        self.stack_id = stack_id
+        self.events = events
+
+    def process_message(self, body, message):
+        notification = common.deserialize_msg(body)
+        if notification['payload']['stack_name'] == self.stack_id:
+            if self.events is not None:
+                if notification['event_type'] in self.events:
+                    self.notifications.append(notification['event_type'])
+            else:
+                self.notifications.append(notification['event_type'])
+        message.ack()
+
+    def clear(self):
+        self._notifications = []
+
+    @property
+    def notifications(self):
+        return self._notifications
+
+
+class NotificationTest(test.HeatIntegrationTest):
+
+    basic_template = '''
+heat_template_version: 2013-05-23
+resources:
+  random1:
+    type: OS::Heat::RandomString
+'''
+    update_basic_template = '''
+heat_template_version: 2013-05-23
+resources:
+  random1:
+    type: OS::Heat::RandomString
+  random2:
+    type: OS::Heat::RandomString
+'''
+
+    asg_template = '''
+heat_template_version: 2013-05-23
+resources:
+  asg:
+    type: OS::Heat::AutoScalingGroup
+    properties:
+      resource:
+        type: OS::Heat::RandomString
+      min_size: 1
+      desired_capacity: 2
+      max_size: 3
+
+  scale_up_policy:
+    type: OS::Heat::ScalingPolicy
+    properties:
+      adjustment_type: change_in_capacity
+      auto_scaling_group_id: {get_resource: asg}
+      cooldown: 0
+      scaling_adjustment: 1
+
+  scale_down_policy:
+    type: OS::Heat::ScalingPolicy
+    properties:
+      adjustment_type: change_in_capacity
+      auto_scaling_group_id: {get_resource: asg}
+      cooldown: 0
+      scaling_adjustment: '-1'
+
+outputs:
+  scale_up_url:
+    value: {get_attr: [scale_up_policy, alarm_url]}
+  scale_dn_url:
+    value: {get_attr: [scale_down_policy, alarm_url]}
+'''
+
+    def setUp(self):
+        super(NotificationTest, self).setUp()
+
+        self.client = self.orchestration_client
+        self.exchange = kombu.Exchange('heat', 'topic', durable=False)
+        queue = kombu.Queue(exchange=self.exchange,
+                            routing_key='notifications.info',
+                            exclusive=True)
+        self.conn = kombu.Connection(get_url(
+            transport.get_transport(cfg.CONF).conf))
+        self.ch = self.conn.channel()
+        self.queue = queue(self.ch)
+        self.queue.declare()
+
+    def consume_events(self, handler, count):
+        self.conn.drain_events()
+        return len(handler.notifications) == count
+
+    def test_basic_notifications(self):
+        stack_identifier = self._stack_rand_name()
+        # do this manually so we can call _stack_delete() directly.
+        self.client.stacks.create(
+            stack_name=stack_identifier,
+            template=self.basic_template,
+            files={},
+            disable_rollback=True,
+            parameters={},
+            environment={}
+        )
+        self._wait_for_stack_status(stack_identifier, 'CREATE_COMPLETE')
+        self.update_stack(stack_identifier,
+                          template=self.update_basic_template)
+        self.stack_suspend(stack_identifier)
+        self.stack_resume(stack_identifier)
+        self._stack_delete(stack_identifier)
+
+        handler = NotificationHandler(stack_identifier.split('/')[0])
+
+        with self.conn.Consumer(self.queue,
+                                callbacks=[handler.process_message],
+                                auto_declare=False):
+            try:
+                while True:
+                    self.conn.drain_events(timeout=1)
+            except Exception:
+                pass
+
+        for n in BASIC_NOTIFICATIONS:
+            self.assertIn(n, handler.notifications)
+
+    def test_asg_notifications(self):
+        stack_identifier = self.stack_create(template=self.asg_template)
+
+        for output in self.client.stacks.get(stack_identifier).outputs:
+            if output['output_key'] == 'scale_dn_url':
+                scale_down_url = output['output_value']
+            else:
+                scale_up_url = output['output_value']
+
+        notifications = []
+        handler = NotificationHandler(stack_identifier.split('/')[0],
+                                      ASG_NOTIFICATIONS)
+
+        with self.conn.Consumer(self.queue,
+                                callbacks=[handler.process_message],
+                                auto_declare=False):
+
+            requests.post(scale_up_url)
+            test.call_until_true(20, 0, self.consume_events, handler, 2)
+            notifications += handler.notifications
+
+            handler.clear()
+            requests.post(scale_down_url)
+            test.call_until_true(20, 0, self.consume_events, handler, 2)
+            notifications += handler.notifications
+
+        self.assertEqual(2, notifications.count(ASG_NOTIFICATIONS[0]))
+        self.assertEqual(2, notifications.count(ASG_NOTIFICATIONS[1]))