blob: 95f324d6d2f46a31260c10916a95fa1982005d0a [file] [log] [blame]
# Copyright 2018: Mirantis Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
15
import errno
import fcntl
import hashlib
import logging
import os
import time
import uuid
from contextlib import contextmanager
from functools import wraps

from cachetools import TTLCache

from prometheus_client import Counter, Gauge

from requests import Session
from requests.exceptions import ConnectionError as RequestsConnectionError

from simple_salesforce import Salesforce
from simple_salesforce import exceptions as sf_exceptions
33
34
# Upper-cased alert "severity" label -> value written to the Salesforce
# Alert_Priority__c field. Severities not listed here are reported as
# '070 Unknown' by the caller.
STATE_MAP = dict(
    OK='060 Informational',
    UP='060 Informational',
    UNKNOWN='070 Unknown',
    WARNING='080 Warning',
    MINOR='080 Warning',
    MAJOR='090 Critical',
    CRITICAL='090 Critical',
    DOWN='090 Critical',
    UNREACHABLE='090 Critical',
)
46
# Setting name -> keyword argument name for simple_salesforce.Salesforce.
# Each value is read from the SFDC_<NAME> environment variable, falling back
# to the upper-cased <NAME> key of the config dict. 'environment_id' is
# removed again before login, and 'sandbox_enabled' is translated into
# domain='test'.
CONFIG_FIELD_MAP = dict(
    auth_url='instance_url',
    username='username',
    password='password',
    organization_id='organizationId',
    environment_id='environment_id',
    sandbox_enabled='domain',
)
55
# hashlib constructor names accepted via SF_NOTIFIER_ALERT_ID_HASH_FUNC.
ALLOWED_HASHING = ('md5', 'sha256')

# File through which worker processes share the Salesforce session id.
# It holds a live credential, so SF_NOTIFIER_SESSION_FILE allows moving it
# out of the world-writable /tmp, and keeps separate deployments on the same
# host from overwriting each other's session. The default is unchanged.
SESSION_FILE = os.environ.get('SF_NOTIFIER_SESSION_FILE', '/tmp/session')

logger = logging.getLogger(__name__)
60
61
@contextmanager
def flocked(fd, retry_delay=5):
    """Hold an exclusive ``flock`` on ``fd`` for the body of a with-block.

    The lock is requested non-blocking. While another process holds it, the
    attempt is repeated every ``retry_delay`` seconds until it succeeds. The
    lock is released when the block exits, normally or by exception, and
    exceptions raised inside the block propagate unchanged.

    :param fd: file object or descriptor accepted by ``fcntl.flock``.
    :param retry_delay: seconds to sleep between lock attempts.
    :raises IOError: if ``flock`` fails for a reason other than the lock
        being held elsewhere (e.g. a bad descriptor).
    """
    # Acquire first, outside the try/finally: the old version caught the
    # contention error, never yielded (RuntimeError from contextmanager),
    # unlocked a lock it did not hold and swallowed IOErrors from the body.
    while True:
        try:
            fcntl.flock(fd, fcntl.LOCK_EX | fcntl.LOCK_NB)
            break
        except IOError as err:
            # These errnos mean "held by someone else"; anything else is a
            # real failure and must not turn into an endless retry loop.
            if err.errno not in (errno.EAGAIN, errno.EWOULDBLOCK,
                                 errno.EACCES):
                raise
            logger.info(
                'Waiting for session file {} seconds...'.format(retry_delay))
            time.sleep(retry_delay)
    try:
        yield
    finally:
        fcntl.flock(fd, fcntl.LOCK_UN)
72
73
def sf_auth_retry(method):
    """Retry a ``SalesforceClient`` method once after re-authenticating.

    The wrapped method is called. If it raises ``SalesforceExpiredSession``
    or a requests ``ConnectionError``, the failure is logged, ``self.auth()``
    is called and the method is invoked a second time. Exceptions from the
    second attempt, and any other exception from the first, propagate to the
    caller.
    """
    # wraps() keeps the wrapped method's __name__/__doc__ for logs, tracebacks
    # and introspection.
    @wraps(method)
    def wrapper(self, *args, **kwargs):
        try:
            return method(self, *args, **kwargs)
        except sf_exceptions.SalesforceExpiredSession:
            logger.warning('Salesforce session expired.')
            self.auth()
        except RequestsConnectionError:
            logger.error('Salesforce connection error.')
            self.auth()
        return method(self, *args, **kwargs)
    return wrapper
86
87
class SfNotifierError(Exception):
    """Error raised by sf-notifier itself, e.g. for an invalid configuration."""
Mateusz Matuszkowiak2820c662018-11-21 12:07:25 +010090
91
class SalesforceClient(object):
    """Open, de-duplicate and auto-close Salesforce Cases for alerts.

    Worker processes share one Salesforce session id through SESSION_FILE.
    A worker that needs to (re)authenticate locks the file with ``flocked``.
    It then either logs in again or adopts the session id another worker
    stored there (see ``_refresh_ready``).
    """

    # Seconds to wait between failed authentication rounds in ``auth``.
    AUTH_RETRY_DELAY = 5

    def __init__(self, config):
        # NOTE(review): prometheus_client registers metrics globally, so
        # constructing a second client in one process likely fails with a
        # duplicated-timeseries error -- confirm only one instance exists.
        self.metrics = {
            'sf_auth_ok': Gauge('sf_auth_ok', 'sf-notifier'),
            'sf_error_count': Counter('sf_error_count', 'sf-notifier'),
            'sf_request_count': Counter('sf_request_count', 'sf-notifier')
        }
        self.config = self._validate_config(config)
        self.hash_func = self._hash_func()
        # environment_id goes into every case payload, not into the login.
        self.environment = self.config.pop('environment_id')
        # alert_id -> {'Id': case_id}, kept for 300 s.
        self._registered_alerts = TTLCache(maxsize=2048, ttl=300)
        self.sf = None
        self.session = Session()
        self.auth()

    @staticmethod
    def _hash_func():
        """Return the hashlib constructor used to derive alert ids.

        The name comes from SF_NOTIFIER_ALERT_ID_HASH_FUNC (default
        sha256). Names outside ALLOWED_HASHING fall back to sha256.
        """
        name = os.environ.get('SF_NOTIFIER_ALERT_ID_HASH_FUNC', 'sha256')
        if name not in ALLOWED_HASHING:
            logger.warning(
                'Unsupported alert ID hash function "{}", using sha256.'
                .format(name))
            name = 'sha256'
        return getattr(hashlib, name)

    @staticmethod
    def _validate_config(config):
        """Build keyword arguments for ``Salesforce`` from env and config.

        For each CONFIG_FIELD_MAP entry, the SFDC_<PARAM> environment
        variable takes precedence over the upper-cased key in ``config``.
        ``sandbox_enabled`` is optional. It becomes ``domain='test'`` when it
        is 'true' in any letter case or True. Every other setting is
        required.

        :raises SfNotifierError: if a required setting is missing.
        """
        kwargs = {}
        # .items() works on Python 2 and 3; iteritems() is Python 2 only.
        for param, field in CONFIG_FIELD_MAP.items():
            setting_var = param.upper()
            env_var = 'SFDC_{}'.format(setting_var)
            value = os.environ.get(env_var, config.get(setting_var))

            if field == 'domain':
                flag = value
                if hasattr(flag, 'lower'):
                    flag = flag.strip().lower()
                if flag in ('true', True):
                    kwargs[field] = 'test'
                continue

            if value is None:
                msg = ('Invalid config: missing "{}" field or "{}" environment'
                       ' variable.').format(param, env_var)
                logger.error(msg)
                raise SfNotifierError(msg)
            kwargs[field] = value
        return kwargs

    def _auth(self, config):
        """Create ``self.sf`` from ``config`` using the shared HTTP session.

        :returns: True on success, False if Salesforce rejected the login.
        """
        # Copy rather than inject 'session' into the caller's dict
        # (self.config in particular).
        kwargs = dict(config, session=self.session)
        try:
            self.sf = Salesforce(**kwargs)
        except sf_exceptions.SalesforceAuthenticationFailed:
            logger.error('Salesforce authentication failure.')
            self.metrics['sf_auth_ok'].set(0)
            return False

        logger.info('Salesforce authentication successful.')
        self.metrics['sf_auth_ok'].set(1)
        return True

    @staticmethod
    def _open_session_file():
        """Open SESSION_FILE for reading and writing, creating it if needed.

        O_CREAT lets the first worker on a fresh host start without the file
        already existing. Mode 0o600 (applied only when the file is created)
        keeps the stored session id, which is reused as a credential, private
        to its owner.
        """
        fd = os.open(SESSION_FILE, os.O_RDWR | os.O_CREAT, 0o600)
        try:
            return os.fdopen(fd, 'r+')
        except Exception:
            os.close(fd)
            raise

    def _load_session(self, session_file):
        """Return the session id stored in ``session_file``, or None."""
        saved = session_file.readline().strip()
        return saved or None

    def _store_session(self, session_file):
        """Overwrite ``session_file`` with the current session id."""
        session_file.seek(0)
        session_file.truncate()
        session_file.write(self.sf.session_id)
        # Flush while the caller still holds the lock. Otherwise the data only
        # reaches the file on close, after the lock is released. In that gap
        # another worker could read an empty file and log in again.
        session_file.flush()

    def _refresh_ready(self, saved_session):
        """Decide whether this worker should log in again.

        * No stored session: log in.
        * No client in this process yet: reuse the stored session.
        * The stored session is the one this process already uses
          (presumably the one that just failed): log in.
        * Otherwise another worker already refreshed it: reuse it.
        """
        if saved_session is None:
            logger.info('Current session is None.')
            return True
        if self.sf is None:
            return False
        return self.sf.session_id == saved_session

    def _reuse_session(self, saved_session):
        """Build ``self.sf`` from a stored session id without logging in."""
        logger.info('Reusing session id from file.')
        # limit params to avoid login request
        config = {
            'session_id': saved_session,
            'instance_url': self.config['instance_url']
        }
        return self._auth(config)

    def _acquire_session(self):
        """Run one locked refresh-or-reuse round; return True on success."""
        # only one worker at a time can check session_file
        auth_success = False

        with self._open_session_file() as session_file:
            with flocked(session_file):
                logger.info('Successfully locked session file for refresh.')

                saved_session = self._load_session(session_file)

                if self._refresh_ready(saved_session):
                    logger.info('Attempting to refresh session.')

                    if self._auth(self.config):
                        auth_success = True
                        self._store_session(session_file)
                        logger.info('Refreshed session successfully.')
                    else:
                        logger.error('Failed to refresh session.')
                else:
                    logger.info('Not refreshing. Reusing session.')
                    auth_success = self._reuse_session(saved_session)

        return auth_success

    def auth(self):
        """Authenticate, retrying until a usable session is obtained.

        Waits AUTH_RETRY_DELAY seconds between failed rounds, so that a bad
        password does not become a tight loop of login requests.
        """
        while not self._acquire_session():
            logger.warning(
                'Salesforce authentication failed, retrying in {} seconds.'
                .format(self.AUTH_RETRY_DELAY))
            time.sleep(self.AUTH_RETRY_DELAY)

    def _get_alert_id(self, labels):
        """Derive a stable alert id from the alert's label values.

        Values are concatenated in key-sorted order, with '.' escaped, and
        hashed with ``self.hash_func``. The digest is stored in Salesforce as
        Alert_ID__c and later used to find the case to close, so this scheme
        must not change.
        """
        alert_id_data = ''.join(
            labels[key].replace('.', '\\.') for key in sorted(labels))
        # hashlib needs bytes on Python 3 (and for non-ASCII unicode on
        # Python 2). UTF-8 leaves existing ASCII-only ids unchanged.
        if not isinstance(alert_id_data, bytes):
            alert_id_data = alert_id_data.encode('utf-8')
        return self.hash_func(alert_id_data).hexdigest()

    @sf_auth_retry
    def _create_case(self, subject, body, labels, alert_id):
        """Create a Case for ``alert_id`` unless one is already known.

        :returns: ``(0, case_id)`` for a new case. ``(1, case_id)`` when the
            case is cached locally or Salesforce reports DUPLICATE_VALUE.
        :raises SalesforceMalformedRequest: for any other creation error.
        """
        # get() instead of `in` + [] so a TTL expiry in between cannot
        # raise KeyError.
        cached = self._registered_alerts.get(alert_id)
        if cached is not None:
            logger.warning('Duplicate case for alert: {}.'.format(alert_id))
            return 1, cached['Id']

        severity = labels.get('severity', 'unknown').upper()
        payload = {
            'Subject': subject,
            'Description': body,
            'IsMosAlert__c': 'true',
            'Alert_Priority__c': STATE_MAP.get(severity, '070 Unknown'),
            'Alert_Host__c': labels.get('host') or labels.get(
                'instance', 'UNKNOWN'
            ),
            'Alert_Service__c': labels.get('service', 'UNKNOWN'),
            'Environment2__c': self.environment,
            'Alert_ID__c': alert_id,
        }
        logger.info('Try to create case: {}.'.format(payload))
        try:
            self.metrics['sf_request_count'].inc()
            case = self.sf.Case.create(payload)
            logger.info('Created case: {}.'.format(case))
        except sf_exceptions.SalesforceMalformedRequest as ex:
            msg = ex.content[0]['message']
            err_code = ex.content[0]['errorCode']

            if err_code == 'DUPLICATE_VALUE':
                logger.warning('Duplicate case: {}.'.format(msg))
                # NOTE(review): assumes the error message ends with the id
                # of the existing case.
                case_id = msg.split()[-1]
                self._registered_alerts[alert_id] = {'Id': case_id}
                return 1, case_id

            logger.error('Cannot create case: {}.'.format(msg))
            self.metrics['sf_error_count'].inc()
            raise

        self._registered_alerts[alert_id] = {'Id': case['id']}
        return 0, case['id']

    @sf_auth_retry
    def _close_case(self, case_id):
        """Mark a case 'Auto-solved' and return the Salesforce update result.

        Alert_ID__c is replaced with a random value. Presumably this lets a
        later alert with the same labels open a new case instead of hitting
        DUPLICATE_VALUE.
        """
        logger.info('Try to close case: {}.'.format(case_id))
        self.metrics['sf_request_count'].inc()
        update = self.sf.Case.update(
            case_id,
            {'Status': 'Auto-solved', 'Alert_ID__c': uuid.uuid4().hex}
        )
        logger.info('Closed case: {}.'.format(case_id))
        return update

    @sf_auth_retry
    def _create_feed_item(self, subject, body, case_id):
        """Post ``subject``/``body`` as a FeedItem on case ``case_id``."""
        feed_item = {'Title': subject, 'ParentId': case_id, 'Body': body}
        return self.sf.FeedItem.create(feed_item)

    @sf_auth_retry
    def _get_case_by_alert_id(self, alert_id):
        """Return the case for ``alert_id``, or None if Salesforce has none.

        A cached entry is returned as ``{'Id': case_id}``. Otherwise the
        record from ``Case.get_by_custom_id`` is returned.
        """
        logger.info('Try to get case by alert ID: {}.'.format(alert_id))

        cached = self._registered_alerts.get(alert_id)
        if cached is not None:
            return cached
        try:
            return self.sf.Case.get_by_custom_id('Alert_ID__c', alert_id)
        except sf_exceptions.SalesforceResourceNotFound:
            # pop() tolerates a missing key, unlike get() followed by del.
            self._registered_alerts.pop(alert_id, None)
            logger.warning('Alert ID: {} not found.'.format(alert_id))
            return None

    def create_case(self, subject, body, labels):
        """Open (or find) the case for an alert and attach a feed item.

        :returns: dict with 'case_id', 'alert_id' and 'status' ('created'
            or 'duplicate').
        """
        alert_id = self._get_alert_id(labels)

        error_code, case_id = self._create_case(subject, body,
                                                labels, alert_id)

        # Every notification is also posted on the case as a feed item.
        self._create_feed_item(subject, body, case_id)

        return {
            'case_id': case_id,
            'alert_id': alert_id,
            'status': 'duplicate' if error_code == 1 else 'created',
        }

    def close_case(self, labels):
        """Auto-solve the case for a resolved alert, if one exists.

        :returns: dict with 'alert_id' and 'status' ('resolved'). When a
            case was found, it also has 'case_id' and 'closed' (the update
            result).
        """
        alert_id = self._get_alert_id(labels)
        case = self._get_case_by_alert_id(alert_id)

        response = {'alert_id': alert_id, 'status': 'resolved'}

        if case is None:
            return response

        self._registered_alerts.pop(alert_id, None)

        response['case_id'] = case['Id']
        response['closed'] = self._close_case(case['Id'])
        return response