# blob: ee5db915105eb73f36af6d5c2cbf8e3eea094f2b [file] [log] [blame]
import logging
import time
import uuid
from prometheus_client import Counter, Gauge
from requests import Session
from simple_salesforce import Salesforce
from simple_salesforce.exceptions import (SalesforceAuthenticationFailed,
SalesforceMalformedRequest,
SalesforceResourceNotFound)
from cache import Cache
from decorators import flocked, sf_auth_retry
from mixins import SalesforceMixin
from settings import CASE_STATUS, SESSION_FILE, STATE_MAP
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
class SalesforceClient(SalesforceMixin):
    """Salesforce client that opens and closes cases for alerts.

    The Salesforce session id is persisted in ``SESSION_FILE`` and the file
    is locked with ``flocked`` while it is inspected or rewritten, so that
    only one worker at a time refreshes the session.

    Recently handled alerts are remembered in ``self._registered_alerts``.
    Entries are built by ``_fmt_alert_update`` (provided by the mixin) and
    are read here through the keys ``id``, ``error_code`` and
    ``last_update``.  The error codes used by this class are:

    * ``0`` - the case was created by this call,
    * ``1`` - the case is pending or already exists (duplicate),
    * ``2`` - Salesforce rejected the create request.
    """

    def __init__(self, config):
        """Validate ``config``, set up metrics and authenticate.

        Note that construction blocks until authentication succeeds,
        because ``auth()`` retries indefinitely.
        """
        self.metrics = {
            'sf_auth_ok': Gauge('sf_auth_ok', 'sf-notifier'),
            'sf_error_count': Counter('sf_error_count', 'sf-notifier'),
            'sf_request_count': Counter('sf_request_count', 'sf-notifier')
        }
        self.config = self._validate_config(config)
        self.hash_func = self._hash_func()
        # 'environment_id' is removed from the config because the remaining
        # keys are passed verbatim to Salesforce() in _auth().
        self.environment = self.config.pop('environment_id')
        self._registered_alerts = Cache()
        self.sf = None
        self.session = Session()
        self.auth()

    def _auth(self, config):
        """Create ``self.sf`` from ``config``; return True on success.

        ``config`` holds the keyword arguments for ``Salesforce``.  The
        shared HTTP session is added to a copy, so the caller's dict
        (``self.config`` in particular) is left unmodified.
        """
        params = dict(config, session=self.session)
        try:
            self.sf = Salesforce(**params)
        except SalesforceAuthenticationFailed as ex:
            logger.error('Salesforce authentication failure: {}.'.format(ex))
            self.metrics['sf_auth_ok'].set(0)
            return False

        logger.info('Salesforce authentication successful.')
        self.metrics['sf_auth_ok'].set(1)
        return True

    def _refresh_ready(self, saved_session):
        """Tell whether this worker should log in again.

        A refresh is due when no session id is stored, or when the stored
        id is the one this client currently uses.  When the client has no
        Salesforce handle yet, or the stored id differs from the client's,
        the stored session is reused instead.
        """
        if saved_session is None:
            logger.info('Current session is None.')
            return True

        if self.sf is None:
            return False

        return self.sf.session_id == saved_session

    def _reuse_session(self, saved_session):
        """Authenticate with the session id read from the session file."""
        logger.info('Reusing session id from file.')
        # limit params to avoid login request
        config = {
            'session_id': saved_session,
            'instance_url': self.config['instance_url']
        }
        return self._auth(config)

    def _acquire_session(self):
        """Make one attempt to obtain a usable session.

        Returns True on success.  On failure it sleeps for 30 seconds and
        returns False, so the caller can simply try again.
        """
        # only one worker at a time can check session_file
        auth_success = False

        with open(SESSION_FILE, 'r+') as session_file:
            with flocked(session_file):
                logger.info('Successfully locked session file for refresh.')
                saved_session = self._load_session(session_file)

                if self._refresh_ready(saved_session):
                    logger.info('Attempting to refresh session.')
                    if self._auth(self.config):
                        auth_success = True
                        session_file.truncate(0)
                        session_file.seek(0)
                        session_file.write(self.sf.session_id)
                        # Push the new id to disk while the lock is still
                        # held; otherwise it would only be written when the
                        # file is closed, i.e. after the lock is released.
                        session_file.flush()
                        logger.info('Refreshed session successfully.')
                    else:
                        logger.error('Failed to refresh session.')
                else:
                    logger.info('Not refreshing. Reusing session.')
                    auth_success = self._reuse_session(saved_session)

        # Back off only after the lock and the file have been released.
        if auth_success is False:
            logger.warning('Waiting 30 seconds before next attempt...')
            time.sleep(30)

        return auth_success

    def auth(self):
        """Block until a Salesforce session has been acquired."""
        auth_ok = self._acquire_session()
        while auth_ok is False:
            auth_ok = self._acquire_session()

    @sf_auth_retry
    def _create_case(self, subject, body, labels, alert_id):
        """Create a Salesforce case for ``alert_id`` and cache the outcome.

        Returns the cache entry for the alert.  If a non-error entry is
        already cached it is returned as is and no request is sent.

        Raises:
            SalesforceMalformedRequest: the request was rejected for a
                reason other than ``DUPLICATE_VALUE``.
        """
        cached_alert = self._registered_alerts.get(alert_id)
        if cached_alert is not None and cached_alert['id'] != 'error':
            logger.warning('Duplicate case for alert: {}.'.format(alert_id))
            return cached_alert

        # Mark the alert as in progress before talking to Salesforce.
        case = self._fmt_alert_update('pending', 1)
        self._registered_alerts.update(alert_id, case)

        severity = labels.get('severity', 'unknown').upper()
        payload = {
            'Subject': subject,
            'Description': body,
            'IsMosAlert__c': 'true',
            'Alert_Priority__c': STATE_MAP.get(severity, '070 Unknown'),
            'Alert_Host__c': labels.get('host') or labels.get(
                'instance', 'UNKNOWN'
            ),
            'Alert_Service__c': labels.get('service', 'UNKNOWN'),
            'Environment2__c': self.environment,
            'Alert_ID__c': alert_id,
        }
        logger.info('Try to create case: {}.'.format(payload))

        try:
            self.metrics['sf_request_count'].inc()
            case = self.sf.Case.create(payload)
            logger.info('Created case: {}.'.format(case))
        except SalesforceMalformedRequest as ex:
            msg = ex.content[0]['message']
            err_code = ex.content[0]['errorCode']

            if err_code == 'DUPLICATE_VALUE':
                logger.warning('Duplicate case: {}.'.format(msg))
                # The last word of the message is taken as the id of the
                # already existing case.
                case_id = msg.split()[-1]
                case = self._fmt_alert_update(case_id, 1)
                self._registered_alerts.update(alert_id, case)
                return case

            case = self._fmt_alert_update('error', 2, error_msg=msg)
            self._registered_alerts.update(alert_id, case)
            logger.error('Cannot create case: {}.'.format(msg))
            self.metrics['sf_error_count'].inc()
            raise

        case = self._fmt_alert_update(case['id'], 0)
        self._registered_alerts.update(alert_id, case)
        return case

    @sf_auth_retry
    def _close_case(self, case_id):
        """Set the case status to 'Auto-solved' and return the API result.

        ``Alert_ID__c`` is overwritten with a random value in the same
        update.
        NOTE(review): presumably this frees the alert id so a later case
        can use it again - confirm.
        """
        logger.info('Try to close case: {}.'.format(case_id))
        self.metrics['sf_request_count'].inc()
        update = self.sf.Case.update(
            case_id,
            {'Status': 'Auto-solved', 'Alert_ID__c': uuid.uuid4().hex}
        )
        logger.info('Closed case: {}.'.format(case_id))
        return update

    @sf_auth_retry
    def _create_feed_item(self, subject, body, case_id):
        """Post a feed item with ``subject`` and ``body`` on the case."""
        feed_item = {'Title': subject, 'ParentId': case_id, 'Body': body}
        return self.sf.FeedItem.create(feed_item)

    @sf_auth_retry
    def _get_case_by_alert_id(self, alert_id):
        """Return the case for ``alert_id``, or None if there is none.

        A cached entry is preferred; otherwise Salesforce is queried by
        the custom ``Alert_ID__c`` field.
        """
        logger.info('Try to get case by alert ID: {}.'.format(alert_id))

        if alert_id in self._registered_alerts:
            return self._registered_alerts.get(alert_id)

        try:
            return self.sf.Case.get_by_custom_id('Alert_ID__c', alert_id)
        except SalesforceResourceNotFound:
            self._registered_alerts.delete(alert_id)
            logger.warning('Alert ID: {} not found.'.format(alert_id))
            return None

    def create_case(self, subject, body, labels):
        """Create (or update) the case for an alert.

        Returns a dict with ``case_id``, ``alert_id`` and ``status``, where
        ``status`` is ``'duplicate'`` for an already known case and
        ``'created'`` otherwise.
        """
        alert_id = self.get_alert_id(labels)
        case = self._create_case(subject, body, labels, alert_id)

        newly_created = case['error_code'] == 0
        # A feed item is posted for a new case, or for a known one once
        # _feed_update_ready() allows it and the cached id is a real case
        # id rather than one of the CASE_STATUS placeholders.
        if newly_created or (
                self._feed_update_ready(case['last_update']) and
                case['id'] not in CASE_STATUS):
            self._create_feed_item(subject, body, case['id'])
            case = self._fmt_alert_update(case['id'], case['error_code'])
            self._registered_alerts.update(alert_id, case)

        response = {'case_id': case['id'], 'alert_id': alert_id}
        if case['error_code'] == 1:
            response['status'] = 'duplicate'
        else:
            response['status'] = 'created'
        return response

    def close_case(self, labels):
        """Close the case that belongs to the alert described by ``labels``.

        Returns a dict with ``alert_id`` and ``status``; when a case was
        found it also carries ``case_id`` and ``closed`` (the update
        result).
        """
        alert_id = self.get_alert_id(labels)
        case = self._get_case_by_alert_id(alert_id)

        response = {'alert_id': alert_id, 'status': 'resolved'}
        if case is None:
            return response

        self._registered_alerts.delete(alert_id)
        response['case_id'] = case['id']
        response['closed'] = self._close_case(case['id'])
        return response