| import json |
| from logging.config import dictConfig |
| import time |
| |
| from flask import Flask, Response, jsonify, request |
| from prometheus_client import make_wsgi_app |
| from requests.exceptions import ConnectionError as RequestsConnectionError |
| |
| from simple_salesforce.exceptions import SalesforceError |
| from simple_settings import settings |
| from werkzeug.wsgi import DispatcherMiddleware |
| from uwsgidecorators import mulefunc |
| |
| from sf_notifier.helpers import alert_fields_and_action, create_file |
| from sf_notifier.salesforce.client import SESSION_FILE, SalesforceClient |
| from sf_notifier.salesforce.settings import CASE_STATUS |
| |
| |
# Apply the logging configuration from settings before the app is created.
dictConfig(settings.LOGGING)

app = Flask(__name__)

# WSGI entry point that mounts the Prometheus exporter under /metrics and
# hands every other path to the Flask application.
app_dispatch = DispatcherMiddleware(app, {'/metrics': make_wsgi_app()})


# NOTE(review): create_file() presumably makes sure the session file exists
# before the client is built -- confirm in sf_notifier.helpers.
create_file(SESSION_FILE)

# Single module-level Salesforce client shared by all handlers below.
sf_cli = SalesforceClient(settings.SF_CONFIG)
| |
| |
@app.route('/info', methods=['GET'])
def info():
    """Return service metadata as JSON.

    The payload carries the configured ``settings.VERSION`` and the name of
    the hash function held by the Salesforce client.
    """
    payload = {
        'version': settings.VERSION,
        'hashing': sf_cli.hash_func.__name__,
    }
    return jsonify(payload)
| |
| |
@mulefunc
def offload(action, fields):
    """Call the Salesforce client method named ``action`` with ``fields``.

    ``fields`` is unpacked as positional arguments. Salesforce and
    connection errors are not propagated: they bump the ``sf_error_count``
    metric and are written to the application log.

    The function is wrapped by uWSGI's ``mulefunc`` decorator, so callers
    should not expect a return value from it.
    """
    try:
        sf_method = getattr(sf_cli, action)
        sf_method(*fields)
    except (SalesforceError, RequestsConnectionError) as exc:
        failure = 'Salesforce request failure: {}.'.format(exc)
        sf_cli.metrics['sf_error_count'].inc()
        app.logger.error(failure)
| |
| |
def create_case_results(alert_ids):
    """Poll the registered alerts until every given alert has an outcome.

    For each alert ID the entry in ``sf_cli._registered_alerts`` is
    inspected on every pass:

    * no entry yet -> keep waiting;
    * ``case['id']`` not listed in ``CASE_STATUS`` -> recorded as
      ``{'case': <id>}``;
    * ``case['id'] == 'error'`` -> recorded as ``{'error': <error>}`` and
      the overall error flag is raised, unless
      ``sf_cli._feed_update_ready(case['last_update'])`` is true, in which
      case the entry is skipped and polled again.

    There is no timeout: the call blocks, polling every 0.2 s, until all
    alerts have been recorded.

    :param alert_ids: sequence of hashable alert IDs to wait for.
    :returns: ``(cases, is_error)`` where ``cases`` maps alert ID to its
        result dict and ``is_error`` is True if any error was recorded.
    """
    cases = {}
    is_error = False

    # ``cases`` is keyed by alert ID, so duplicates in ``alert_ids`` collapse
    # into a single key. Compare against the number of *unique* IDs,
    # otherwise a duplicated ID would keep the loop spinning forever.
    expected = len(set(alert_ids))

    while expected > len(cases):
        for alert_id in alert_ids:
            case = sf_cli._registered_alerts.get(alert_id)

            if case is None:
                continue

            case_id = case['id']

            if case_id not in CASE_STATUS:
                cases[alert_id] = {'case': case_id}
            if case_id == 'error':
                if sf_cli._feed_update_ready(case['last_update']):
                    continue
                is_error = True
                cases[alert_id] = {'error': case['error']}

        # Only wait when another polling pass is actually needed.
        if expected > len(cases):
            time.sleep(0.2)
    return cases, is_error
| |
| |
def close_case_results(alert_ids):
    """Poll until every given alert has left the client's registry.

    An alert counts as closed once ``sf_cli._registered_alerts`` no longer
    has an entry for its ID; it is then reported as
    ``{'status': 'closed'}``.

    No error is ever reported from here (the flag is always False). Per the
    original note, a timeout is the implicit error signal: the loop itself
    never gives up and polls every 0.2 s until all alerts are gone.

    :param alert_ids: iterable of alert IDs to wait for. It is not modified.
    :returns: ``(cases, False)`` where ``cases`` maps alert ID to
        ``{'status': 'closed'}``.
    """
    cases = {}

    # Work on a private list: removing items from the list being iterated
    # skips elements, and the caller's argument should be left untouched.
    pending = list(alert_ids)

    while pending:
        still_open = []
        for alert_id in pending:
            if sf_cli._registered_alerts.get(alert_id) is None:
                cases[alert_id] = {'status': 'closed'}
            else:
                still_open.append(alert_id)
        pending = still_open

        # Only wait when another polling pass is actually needed.
        if pending:
            time.sleep(0.2)
    return cases, False
| |
| |
def _json_response(payload, status):
    """Serialize ``payload`` to a JSON ``Response`` with the given status."""
    return Response(json.dumps(payload),
                    status=status,
                    mimetype='application/json')


@app.route('/hook', methods=['POST'])
def webhook_receiver():
    """Handle an alert webhook: offload each alert and report the outcome.

    The request body must be a JSON object with an ``alerts`` list. Each
    alert gets the client's environment stamped into ``labels['env_id']``,
    is turned into ``(fields, action)`` by ``alert_fields_and_action`` and
    handed to ``offload``. Afterwards the module-level function named
    ``<action>_results`` is called with the alert IDs collected for that
    action, and the per-alert results are merged into one response.

    :returns: 400 with ``{'error': ...}`` when the body is not valid JSON
        or lacks an ``alerts`` list; 500 with the collected results when
        any result function reports an error; otherwise 200 with the
        results (an empty object when there were no alerts).
    """
    try:
        data = json.loads(request.data)
    except ValueError:
        data = None

    # Reject unparsable bodies as well as valid JSON of the wrong shape,
    # which would otherwise surface as an unhandled KeyError/TypeError.
    if not isinstance(data, dict) or not isinstance(data.get('alerts'), list):
        msg = 'Invalid request data: {}.'.format(request.data)
        app.logger.error(msg)
        return _json_response({'error': msg}, 400)

    app.logger.info('Received requests: {}'.format(data))

    # Track IDs per action: one payload may mix actions, and each action
    # has its own result poller. Using a single poller for all IDs could
    # wait forever on alerts that belong to the other action.
    ids_by_action = {}
    for alert in data['alerts']:
        alert['labels']['env_id'] = sf_cli.environment
        alert_id = sf_cli.get_alert_id(alert['labels'])
        fields, action = alert_fields_and_action(alert)
        ids_by_action.setdefault(action, []).append(alert_id)

        offload(action, fields)

    cases = {}
    is_error = False
    # With no alerts this loop is skipped and an empty result is returned.
    for action, alert_ids in ids_by_action.items():
        # Result pollers are looked up by naming convention in this module.
        collect_results = globals()[action + '_results']
        action_cases, action_failed = collect_results(alert_ids)
        cases.update(action_cases)
        is_error = is_error or action_failed

    if is_error:
        return _json_response(cases, 500)

    return jsonify(cases)
| |
| |
if __name__ == '__main__':
    # Direct execution runs ``app`` itself; the ``app_dispatch`` wrapper
    # (and its /metrics mount) is not involved on this path.
    app.run()