# Copyright 2014 OpenStack Foundation
# All Rights Reserved.
#
#    Licensed under the Apache License, Version 2.0 (the "License"); you may
#    not use this file except in compliance with the License. You may obtain
#    a copy of the License at
#
#         http://www.apache.org/licenses/LICENSE-2.0
#
#    Unless required by applicable law or agreed to in writing, software
#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
#    License for the specific language governing permissions and limitations
#    under the License.
15
16from six.moves.urllib import parse as urllib
17from tempest import config
18from tempest.lib.common import rest_client
19from tempest import manager
20import ujson
21
# Module-level handle on tempest's configuration object; Manager reads its
# identity, debug and alarming_plugin groups when building client parameters.
CONF = config.CONF
23
24
class AlarmingClient(rest_client.RestClient):
    """REST client for the ``v2`` alarms endpoints.

    Every request method builds a URI under ``uri_prefix``, issues the
    request through the inherited ``get``/``post``/``put``/``delete``
    helpers, checks the HTTP status with ``expected_success`` and wraps
    the decoded JSON body in one of the ``rest_client.ResponseBody*``
    containers.
    """

    # API version string for this client.
    version = '2'
    # Path prefix prepended to every request URI built by this client.
    uri_prefix = "v2"

    def deserialize(self, body):
        """Decode a JSON response body into Python objects.

        :param body: raw response payload, either text or bytes.
        :returns: the object produced by ``ujson.loads``.
        """
        # ``bytes.replace`` rejects ``str`` arguments on Python 3, so a bytes
        # payload has to be decoded before the newline strip below.
        if isinstance(body, bytes):
            body = body.decode('utf-8')
        return ujson.loads(body.replace("\n", ""))

    def serialize(self, body):
        """Encode ``body`` as a JSON string with ``ujson.dumps``."""
        return ujson.dumps(body)

    def list_alarms(self, query=None):
        """GET the alarm collection, optionally filtered by one query.

        :param query: optional indexable of three items
            ``(field, op, value)``; they are sent as the ``q.field``,
            ``q.op`` and ``q.value`` query-string parameters. A falsy value
            (``None``, empty sequence) sends no filter.
        :returns: ``rest_client.ResponseBodyList`` with the decoded body.
        """
        uri = '%s/alarms' % self.uri_prefix
        if query:
            params = {'q.field': query[0],
                      'q.op': query[1],
                      'q.value': query[2]}
            uri += "?%s" % urllib.urlencode(params)
        resp, body = self.get(uri)
        self.expected_success(200, resp.status)
        body = self.deserialize(body)
        return rest_client.ResponseBodyList(resp, body)

    def show_alarm(self, alarm_id):
        """GET a single alarm by id; expects HTTP 200.

        :returns: ``rest_client.ResponseBody`` with the decoded body.
        """
        uri = '%s/alarms/%s' % (self.uri_prefix, alarm_id)
        resp, body = self.get(uri)
        self.expected_success(200, resp.status)
        body = self.deserialize(body)
        return rest_client.ResponseBody(resp, body)

    def show_alarm_history(self, alarm_id):
        """GET the ``history`` sub-resource of an alarm; expects HTTP 200.

        :returns: ``rest_client.ResponseBodyList`` with the decoded body.
        """
        uri = "%s/alarms/%s/history" % (self.uri_prefix, alarm_id)
        resp, body = self.get(uri)
        self.expected_success(200, resp.status)
        body = self.deserialize(body)
        return rest_client.ResponseBodyList(resp, body)

    def delete_alarm(self, alarm_id):
        """DELETE an alarm by id; expects HTTP 204.

        :returns: ``rest_client.ResponseBody``; the body is decoded only
            when the response actually carries one.
        """
        uri = "%s/alarms/%s" % (self.uri_prefix, alarm_id)
        resp, body = self.delete(uri)
        self.expected_success(204, resp.status)
        # An empty payload is passed through undecoded.
        if body:
            body = self.deserialize(body)
        return rest_client.ResponseBody(resp, body)

    def create_alarm(self, **kwargs):
        """POST a new alarm; expects HTTP 201.

        :param kwargs: alarm attributes, serialized as the JSON request body.
        :returns: ``rest_client.ResponseBody`` with the decoded body.
        """
        uri = "%s/alarms" % self.uri_prefix
        body = self.serialize(kwargs)
        resp, body = self.post(uri, body)
        self.expected_success(201, resp.status)
        body = self.deserialize(body)
        return rest_client.ResponseBody(resp, body)

    def update_alarm(self, alarm_id, **kwargs):
        """PUT new attributes for an existing alarm; expects HTTP 200.

        :param alarm_id: id of the alarm to update.
        :param kwargs: alarm attributes, serialized as the JSON request body.
        :returns: ``rest_client.ResponseBody`` with the decoded body.
        """
        uri = "%s/alarms/%s" % (self.uri_prefix, alarm_id)
        body = self.serialize(kwargs)
        resp, body = self.put(uri, body)
        self.expected_success(200, resp.status)
        body = self.deserialize(body)
        return rest_client.ResponseBody(resp, body)

    def show_alarm_state(self, alarm_id):
        """GET the ``state`` sub-resource of an alarm; expects HTTP 200.

        :returns: ``rest_client.ResponseBodyData`` with the decoded body.
        """
        uri = "%s/alarms/%s/state" % (self.uri_prefix, alarm_id)
        resp, body = self.get(uri)
        self.expected_success(200, resp.status)
        body = self.deserialize(body)
        return rest_client.ResponseBodyData(resp, body)

    def alarm_set_state(self, alarm_id, state):
        """PUT a new value to the ``state`` sub-resource; expects HTTP 200.

        :param alarm_id: id of the alarm whose state is set.
        :param state: value serialized as the JSON request body.
        :returns: ``rest_client.ResponseBodyData`` with the decoded body.
        """
        uri = "%s/alarms/%s/state" % (self.uri_prefix, alarm_id)
        body = self.serialize(state)
        resp, body = self.put(uri, body)
        self.expected_success(200, resp.status)
        body = self.deserialize(body)
        return rest_client.ResponseBodyData(resp, body)
102
103
class Manager(manager.Manager):
    """Tempest manager that exposes an ``AlarmingClient``.

    The keyword arguments handed to the client are assembled once, at class
    definition time, from the values found in ``CONF``.
    """

    # Options taken from the identity and debug configuration groups.
    default_params = dict(
        disable_ssl_certificate_validation=(
            CONF.identity.disable_ssl_certificate_validation),
        ca_certs=CONF.identity.ca_certificates_file,
        trace_requests=CONF.debug.trace_requests
    )

    # Alarming-specific options first, followed by the defaults above.
    alarming_params = dict(
        service=CONF.alarming_plugin.catalog_type,
        region=CONF.identity.region,
        endpoint_type=CONF.alarming_plugin.endpoint_type,
        **default_params
    )

    def __init__(self, credentials=None, service=None):
        """Initialise the base manager and attach the alarming client.

        :param credentials: forwarded to the parent ``__init__``.
        :param service: accepted but not used by this class.
        """
        super(Manager, self).__init__(credentials)
        self.set_alarming_client()

    def set_alarming_client(self):
        """Create ``self.alarming_client`` from ``alarming_params``."""
        client = AlarmingClient(self.auth_provider, **self.alarming_params)
        self.alarming_client = client