Add functional test for SDG rolling_update
Adds functional test for SoftwareDeploymentGroup
rolling_update.
Change-Id: Id575765d52c84e3c29f443a18417b5f14d9331cc
diff --git a/functional/test_software_deployment_group.py b/functional/test_software_deployment_group.py
index 7817c2b..4e8b868 100644
--- a/functional/test_software_deployment_group.py
+++ b/functional/test_software_deployment_group.py
@@ -43,11 +43,43 @@
'3': dummy3
'''
+ sd_template_with_upd_policy = '''
+heat_template_version: 2016-10-14
+
+parameters:
+ input:
+ type: string
+ default: foo_input
+
+resources:
+ config:
+ type: OS::Heat::SoftwareConfig
+ properties:
+ group: script
+ inputs:
+ - name: foo
+
+ deployment:
+ type: OS::Heat::SoftwareDeploymentGroup
+ update_policy:
+ rolling_update:
+ max_batch_size: 2
+ pause_time: 1
+ properties:
+ config: {get_resource: config}
+ input_values:
+ foo: {get_param: input}
+ servers:
+ '0': dummy0
+ '1': dummy1
+ '2': dummy2
+ '3': dummy3
+'''
enable_cleanup = True
- def test_deployments_crud(self):
+ def deployment_crud(self, template):
stack_identifier = self.stack_create(
- template=self.sd_template,
+ template=template,
enable_cleanup=self.enable_cleanup,
expected_status='CREATE_IN_PROGRESS')
self._wait_for_resource_status(
@@ -58,22 +90,24 @@
stack_identifier, 'deployment', minimal=False)
self.assertEqual(4, len(group_resources))
- self.signal_deployments(group_resources)
- self._wait_for_stack_status(stack_identifier, 'CREATE_COMPLETE')
+ self._wait_for_stack_status(stack_identifier, 'CREATE_COMPLETE',
+ signal_required=True,
+ resources_to_signal=group_resources)
- self.check_input_values(group_resources, 'foo_input')
+ self.check_input_values(group_resources, 'foo', 'foo_input')
self.update_stack(stack_identifier,
- template=self.sd_template,
+ template=template,
environment={'parameters': {'input': 'input2'}},
expected_status='UPDATE_IN_PROGRESS')
nested_identifier = self.assert_resource_is_a_stack(
stack_identifier, 'deployment')
self.assertEqual(4, len(group_resources))
- self.signal_deployments(group_resources)
- self._wait_for_stack_status(stack_identifier, 'UPDATE_COMPLETE')
+ self._wait_for_stack_status(stack_identifier, 'UPDATE_COMPLETE',
+ signal_required=True,
+ resources_to_signal=group_resources)
- self.check_input_values(group_resources, 'input2')
+ self.check_input_values(group_resources, 'foo', 'input2')
# We explicitly test delete here, vs just via cleanup and check
# the nested stack is gone
@@ -82,6 +116,12 @@
nested_identifier, 'DELETE_COMPLETE',
success_on_not_found=True)
+ def test_deployment_crud(self):
+ self.deployment_crud(self.sd_template)
+
+ def test_deployment_crud_with_rolling_update(self):
+ self.deployment_crud(self.sd_template_with_upd_policy)
+
def test_deployments_create_delete_in_progress(self):
stack_identifier = self.stack_create(
template=self.sd_template,
@@ -100,21 +140,3 @@
self._wait_for_stack_status(
nested_identifier, 'DELETE_COMPLETE',
success_on_not_found=True)
-
- def check_input_values(self, group_resources, value):
- # Check inputs for deployment and derived config
- for r in group_resources:
- d = self.client.software_deployments.get(
- r.physical_resource_id)
- self.assertEqual({'foo': value}, d.input_values)
- c = self.client.software_configs.get(
- d.config_id)
- foo_input_c = [i for i in c.inputs if i.get('name') == 'foo'][0]
- self.assertEqual(value, foo_input_c.get('value'))
-
- def signal_deployments(self, resources):
- # Signal all IN_PROGRESS deployment resources
- for r in resources:
- if 'IN_PROGRESS' in r.resource_status:
- stack_id = self.get_resource_stack_id(r)
- self.client.resources.signal(stack_id, r.resource_name)