Reuse Salesforce session id
- Store the session id in a shared session file
- Only one worker at a time can lock the session file,
authenticate and write a new session id
- Other workers wait and reuse the session id
- Workers compare their session id with the one in the file
Change-Id: I371b0a7172a5b312a2ae2aef96d02505a1823f86
Related-bug: PROD-29347 (PROD:29347)
diff --git a/sf_notifier/helpers.py b/sf_notifier/helpers.py
index 295c566..e9c221b 100644
--- a/sf_notifier/helpers.py
+++ b/sf_notifier/helpers.py
@@ -13,10 +13,18 @@
# License for the specific language governing permissions and limitations
# under the License.
+import os
+
RESOLVED_STATUSES = ('UP', 'OK', 'resolved')
+def create_file(file_path):
+ if not os.path.exists(file_path):
+ with open(file_path, 'w+'):
+ return file_path
+
+
def _format_subject(alert):
subject = alert['annotations']['summary']
host = alert['labels'].get('host')
diff --git a/sf_notifier/salesforce/client.py b/sf_notifier/salesforce/client.py
index 51b1b43..95f324d 100644
--- a/sf_notifier/salesforce/client.py
+++ b/sf_notifier/salesforce/client.py
@@ -13,10 +13,13 @@
# License for the specific language governing permissions and limitations
# under the License.
+import fcntl
import hashlib
import logging
import os
+import time
import uuid
+from contextlib import contextmanager
from cachetools import TTLCache
@@ -42,7 +45,7 @@
}
CONFIG_FIELD_MAP = {
- 'auth_url': 'instance',
+ 'auth_url': 'instance_url',
'username': 'username',
'password': 'password',
'organization_id': 'organizationId',
@@ -51,20 +54,33 @@
}
ALLOWED_HASHING = ('md5', 'sha256')
+SESSION_FILE = '/tmp/session'
logger = logging.getLogger(__name__)
+@contextmanager
+def flocked(fd):
+ try:
+ fcntl.flock(fd, fcntl.LOCK_EX | fcntl.LOCK_NB)
+ yield
+ except IOError:
+ logger.info('Waiting for session file 5 seconds...')
+ time.sleep(5)
+ finally:
+ fcntl.flock(fd, fcntl.LOCK_UN)
+
+
def sf_auth_retry(method):
def wrapper(self, *args, **kwargs):
try:
return method(self, *args, **kwargs)
except sf_exceptions.SalesforceExpiredSession:
logger.warning('Salesforce session expired.')
- self._auth()
+ self.auth()
except RequestsConnectionError:
logger.error('Salesforce connection error.')
- self._auth()
+ self.auth()
return method(self, *args, **kwargs)
return wrapper
@@ -81,12 +97,13 @@
'sf_error_count': Counter('sf_error_count', 'sf-notifier'),
'sf_request_count': Counter('sf_request_count', 'sf-notifier')
}
- self.session = Session()
self.config = self._validate_config(config)
self.hash_func = self._hash_func()
self.environment = self.config.pop('environment_id')
- self._auth()
self._registered_alerts = TTLCache(maxsize=2048, ttl=300)
+ self.sf = None
+ self.session = Session()
+ self.auth()
@staticmethod
def _hash_func():
@@ -119,17 +136,78 @@
raise SfNotifierError(msg)
return kwargs
- def _auth(self):
- kwargs = {'session': self.session}
- kwargs.update(self.config)
+ def _auth(self, config):
try:
- self.sf = Salesforce(**kwargs)
+ config.update({'session': self.session})
+ self.sf = Salesforce(**config)
except sf_exceptions.SalesforceAuthenticationFailed:
logger.error('Salesforce authentication failure.')
self.metrics['sf_auth_ok'].set(0)
- return
+ return False
+
logger.info('Salesforce authentication successful.')
self.metrics['sf_auth_ok'].set(1)
+ return True
+
+ def _load_session(self, session_file):
+ lines = session_file.readlines()
+
+ if lines == []:
+ return
+ return lines[0]
+
+ def _refresh_ready(self, saved_session):
+ if saved_session is None:
+ logger.info('Current session is None.')
+ return True
+
+ if self.sf is None:
+ return False
+
+ if self.sf.session_id == saved_session:
+ return True
+ return False
+
+ def _reuse_session(self, saved_session):
+ logger.info('Reusing session id from file.')
+ # limit params to avoid login request
+ config = {
+ 'session_id': saved_session,
+ 'instance_url': self.config['instance_url']
+ }
+ return self._auth(config)
+
+ def _acquire_session(self):
+ # only one worker at a time can check session_file
+ auth_success = False
+
+ with open(SESSION_FILE, 'r+') as session_file:
+ with flocked(session_file):
+ logger.info('Successfully locked session file for refresh.')
+
+ saved_session = self._load_session(session_file)
+
+ if self._refresh_ready(saved_session):
+ logger.info('Attepmting to refresh session.')
+
+ if self._auth(self.config):
+ auth_success = True
+ session_file.truncate(0)
+ session_file.seek(0)
+ session_file.write(self.sf.session_id)
+ logger.info('Refreshed session successfully.')
+ else:
+ logger.error('Failed to refresh session.')
+ else:
+ logger.info('Not refreshing. Reusing session.')
+ auth_success = self._reuse_session(saved_session)
+
+ return auth_success
+
+ def auth(self):
+ auth_ok = self._acquire_session()
+ while auth_ok is False:
+ auth_ok = self._acquire_session()
def _get_alert_id(self, labels):
alert_id_data = ''
diff --git a/sf_notifier/server.py b/sf_notifier/server.py
index e7d6787..4d3ddc0 100644
--- a/sf_notifier/server.py
+++ b/sf_notifier/server.py
@@ -22,8 +22,8 @@
from requests.exceptions import ConnectionError as RequestsConnectionError
-from sf_notifier.helpers import alert_fields_and_action
-from sf_notifier.salesforce.client import SalesforceClient
+from sf_notifier.helpers import alert_fields_and_action, create_file
+from sf_notifier.salesforce.client import SESSION_FILE, SalesforceClient
from simple_salesforce.exceptions import SalesforceError
@@ -39,6 +39,8 @@
'/metrics': make_wsgi_app()
})
+
+create_file(SESSION_FILE)
sf_cli = SalesforceClient(settings.SF_CONFIG)
diff --git a/sf_notifier/tests/test_client.py b/sf_notifier/tests/test_client.py
index c6a2a2e..059ea24 100644
--- a/sf_notifier/tests/test_client.py
+++ b/sf_notifier/tests/test_client.py
@@ -32,7 +32,7 @@
'SANDBOX_ENABLED': True
},
{
- 'instance': 'instance_xxx',
+ 'instance_url': 'instance_xxx',
'username': 'username_xxx',
'password': 'password_xxx',
'organizationId': 'org_xxx',
@@ -49,7 +49,7 @@
'ENVIRONMENT_ID': 'env_xxx'
},
{
- 'instance': 'instance_xxx',
+ 'instance_url': 'instance_xxx',
'username': 'username_xxx',
'password': 'password_xxx',
'organizationId': 'org_xxx',