blob: 63d138fbe3a7c26e7ba8863830eb0b80228b6f27 [file] [log] [blame]
# Copyright 2018: Mirantis Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import functools
import hashlib
import logging
import os
import uuid

from cachetools import TTLCache
from prometheus_client import Counter, Gauge
from requests import Session
from requests import exceptions as req_exceptions
from simple_salesforce import Salesforce
from simple_salesforce import exceptions as sf_exceptions
# Upper-cased alert severity -> Salesforce ``Alert_Priority__c`` value.
STATE_MAP = dict(
    OK='060 Informational',
    UP='060 Informational',
    UNKNOWN='070 Unknown',
    WARNING='080 Warning',
    MINOR='080 Warning',
    MAJOR='090 Critical',
    CRITICAL='090 Critical',
    DOWN='090 Critical',
    UNREACHABLE='090 Critical',
)

# Config setting name -> key used in the validated configuration dict.
# The order is kept stable because validation walks this mapping.
CONFIG_FIELD_MAP = dict(
    auth_url='instance',
    username='username',
    password='password',
    organization_id='organizationId',
    environment_id='environment_id',
    sandbox_enabled='domain',
)

# hashlib constructor names that may be selected for alert IDs.
ALLOWED_HASHING = ('md5', 'sha256')

logger = logging.getLogger(name=__name__)
def sf_auth_retry(method):
    """Decorate a client method so it re-authenticates and retries once.

    If the wrapped call raises ``SalesforceExpiredSession`` or a requests
    ``ConnectionError``, the error is logged, ``self._auth()`` is called and
    the method is invoked a second time. An exception raised by
    ``self._auth()`` or by the second call propagates to the caller.

    :param method: an instance method taking ``self`` first.
    :returns: the wrapping function, with the wrapped method's metadata
        (name, docstring) preserved.
    """
    # Preserve __name__/__doc__ so logs, tracebacks and introspection show
    # the real method instead of "wrapper".
    @functools.wraps(method)
    def wrapper(self, *args, **kwargs):
        try:
            return method(self, *args, **kwargs)
        except sf_exceptions.SalesforceExpiredSession:
            logger.warning('Salesforce session expired.')
            self._auth()
        except req_exceptions.ConnectionError:
            logger.error('Salesforce connection error.')
            self._auth()
        # Single retry after re-authentication; deliberately not guarded, so
        # a second failure reaches the caller.
        return method(self, *args, **kwargs)
    return wrapper
class SfNotifierError(Exception):
    """Error raised by sf-notifier itself, e.g. for an invalid configuration."""
class SalesforceClient(object):
    """Open, comment on and close Salesforce cases for alerts.

    Each alert is identified by a hash of its label values (see
    ``_get_alert_id``). That hash is stored on the case as ``Alert_ID__c``
    and is how a later resolve notification finds the case to close.

    :param config: mapping of upper-case settings (``AUTH_URL``,
        ``USERNAME``, ``PASSWORD``, ``ORGANIZATION_ID``, ``ENVIRONMENT_ID``
        and the optional ``SANDBOX_ENABLED``). Every setting can be
        overridden by an ``SFDC_<SETTING>`` environment variable.
    :raises SfNotifierError: if a required setting is missing.
    """

    def __init__(self, config):
        # NOTE(review): these metrics are created per instance in
        # prometheus_client's default registry; a second SalesforceClient in
        # the same process will presumably fail with a duplicate-metric
        # error -- confirm before instantiating more than once.
        self.metrics = {
            'sf_auth_ok': Gauge('sf_auth_ok', 'sf-notifier'),
            'sf_error_count': Counter('sf_error_count', 'sf-notifier'),
            'sf_request_count': Counter('sf_request_count', 'sf-notifier')
        }
        self.session = Session()
        self.config = self._validate_config(config)
        self.hash_func = self._hash_func()
        # environment_id is only used in case payloads (Environment2__c), so
        # it is taken out of the kwargs that _auth passes to Salesforce().
        self.environment = self.config.pop('environment_id')
        self._auth()
        # Recently seen alert_id -> {'Id': case_id}; lets duplicates and
        # resolves within the TTL skip a Salesforce lookup.
        self._registered_alerts = TTLCache(maxsize=2048, ttl=300)

    @staticmethod
    def _hash_func():
        """Return the hashlib constructor used to build alert IDs.

        The name is read from ``SF_NOTIFIER_ALERT_ID_HASH_FUNC`` (default
        ``sha256``). Only names in ``ALLOWED_HASHING`` are honoured; anything
        else falls back to ``hashlib.sha256``.
        """
        name = os.environ.get('SF_NOTIFIER_ALERT_ID_HASH_FUNC', 'sha256')
        if name in ALLOWED_HASHING:
            return getattr(hashlib, name)
        # Keep the fallback, but make a misconfiguration visible.
        logger.warning(
            'Unsupported alert ID hash function "%s", using sha256.', name)
        return hashlib.sha256

    @staticmethod
    def _validate_config(config):
        """Build the validated settings dict from env vars and ``config``.

        For each entry of ``CONFIG_FIELD_MAP`` the value is taken from the
        ``SFDC_<SETTING>`` environment variable, falling back to
        ``config[<SETTING>]``. ``sandbox_enabled`` is optional: when it is
        ``'true'``/``'True'``/``True`` the result gets ``domain='test'``,
        otherwise no ``domain`` key is set.

        :returns: dict keyed by the ``CONFIG_FIELD_MAP`` values.
        :raises SfNotifierError: if any other setting is missing.
        """
        kwargs = {}
        # items() instead of the Python-2-only iteritems().
        for param, field in CONFIG_FIELD_MAP.items():
            setting_var = param.upper()
            env_var = 'SFDC_{}'.format(setting_var)
            # Environment variables take precedence over the config mapping.
            value = os.environ.get(env_var, config.get(setting_var))
            if field == 'domain':
                if value in ['true', 'True', True]:
                    kwargs[field] = 'test'
                continue
            if value is None:
                msg = ('Invalid config: missing "{}" field or "{}" environment'
                       ' variable.').format(param, env_var)
                logger.error(msg)
                raise SfNotifierError(msg)
            kwargs[field] = value
        return kwargs

    def _auth(self):
        """Create ``self.sf`` from the settings and update ``sf_auth_ok``.

        An authentication failure is logged and reported through the gauge
        (set to 0) instead of being raised; ``self.sf`` is then left as it
        was.
        """
        kwargs = {'session': self.session}
        kwargs.update(self.config)
        try:
            self.sf = Salesforce(**kwargs)
        except sf_exceptions.SalesforceAuthenticationFailed:
            # NOTE(review): if the very first authentication fails, self.sf
            # is never set and later calls fail with AttributeError, which
            # sf_auth_retry does not catch.
            logger.error('Salesforce authentication failure.')
            self.metrics['sf_auth_ok'].set(0)
            return
        logger.info('Salesforce authentication successful.')
        self.metrics['sf_auth_ok'].set(1)

    def _get_alert_id(self, labels):
        """Return the hex digest identifying an alert's label set.

        Label values are concatenated in sorted-key order with ``.`` escaped
        as ``\\.``. Keys are not hashed and no separator is inserted, so
        distinct label sets can produce the same ID. The scheme is kept
        as-is because IDs of existing cases depend on it.
        """
        alert_id_data = ''.join(
            labels[key].replace('.', '\\.') for key in sorted(labels))
        # hashlib only accepts bytes. Text is encoded as UTF-8, which yields
        # the same bytes (and hash) as before for ASCII data, and no longer
        # fails on Python 3 or on non-ASCII unicode labels.
        if not isinstance(alert_id_data, bytes):
            alert_id_data = alert_id_data.encode('utf-8')
        return self.hash_func(alert_id_data).hexdigest()

    @sf_auth_retry
    def _create_case(self, subject, body, labels, alert_id):
        """Create a case for ``alert_id`` unless one is already known.

        :returns: ``(0, case_id)`` for a newly created case, or
            ``(1, case_id)`` when the alert is already in the local cache or
            Salesforce rejects the case with ``DUPLICATE_VALUE``.
        :raises SalesforceMalformedRequest: for any other malformed-request
            error, after incrementing ``sf_error_count``.
        """
        # A single get() avoids a KeyError if the entry expires between a
        # membership test and the lookup.
        cached = self._registered_alerts.get(alert_id)
        if cached is not None:
            logger.warning('Duplicate case for alert: %s.', alert_id)
            return 1, cached['Id']
        severity = labels.get('severity', 'unknown').upper()
        payload = {
            'Subject': subject,
            'Description': body,
            'IsMosAlert__c': 'true',
            'Alert_Priority__c': STATE_MAP.get(severity, '070 Unknown'),
            'Alert_Host__c': labels.get('host') or labels.get(
                'instance', 'UNKNOWN'
            ),
            'Alert_Service__c': labels.get('service', 'UNKNOWN'),
            'Environment2__c': self.environment,
            'Alert_ID__c': alert_id,
        }
        logger.info('Try to create case: %s.', payload)
        try:
            self.metrics['sf_request_count'].inc()
            case = self.sf.Case.create(payload)
        except sf_exceptions.SalesforceMalformedRequest as ex:
            msg = ex.content[0]['message']
            err_code = ex.content[0]['errorCode']
            if err_code != 'DUPLICATE_VALUE':
                self.metrics['sf_error_count'].inc()
                raise
            logger.warning('Duplicate case: %s.', msg)
            # The existing case's ID is taken from the last word of the
            # error message.
            case_id = msg.split()[-1]
            self._registered_alerts[alert_id] = {'Id': case_id}
            return 1, case_id
        logger.info('Created case: %s.', case)
        self._registered_alerts[alert_id] = {'Id': case['id']}
        return 0, case['id']

    @sf_auth_retry
    def _close_case(self, case_id):
        """Set ``case_id`` to ``Auto-solved``; return ``Case.update``'s result.

        ``Alert_ID__c`` is overwritten with a random UUID, presumably so the
        same alert ID can be used for a new case later (a reused ID appears
        to be rejected with DUPLICATE_VALUE, see ``_create_case``) -- confirm.
        """
        logger.info('Try to close case: %s.', case_id)
        self.metrics['sf_request_count'].inc()
        update = self.sf.Case.update(
            case_id,
            {'Status': 'Auto-solved', 'Alert_ID__c': uuid.uuid4().hex}
        )
        logger.info('Closed case: %s.', case_id)
        return update

    @sf_auth_retry
    def _create_feed_item(self, subject, body, case_id):
        """Post ``subject``/``body`` as a FeedItem on ``case_id``."""
        feed_item = {'Title': subject, 'ParentId': case_id, 'Body': body}
        return self.sf.FeedItem.create(feed_item)

    @sf_auth_retry
    def _get_case_by_alert_id(self, alert_id):
        """Return the case for ``alert_id``, or None if Salesforce has none.

        A cached ``{'Id': case_id}`` entry is returned without a request;
        otherwise the case is looked up by its ``Alert_ID__c`` field.
        """
        logger.info('Try to get case by alert ID: %s.', alert_id)
        cached = self._registered_alerts.get(alert_id)
        if cached is not None:
            return cached
        try:
            return self.sf.Case.get_by_custom_id('Alert_ID__c', alert_id)
        except sf_exceptions.SalesforceResourceNotFound:
            self._registered_alerts.pop(alert_id, None)
            logger.warning('Alert ID: %s was already solved.', alert_id)
            return None

    def create_case(self, subject, body, labels):
        """Open (or reuse) the case for an alert and attach a feed item.

        :returns: dict with ``case_id``, ``alert_id`` and ``status`` set to
            ``'created'`` or ``'duplicate'``.
        """
        alert_id = self._get_alert_id(labels)
        error_code, case_id = self._create_case(subject, body,
                                                labels, alert_id)
        # The feed item is posted for duplicates as well, adding this
        # notification to the existing case.
        self._create_feed_item(subject, body, case_id)
        return {
            'case_id': case_id,
            'alert_id': alert_id,
            'status': 'duplicate' if error_code == 1 else 'created',
        }

    def close_case(self, labels):
        """Close the case for an alert, if there is one.

        :returns: dict with ``alert_id`` and ``status`` ``'resolved'``. When
            a case was found it also holds ``case_id`` and ``closed`` (the
            value returned by ``Case.update``).
        """
        alert_id = self._get_alert_id(labels)
        case = self._get_case_by_alert_id(alert_id)
        response = {'alert_id': alert_id, 'status': 'resolved'}
        if case is None:
            return response
        # pop() with a default cannot raise if the entry already expired.
        self._registered_alerts.pop(alert_id, None)
        response['case_id'] = case['Id']
        response['closed'] = self._close_case(case['Id'])
        return response