blob: 3b8e0030f96e2f36c16b9b01261af9c2edc2e62f [file] [log] [blame]
Oleksii Chuprykovd9cd9dc2015-02-03 10:34:55 -05001# Licensed under the Apache License, Version 2.0 (the "License"); you may
2# not use this file except in compliance with the License. You may obtain
3# a copy of the License at
4#
5# http://www.apache.org/licenses/LICENSE-2.0
6#
7# Unless required by applicable law or agreed to in writing, software
8# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
9# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
10# License for the specific language governing permissions and limitations
11# under the License.
12
import socket

import kombu
from oslo_config import cfg
from oslo_messaging._drivers import common
from oslo_messaging import transport
import requests

from heat_integrationtests.common import test
from heat_integrationtests.functional import functional_base
Oleksii Chuprykovd9cd9dc2015-02-03 10:34:55 -050021
# Event types expected from a stack that is created, updated, suspended,
# resumed and finally deleted; each action emits a start/end pair.
BASIC_NOTIFICATIONS = [
    'orchestration.stack.create.start',
    'orchestration.stack.create.end',
    'orchestration.stack.update.start',
    'orchestration.stack.update.end',
    'orchestration.stack.suspend.start',
    'orchestration.stack.suspend.end',
    'orchestration.stack.resume.start',
    'orchestration.stack.resume.end',
    'orchestration.stack.delete.start',
    'orchestration.stack.delete.end',
]

# Event types expected around an autoscaling adjustment.
ASG_NOTIFICATIONS = [
    'orchestration.autoscaling.start',
    'orchestration.autoscaling.end',
]
39
40
def get_url(conf):
    """Return the AMQP URL described by the rabbit options of *conf*.

    :param conf: configuration object exposing an ``oslo_messaging_rabbit``
                 group with ``rabbit_userid``, ``rabbit_password``,
                 ``rabbit_host`` and ``rabbit_port``.
    :returns: a string of the form ``amqp://user:password@host:port/``.
    """
    rabbit = conf.oslo_messaging_rabbit
    credentials = '%s:%s' % (rabbit.rabbit_userid, rabbit.rabbit_password)
    endpoint = '%s:%s' % (rabbit.rabbit_host, rabbit.rabbit_port)
    return 'amqp://%s@%s/' % (credentials, endpoint)
47
48
class NotificationHandler(object):
    """Collect the event types of notifications belonging to one stack.

    :param stack_id: value compared against the ``stack_name`` field of each
                     notification payload.
    :param events: optional collection of event types to keep; when ``None``
                   every event type for the matching stack is recorded.
    """

    def __init__(self, stack_id, events=None):
        self.stack_id = stack_id
        self.events = events
        self._notifications = []

    @property
    def notifications(self):
        """List of event types recorded since creation or the last clear."""
        return self._notifications

    def clear(self):
        """Forget recorded event types by starting a fresh list."""
        self._notifications = []

    def process_message(self, body, message):
        """Consumer callback: record the event type if relevant, then ack.

        :param body: serialized notification, decoded with
                     ``common.deserialize_msg``.
        :param message: the delivered message; it is always acknowledged.
        """
        notification = common.deserialize_msg(body)
        for_this_stack = (
            notification['payload']['stack_name'] == self.stack_id)
        # With no event filter configured, everything for the stack is kept.
        if for_this_stack and (
                self.events is None
                or notification['event_type'] in self.events):
            self.notifications.append(notification['event_type'])
        message.ack()
71
72
class NotificationTest(functional_base.FunctionalTestsBase):
    """Functional tests for the notifications emitted on stack actions.

    ``setUp`` binds an exclusive queue to the ``heat`` topic exchange using
    the ``notifications.info`` routing key; the tests then drive stack
    operations and check which event types were delivered to that queue.
    """

    # Minimal stack with a single resource.
    basic_template = '''
heat_template_version: 2013-05-23
resources:
  random1:
    type: OS::Heat::RandomString
'''
    # Same stack with one extra resource, used to trigger an update.
    update_basic_template = '''
heat_template_version: 2013-05-23
resources:
  random1:
    type: OS::Heat::RandomString
  random2:
    type: OS::Heat::RandomString
'''

    # Autoscaling group plus scale-up/scale-down policies; the policy alarm
    # URLs are exposed as outputs so the test can trigger them over HTTP.
    asg_template = '''
heat_template_version: 2013-05-23
resources:
  asg:
    type: OS::Heat::AutoScalingGroup
    properties:
      resource:
        type: OS::Heat::RandomString
      min_size: 1
      desired_capacity: 2
      max_size: 3

  scale_up_policy:
    type: OS::Heat::ScalingPolicy
    properties:
      adjustment_type: change_in_capacity
      auto_scaling_group_id: {get_resource: asg}
      cooldown: 0
      scaling_adjustment: 1

  scale_down_policy:
    type: OS::Heat::ScalingPolicy
    properties:
      adjustment_type: change_in_capacity
      auto_scaling_group_id: {get_resource: asg}
      cooldown: 0
      scaling_adjustment: '-1'

outputs:
  scale_up_url:
    value: {get_attr: [scale_up_policy, alarm_url]}
  scale_dn_url:
    value: {get_attr: [scale_down_policy, alarm_url]}
'''

    def setUp(self):
        """Connect to the broker and declare the notification queue."""
        super(NotificationTest, self).setUp()
        self.exchange = kombu.Exchange('heat', 'topic', durable=False)
        queue = kombu.Queue(exchange=self.exchange,
                            routing_key='notifications.info',
                            exclusive=True)
        self.conn = kombu.Connection(get_url(
            transport.get_transport(cfg.CONF).conf))
        # Registered before the channel is opened so the connection is
        # released even if a later setUp step fails; without this every test
        # leaves a broker connection (and its exclusive queue) behind.
        self.addCleanup(self.conn.release)
        self.ch = self.conn.channel()
        self.queue = queue(self.ch)
        self.queue.declare()

    def consume_events(self, handler, count, timeout=1):
        """Drain one event and report whether *handler* holds *count* items.

        :param handler: NotificationHandler whose recorded notifications are
                        counted.
        :param count: number of notifications the caller is waiting for.
        :param timeout: seconds to wait for an event before giving up on this
                        attempt, so a polling caller keeps control of its own
                        deadline instead of blocking indefinitely.
        :returns: True when exactly *count* notifications are recorded.
        """
        try:
            self.conn.drain_events(timeout=timeout)
        except socket.timeout:
            # Nothing arrived within this attempt; let the caller retry.
            pass
        return len(handler.notifications) == count

    def test_basic_notifications(self):
        # disable cleanup so we can call _stack_delete() directly.
        stack_identifier = self.stack_create(template=self.basic_template,
                                             enable_cleanup=False)
        self.update_stack(stack_identifier,
                          template=self.update_basic_template)
        self.stack_suspend(stack_identifier)
        self.stack_resume(stack_identifier)
        self._stack_delete(stack_identifier)

        handler = NotificationHandler(stack_identifier.split('/')[0])

        with self.conn.Consumer(self.queue,
                                callbacks=[handler.process_message],
                                auto_declare=False):
            try:
                while True:
                    self.conn.drain_events(timeout=1)
            except socket.timeout:
                # The queue has been idle for a full timeout period, so all
                # pending notifications were consumed. Any other exception
                # (broker failure, error in the callback) is a real problem
                # and is allowed to fail the test.
                pass

        for n in BASIC_NOTIFICATIONS:
            self.assertIn(n, handler.notifications)

    def test_asg_notifications(self):
        stack_identifier = self.stack_create(template=self.asg_template)

        # Look the URLs up by key rather than treating "anything that is not
        # scale_dn_url" as the scale-up URL.
        outputs = dict(
            (output['output_key'], output['output_value'])
            for output in self.client.stacks.get(stack_identifier).outputs)
        scale_up_url = outputs['scale_up_url']
        scale_down_url = outputs['scale_dn_url']

        notifications = []
        handler = NotificationHandler(stack_identifier.split('/')[0],
                                      ASG_NOTIFICATIONS)

        with self.conn.Consumer(self.queue,
                                callbacks=[handler.process_message],
                                auto_declare=False):

            requests.post(scale_up_url)
            test.call_until_true(20, 0, self.consume_events, handler, 2)
            notifications += handler.notifications

            # clear() rebinds the handler's list, so what was collected above
            # is unaffected.
            handler.clear()
            requests.post(scale_down_url)
            test.call_until_true(20, 0, self.consume_events, handler, 2)
            notifications += handler.notifications

        # One start/end pair per scaling action (up and down).
        self.assertEqual(2, notifications.count(ASG_NOTIFICATIONS[0]))
        self.assertEqual(2, notifications.count(ASG_NOTIFICATIONS[1]))