Adds more testcases to test_telemetry_alarming_api
This submission is to add more testcases in "test_telemetry_alarming_api.py"
test_script. The test cases added are for scenarios "update-alarm",
"set-get-alarm-state", "create_delete_combination_alarm".
Removes alarm create logic from list_alarms method and uses the alarms
in setUp for verification.
Supporting functions "update_alaram", "alarm_get_state", "alarm_get_state"
are added and "put" method is modified in telemetry_client_base.py script.
Change-Id: Ie15cc48ee404db62fb9d74392ed2acc826b852d7
Implements: bp add-basic-ceilometer-tests
diff --git a/tempest/api/telemetry/test_telemetry_alarming_api.py b/tempest/api/telemetry/test_telemetry_alarming_api.py
index a59d3ae..3472b31 100644
--- a/tempest/api/telemetry/test_telemetry_alarming_api.py
+++ b/tempest/api/telemetry/test_telemetry_alarming_api.py
@@ -11,6 +11,7 @@
# under the License.
from tempest.api.telemetry import base
+from tempest.common.utils import data_utils
from tempest import exceptions
from tempest.test import attr
@@ -18,46 +19,96 @@
class TelemetryAlarmingAPITestJSON(base.BaseTelemetryTest):
_interface = 'json'
+ @classmethod
+ def setUpClass(cls):
+ super(TelemetryAlarmingAPITestJSON, cls).setUpClass()
+ cls.rule = {'meter_name': 'cpu_util',
+ 'comparison_operator': 'gt',
+ 'threshold': 80.0,
+ 'period': 70}
+ for i in range(2):
+ cls.create_alarm(threshold_rule=cls.rule)
+
@attr(type="gate")
def test_alarm_list(self):
- # Create an alarm to verify in the list of alarms
- created_alarm_ids = list()
- fetched_ids = list()
- rules = {'meter_name': 'cpu_util',
- 'comparison_operator': 'gt',
- 'threshold': 80.0,
- 'period': 70}
- for i in range(3):
- resp, body = self.create_alarm(threshold_rule=rules)
- created_alarm_ids.append(body['alarm_id'])
-
# List alarms
resp, alarm_list = self.telemetry_client.list_alarms()
- self.assertEqual(int(resp['status']), 200)
+ self.assertEqual(200, resp.status)
# Verify created alarm in the list
fetched_ids = [a['alarm_id'] for a in alarm_list]
- missing_alarms = [a for a in created_alarm_ids if a not in fetched_ids]
+ missing_alarms = [a for a in self.alarm_ids if a not in fetched_ids]
self.assertEqual(0, len(missing_alarms),
"Failed to find the following created alarm(s)"
" in a fetched list: %s" %
', '.join(str(a) for a in missing_alarms))
@attr(type="gate")
- def test_create_alarm(self):
- rules = {'meter_name': 'cpu_util',
- 'comparison_operator': 'gt',
- 'threshold': 80.0,
- 'period': 70}
- resp, body = self.create_alarm(threshold_rule=rules)
- self.alarm_id = body['alarm_id']
- self.assertEqual(int(resp['status']), 201)
- self.assertDictContainsSubset(rules, body['threshold_rule'])
- resp, body = self.telemetry_client.get_alarm(self.alarm_id)
- self.assertEqual(int(resp['status']), 200)
- self.assertDictContainsSubset(rules, body['threshold_rule'])
- resp, _ = self.telemetry_client.delete_alarm(self.alarm_id)
- self.assertEqual(int(resp['status']), 204)
+ def test_create_update_get_delete_alarm(self):
+ # Create an alarm
+ alarm_name = data_utils.rand_name('telemetry_alarm')
+ resp, body = self.telemetry_client.create_alarm(
+ name=alarm_name, type='threshold', threshold_rule=self.rule)
+ self.assertEqual(201, resp.status)
+ self.assertEqual(alarm_name, body['name'])
+ alarm_id = body['alarm_id']
+ self.assertDictContainsSubset(self.rule, body['threshold_rule'])
+ # Update alarm with new rule and new name
+ new_rule = {'meter_name': 'cpu',
+ 'comparison_operator': 'eq',
+ 'threshold': 70.0,
+ 'period': 60}
+ alarm_name = data_utils.rand_name('telemetry-alarm-update')
+ resp, body = self.telemetry_client.update_alarm(
+ alarm_id,
+ threshold_rule=new_rule,
+ name=alarm_name,
+ type='threshold')
+ self.assertEqual(200, resp.status)
+ self.assertEqual(alarm_name, body['name'])
+ self.assertDictContainsSubset(new_rule, body['threshold_rule'])
+ # Get and verify details of an alarm after update
+ resp, body = self.telemetry_client.get_alarm(alarm_id)
+ self.assertEqual(200, resp.status)
+ self.assertEqual(alarm_name, body['name'])
+ self.assertDictContainsSubset(new_rule, body['threshold_rule'])
+ # Delete alarm and verify if deleted
+ resp, _ = self.telemetry_client.delete_alarm(alarm_id)
+ self.assertEqual(204, resp.status)
self.assertRaises(exceptions.NotFound,
- self.telemetry_client.get_alarm,
- self.alarm_id)
+ self.telemetry_client.get_alarm, alarm_id)
+
+ @attr(type="gate")
+ def test_set_get_alarm_state(self):
+ alarm_states = ['ok', 'alarm', 'insufficient data']
+ _, alarm = self.create_alarm(threshold_rule=self.rule)
+ # Set alarm state and verify
+ new_state =\
+ [elem for elem in alarm_states if elem != alarm['state']][0]
+ resp, state = self.telemetry_client.alarm_set_state(alarm['alarm_id'],
+ new_state)
+ self.assertEqual(200, resp.status)
+ self.assertEqual(new_state, state)
+ # Get alarm state and verify
+ resp, state = self.telemetry_client.alarm_get_state(alarm['alarm_id'])
+ self.assertEqual(200, resp.status)
+ self.assertEqual(new_state, state)
+
+ @attr(type="gate")
+ def test_create_delete_alarm_with_combination_rule(self):
+ rule = {"alarm_ids": self.alarm_ids,
+ "operator": "or"}
+ # Verifies alarm create
+ alarm_name = data_utils.rand_name('combination_alarm')
+ resp, body = self.telemetry_client.create_alarm(name=alarm_name,
+ combination_rule=rule,
+ type='combination')
+ self.assertEqual(201, resp.status)
+ self.assertEqual(alarm_name, body['name'])
+ alarm_id = body['alarm_id']
+ self.assertDictContainsSubset(rule, body['combination_rule'])
+ # Verify alarm delete
+ resp, _ = self.telemetry_client.delete_alarm(alarm_id)
+ self.assertEqual(204, resp.status)
+ self.assertRaises(exceptions.NotFound,
+ self.telemetry_client.get_alarm, alarm_id)
diff --git a/tempest/services/telemetry/telemetry_client_base.py b/tempest/services/telemetry/telemetry_client_base.py
index 610f07b..a073f54 100644
--- a/tempest/services/telemetry/telemetry_client_base.py
+++ b/tempest/services/telemetry/telemetry_client_base.py
@@ -73,7 +73,10 @@
return resp, body
def put(self, uri, body):
- return self.rest_client.put(uri, body)
+ body = self.serialize(body)
+ resp, body = self.rest_client.put(uri, body)
+ body = self.deserialize(body)
+ return resp, body
def get(self, uri):
resp, body = self.rest_client.get(uri)
@@ -133,3 +136,15 @@
def create_alarm(self, **kwargs):
uri = "%s/alarms" % self.uri_prefix
return self.post(uri, kwargs)
+
+ def update_alarm(self, alarm_id, **kwargs):
+ uri = "%s/alarms/%s" % (self.uri_prefix, alarm_id)
+ return self.put(uri, kwargs)
+
+ def alarm_get_state(self, alarm_id):
+ uri = "%s/alarms/%s/state" % (self.uri_prefix, alarm_id)
+ return self.get(uri)
+
+ def alarm_set_state(self, alarm_id, state):
+ uri = "%s/alarms/%s/state" % (self.uri_prefix, alarm_id)
+ return self.put(uri, state)