blob: 590f04f87fe3c63c08c2b6e404333c7052247ca1 [file] [log] [blame]
import errno
import fcntl
import functools
import hashlib
import logging
import time
import uuid
from contextlib import contextmanager

from cachetools import TTLCache
from prometheus_client import Counter, Gauge
from requests import Session
from requests.adapters import HTTPAdapter
from requests.exceptions import ConnectionError as RequestsConnectionError
from simple_salesforce import Salesforce
from simple_salesforce import exceptions as sf_exceptions
from urllib3.util.retry import Retry
Mateusz Matuszkowiak2820c662018-11-21 12:07:25 +010019
20
# Maps an upper-cased alert ``severity`` label to the value sent in the
# ``Alert_Priority__c`` case field. Severities missing from this table fall
# back to '070 Unknown' at the lookup site.
STATE_MAP = {
    'OK': '060 Informational',
    'UP': '060 Informational',
    'INFORMATIONAL': '060 Informational',
    'UNKNOWN': '070 Unknown',
    'WARNING': '080 Warning',
    'MINOR': '080 Warning',
    'MAJOR': '090 Critical',
    'CRITICAL': '090 Critical',
    'DOWN': '090 Critical',
    'UNREACHABLE': '090 Critical',
}

# Maps lower-cased configuration parameter names to the keyword names used
# when building the client configuration. 'sandbox_enabled' is special-cased
# by the config validation: a truthy value becomes domain='test', a falsy
# value drops the key entirely.
CONFIG_FIELD_MAP = {
    'auth_url': 'instance_url',
    'username': 'username',
    'password': 'password',
    'organization_id': 'organizationId',
    'environment_id': 'environment_id',
    'sandbox_enabled': 'domain',
    'feed_enabled': 'feed_enabled',
    'hash_func': 'hash_func',
}

# Names of the hashlib constructors accepted for computing alert IDs.
ALLOWED_HASHING = ('md5', 'sha256')
# File holding the most recent Salesforce session id; it is opened in 'r+'
# mode and locked with flock while the session is read or refreshed.
SESSION_FILE = '/tmp/session'

logger = logging.getLogger(__name__)
49
50
@contextmanager
def flocked(fd):
    """Hold an exclusive ``flock`` on *fd* for the duration of the block.

    The lock is requested in non-blocking mode. While another holder owns it
    the attempt is logged and retried every 5 seconds until it succeeds, so
    the ``with`` body always runs with the lock held.

    :param fd: open file object (or file descriptor) to lock.
    :raises IOError: if locking fails for a reason other than contention.
        Exceptions raised inside the ``with`` body propagate unchanged.
    """
    while True:
        try:
            fcntl.flock(fd, fcntl.LOCK_EX | fcntl.LOCK_NB)
            break
        except IOError as exc:
            # Only EACCES/EAGAIN mean "somebody else holds the lock"; any
            # other error (e.g. a bad descriptor) would never clear up.
            if exc.errno not in (errno.EACCES, errno.EAGAIN):
                raise
            logger.info('Session file locked. Waiting 5 seconds...')
            time.sleep(5)

    try:
        yield
    finally:
        fcntl.flock(fd, fcntl.LOCK_UN)
61
62
def sf_auth_retry(method):
    """Decorate a client method so it is retried once after re-authenticating.

    If the wrapped call raises ``SalesforceExpiredSession`` or a requests
    ``ConnectionError``, ``self.auth()`` is invoked and the call is repeated
    a single time. Any exception from that second attempt, and any other
    exception from the first, propagates to the caller.

    :param method: instance method of an object exposing ``auth()``.
    :returns: the wrapping function, carrying the metadata of *method*.
    """
    @functools.wraps(method)
    def wrapper(self, *args, **kwargs):
        try:
            return method(self, *args, **kwargs)
        except sf_exceptions.SalesforceExpiredSession:
            logger.warning('Salesforce session expired.')
        except RequestsConnectionError:
            logger.error('Salesforce connection error.')

        # Only reached after one of the failures handled above.
        self.auth()
        return method(self, *args, **kwargs)
    return wrapper
75
76
class SfNotifierError(Exception):
    """Error raised by the notifier, e.g. for an invalid configuration."""
79
80
class SalesforceClient(object):
    """Create, annotate and close Salesforce cases for alerts.

    The Salesforce session id is shared through ``SESSION_FILE``: it is read
    and rewritten under an exclusive file lock so that only one worker at a
    time refreshes it. Recently seen alert IDs are kept in a TTL cache
    mapping ``alert_id`` to ``{'Id': case_id}``.

    :param config: mapping of configuration parameters; keys are matched
        case-insensitively against ``CONFIG_FIELD_MAP``. ``hash_func``,
        ``feed_enabled`` and ``environment_id`` must be present.
    :param prometheus_registry: optional registry passed to the metrics.
    :raises SfNotifierError: if *config* contains an unsupported parameter.
    """

    def __init__(self, config, prometheus_registry=None):
        self.metrics = self._init_metrics(prometheus_registry)
        # alert_id -> {'Id': case_id}; entries expire after 300 seconds.
        self._registered_alerts = TTLCache(maxsize=2048, ttl=300)

        self.config = self._validate_config(config)
        # These settings are consumed here; what remains in self.config is
        # passed to Salesforce() as keyword arguments on login.
        self.hash_func = self._hash_func(self.config.pop('hash_func'))
        self.feed_enabled = self.config.pop('feed_enabled')

        self.environment = self.config.pop('environment_id')
        self.sf = None
        self.session = Session()

        # Retry failed connection attempts on both schemes with back-off.
        retry = Retry(connect=3, backoff_factor=0.5)
        adapter = HTTPAdapter(max_retries=retry)
        self.session.mount('http://', adapter)
        self.session.mount('https://', adapter)

    @staticmethod
    def _init_metrics(prometheus_registry):
        """Create the metrics dict and reset ``sf_auth_ok`` to 0."""
        metrics = {
            'sf_auth_ok': Gauge('sf_auth_ok', 'sf-notifier',
                                multiprocess_mode='max',
                                registry=prometheus_registry),
            'sf_error_count': Counter('sf_error_count', 'sf-notifier',
                                      registry=prometheus_registry),
            'sf_request_count': Counter('sf_request_count', 'sf-notifier',
                                        registry=prometheus_registry)
        }
        metrics['sf_auth_ok'].set(0)
        return metrics

    @staticmethod
    def _hash_func(name):
        """Return the hashlib constructor called *name*.

        Names outside ``ALLOWED_HASHING`` are logged and replaced by
        ``hashlib.sha256``.
        """
        if name in ALLOWED_HASHING:
            return getattr(hashlib, name)
        msg = ('Invalid hashing function "{}". '
               'Switching to default "sha256".').format(name)
        logger.warning(msg)
        return hashlib.sha256

    @staticmethod
    def _validate_config(config):
        """Translate *config* keys through ``CONFIG_FIELD_MAP``.

        :returns: a new dict keyed by the mapped field names. The ``domain``
            field is set to ``'test'`` when its value is truthy and omitted
            otherwise.
        :raises SfNotifierError: on a parameter absent from the field map.
        """
        kwargs = {}

        for param, value in config.items():
            field = CONFIG_FIELD_MAP.get(param.lower())
            if field is None:
                env_var = 'SFDC_{}'.format(param)
                # Report the offending parameter name; ``field`` is always
                # None on this path and says nothing useful.
                msg = ('Invalid config: missing "{}" field or "{}" environment'
                       ' variable.').format(param, env_var)
                logger.error(msg)
                raise SfNotifierError(msg)

            if field == 'domain':
                if value:
                    kwargs[field] = 'test'
                else:
                    kwargs.pop(field, None)
            else:
                kwargs[field] = value

        return kwargs

    def _auth(self, config):
        """Instantiate the Salesforce client from *config*.

        Updates the ``sf_auth_ok`` gauge and returns True on success, False
        on any exception raised by the constructor.
        """
        # Work on a copy so the shared HTTP session is not written into the
        # caller's dict (which may be self.config).
        params = dict(config, session=self.session)
        try:
            self.sf = Salesforce(**params)
        except Exception as ex:
            logger.error('Salesforce authentication failure: {}.'.format(ex))
            self.metrics['sf_auth_ok'].set(0)
            return False

        logger.info('Salesforce authentication successful.')
        self.metrics['sf_auth_ok'].set(1)
        return True

    def _load_session(self, session_file):
        """Return the session id stored on the first line, or None.

        Surrounding whitespace is stripped; an empty or blank file yields
        None.
        """
        first_line = session_file.readline().strip()
        return first_line or None

    def _refresh_ready(self, saved_session):
        """Tell whether this worker should perform a fresh login.

        True when no session is stored, or when the stored session is the
        one this client already uses. False when this client has not
        authenticated yet or the stored session differs from its own.
        """
        if saved_session is None:
            logger.info('Current session is None.')
            return True

        if self.sf is None:
            return False

        return self.sf.session_id == saved_session

    def _reuse_session(self, saved_session):
        """Authenticate with an existing session id instead of logging in."""
        logger.info('Reusing session id from file.')
        # limit params to avoid login request
        config = {
            'session_id': saved_session,
            'instance_url': self.config['instance_url']
        }
        return self._auth(config)

    def _acquire_session(self):
        """Refresh or reuse the shared session under the file lock.

        :returns: True if authentication succeeded, False otherwise (after
            waiting 30 seconds).
        """
        # only one worker at a time can check session_file
        auth_success = False

        logger.info('Attempting to lock session file.')
        with open(SESSION_FILE, 'r+') as session_file:
            with flocked(session_file):
                logger.info('Successfully locked session file for refresh.')

                saved_session = self._load_session(session_file)

                if self._refresh_ready(saved_session):
                    logger.info('Attempting to refresh session.')

                    if self._auth(self.config):
                        auth_success = True
                        # Replace the file content with the new session id.
                        session_file.truncate(0)
                        session_file.seek(0)
                        session_file.write(self.sf.session_id)
                        logger.info('Refreshed session successfully.')
                    else:
                        logger.error('Failed to refresh session.')
                else:
                    logger.info('Not refreshing. Reusing session.')
                    auth_success = self._reuse_session(saved_session)

        # NOTE(review): the back-off runs after the lock is released so a
        # failing worker does not block the others - confirm this matches
        # the intended nesting.
        if auth_success is False:
            logger.warning('Waiting 30 seconds before next attempt...')
            time.sleep(30)

        return auth_success

    def auth(self, no_retry=False):
        """Authenticate, retrying until success unless *no_retry* is set.

        With ``no_retry=True`` a single attempt is made and its outcome is
        not reported to the caller.
        """
        auth_ok = self._acquire_session()

        if no_retry:
            return

        while not auth_ok:
            auth_ok = self._acquire_session()

    def _get_alert_id(self, labels):
        """Return the hex digest identifying the alert described by *labels*.

        Label values are concatenated in sorted key order, with every '.'
        replaced by a backslash followed by '.', and hashed with the
        configured hash function.
        """
        alert_id_data = ''.join(
            labels[key].replace('.', '\\.') for key in sorted(labels)
        )
        return self.hash_func(alert_id_data.encode('utf-8')).hexdigest()

    @staticmethod
    def _is_watchdog(labels):
        """Return True if the ``alertname`` label is 'watchdog' (any case).

        Labels without ``alertname`` are treated as not being a watchdog.
        """
        return labels.get('alertname', '').lower() == 'watchdog'

    @sf_auth_retry
    def _create_case(self, subject, body, labels, alert_id):
        """Create a case for the alert unless one is already known.

        :returns: ``(0, case_id)`` for a newly created case, or
            ``(1, case_id)`` when the alert is already cached or Salesforce
            reports ``DUPLICATE_VALUE``.
        :raises SalesforceMalformedRequest: for any other malformed-request
            error (``sf_error_count`` is incremented first).
        """
        if alert_id in self._registered_alerts:
            logger.warning('Duplicate case for alert: {}.'.format(alert_id))
            return 1, self._registered_alerts[alert_id]['Id']

        severity = labels.get('severity', 'unknown').upper()
        payload = {
            'Subject': subject,
            'Description': body,
            'IsMosAlert__c': 'true',
            'Alert_Priority__c': STATE_MAP.get(severity, '070 Unknown'),
            'Alert_Host__c': labels.get('host') or labels.get(
                'instance', 'UNKNOWN'
            ),
            'Alert_Service__c': labels.get('service', 'UNKNOWN'),
            'Environment2__c': self.environment,
            'Alert_ID__c': alert_id,
        }
        if labels.get('cluster_id') is not None:
            payload['ClusterId__c'] = labels['cluster_id']

        if self._is_watchdog(labels):
            payload['IsWatchDogAlert__c'] = 'true'

        logger.info('Try to create case: {}.'.format(payload))
        try:
            self.metrics['sf_request_count'].inc()
            case = self.sf.Case.create(payload)
            logger.info('Created case: {}.'.format(case))
        except sf_exceptions.SalesforceMalformedRequest as ex:
            msg = ex.content[0]['message']
            err_code = ex.content[0]['errorCode']

            if err_code == 'DUPLICATE_VALUE':
                logger.warning('Duplicate case: {}.'.format(msg))
                # The last token of the message is taken as the id of the
                # already existing case - presumably Salesforce ends the
                # duplicate message with the record id; verify.
                case_id = msg.split()[-1]
                self._registered_alerts[alert_id] = {'Id': case_id}
                return 1, case_id

            logger.error('Cannot create case: {}.'.format(msg))
            self.metrics['sf_error_count'].inc()
            raise

        self._registered_alerts[alert_id] = {'Id': case['id']}
        return 0, case['id']

    @sf_auth_retry
    def _close_case(self, case_id):
        """Mark the case 'Auto-solved' and return the update result.

        ``Alert_ID__c`` is overwritten with a random UUID hex string -
        presumably so the alert ID can be used by a later case; confirm.
        """
        logger.info('Try to close case: {}.'.format(case_id))
        self.metrics['sf_request_count'].inc()
        update = self.sf.Case.update(
            case_id,
            {'Status': 'Auto-solved', 'Alert_ID__c': uuid.uuid4().hex}
        )
        logger.info('Closed case: {}.'.format(case_id))
        return update

    @sf_auth_retry
    def _create_feed_item(self, subject, body, case_id):
        """Create a FeedItem with *subject* and *body* on the given case."""
        feed_item = {'Title': subject, 'ParentId': case_id, 'Body': body}
        logger.debug('Creating feed item: {}.'.format(feed_item))
        return self.sf.FeedItem.create(feed_item)

    @sf_auth_retry
    def _get_case_by_alert_id(self, alert_id):
        """Return the case for *alert_id*, or None if Salesforce has none.

        A cached entry (``{'Id': case_id}``) is returned without querying
        Salesforce.
        """
        logger.info('Try to get case by alert ID: {}.'.format(alert_id))

        if alert_id in self._registered_alerts:
            return self._registered_alerts[alert_id]
        try:
            return self.sf.Case.get_by_custom_id('Alert_ID__c', alert_id)
        except sf_exceptions.SalesforceResourceNotFound:
            self._registered_alerts.pop(alert_id, None)
            logger.warning('Alert ID: {} not found.'.format(alert_id))
        return None

    def create_case(self, subject, body, labels):
        """Create (or find) the case for an alert.

        When feeds are enabled or the alert is a watchdog, a feed item with
        the same subject and body is added to the case.

        :returns: dict with ``case_id``, ``alert_id`` and ``status``
            (``'created'`` or ``'duplicate'``).
        """
        if self.sf is None:
            self.auth(no_retry=True)

        alert_id = self._get_alert_id(labels)

        error_code, case_id = self._create_case(subject, body,
                                                labels, alert_id)

        response = {'case_id': case_id, 'alert_id': alert_id}

        if error_code == 1:
            response['status'] = 'duplicate'
        else:
            response['status'] = 'created'

        if self.feed_enabled or self._is_watchdog(labels):
            self._create_feed_item(subject, body, case_id)
        return response

    def close_case(self, labels):
        """Close the case matching the alert described by *labels*.

        :returns: dict with ``alert_id`` and ``status`` (``'resolved'``);
            when a case was found it also has ``case_id`` and ``closed``
            (the result of the update call).
        """
        if self.sf is None:
            self.auth(no_retry=True)

        alert_id = self._get_alert_id(labels)
        case = self._get_case_by_alert_id(alert_id)

        response = {'alert_id': alert_id, 'status': 'resolved'}

        if case is None:
            return response

        # Forget the alert so a later occurrence opens a new case.
        self._registered_alerts.pop(alert_id, None)

        response['case_id'] = case['Id']
        response['closed'] = self._close_case(case['Id'])
        return response