Create event type alarm for aodh API test
The purpose is to remove the dependency on gnocchiclient.
Change-Id: I4c07a8d15338b6ec80ca95cef64ab74ed6028319
diff --git a/telemetry_tempest_plugin/aodh/api/base.py b/telemetry_tempest_plugin/aodh/api/base.py
index 1035df7..29b1d61 100644
--- a/telemetry_tempest_plugin/aodh/api/base.py
+++ b/telemetry_tempest_plugin/aodh/api/base.py
@@ -43,10 +43,10 @@
cls.alarm_ids = []
@classmethod
- def create_alarm(cls, **kwargs):
+ def create_alarm(cls, type='event', **kwargs):
body = cls.alarming_client.create_alarm(
name=data_utils.rand_name('telemetry_alarm'),
- type='gnocchi_aggregation_by_metrics_threshold', **kwargs)
+ type=type, **kwargs)
cls.alarm_ids.append(body['alarm_id'])
return body
diff --git a/telemetry_tempest_plugin/aodh/api/test_alarming_api.py b/telemetry_tempest_plugin/aodh/api/test_alarming_api.py
index 1c42e63..9032a1a 100644
--- a/telemetry_tempest_plugin/aodh/api/test_alarming_api.py
+++ b/telemetry_tempest_plugin/aodh/api/test_alarming_api.py
@@ -26,14 +26,16 @@
super(TelemetryAlarmingAPITest, cls).resource_setup()
if CONF.alarming_plugin.create_alarms:
- cls.rule = {'metrics': ['c0d457b6-957e-41de-a384-d5eb0957de3b'],
- 'comparison_operator': 'gt',
- 'aggregation_method': 'mean',
- 'threshold': 80.0,
- 'granularity': 70}
+ cls.rule = {
+ "event_type": "compute.instance.*",
+ "query": [
+ {"field": "traits.name",
+ "type": "string",
+ "op": "eq",
+ "value": "test"}]
+ }
for i in range(2):
- cls.create_alarm(
- gnocchi_aggregation_by_metrics_threshold_rule=cls.rule)
+ cls.create_alarm(event_rule=cls.rule)
@decorators.idempotent_id('1c918e06-210b-41eb-bd45-14676dd77cd7')
def test_alarm_list(self):
@@ -53,38 +55,42 @@
# Create an alarm
alarm_name = data_utils.rand_name('telemetry_alarm')
body = self.alarming_client.create_alarm(
- name=alarm_name, type='gnocchi_aggregation_by_metrics_threshold',
- gnocchi_aggregation_by_metrics_threshold_rule=self.rule)
+ name=alarm_name, type='event',
+ event_rule=self.rule)
self.assertEqual(alarm_name, body['name'])
alarm_id = body['alarm_id']
- self.assertDictContainsSubset(self.rule, body[
- 'gnocchi_aggregation_by_metrics_threshold_rule'])
+ self.assertDictContainsSubset(self.rule, body['event_rule'])
+
# Update alarm with new rule and new name
- new_rule = {'metrics': ['c0d457b6-957e-41de-a384-d5eb0957de3b'],
- 'comparison_operator': 'eq',
- 'aggregation_method': 'mean',
- 'threshold': 70.0,
- 'granularity': 60}
+ new_rule = {
+ "event_type": "compute.instance.create",
+ "query": [
+ {"field": "traits.name",
+ "type": "string",
+ "op": "eq",
+ "value": "test"}]
+ }
alarm_name_updated = data_utils.rand_name('telemetry-alarm-update')
body = self.alarming_client.update_alarm(
alarm_id,
- gnocchi_aggregation_by_metrics_threshold_rule=new_rule,
+ event_rule=new_rule,
name=alarm_name_updated,
- type='gnocchi_aggregation_by_metrics_threshold')
+ type='event')
self.assertEqual(alarm_name_updated, body['name'])
- self.assertDictContainsSubset(
- new_rule, body['gnocchi_aggregation_by_metrics_threshold_rule'])
+ self.assertDictContainsSubset(new_rule, body['event_rule'])
+
# Get and verify details of an alarm after update
body = self.alarming_client.show_alarm(alarm_id)
self.assertEqual(alarm_name_updated, body['name'])
- self.assertDictContainsSubset(
- new_rule, body['gnocchi_aggregation_by_metrics_threshold_rule'])
+ self.assertDictContainsSubset(new_rule, body['event_rule'])
+
# Get history for the alarm and verify the same
body = self.alarming_client.show_alarm_history(alarm_id)
self.assertEqual("rule change", body[0]['type'])
self.assertIn(alarm_name_updated, body[0]['detail'])
self.assertEqual("creation", body[1]['type'])
self.assertIn(alarm_name, body[1]['detail'])
+
# Delete alarm and verify if deleted
self.alarming_client.delete_alarm(alarm_id)
self.assertRaises(lib_exc.NotFound,
@@ -93,13 +99,14 @@
@decorators.idempotent_id('aca49486-70bb-4016-87e0-f6131374f742')
def test_set_get_alarm_state(self):
alarm_states = ['ok', 'alarm', 'insufficient data']
- alarm = self.create_alarm(
- gnocchi_aggregation_by_metrics_threshold_rule=self.rule)
+ alarm = self.create_alarm(event_rule=self.rule)
+
# Set alarm state and verify
new_state =\
[elem for elem in alarm_states if elem != alarm['state']][0]
state = self.alarming_client.alarm_set_state(alarm['alarm_id'],
new_state)
+
self.assertEqual(new_state, state.data)
# Get alarm state and verify
state = self.alarming_client.show_alarm_state(alarm['alarm_id'])