Reuse Salesforce session id

- Session file
- One worker at a time has ability to lock session file,
  auth and write new session id
- Other workers waits and reuse the session id
- Workers compare their session id with the one in file

Change-Id: I371b0a7172a5b312a2ae2aef96d02505a1823f86
Related-bug: PROD-29347 (PROD:29347)
diff --git a/sf_notifier/helpers.py b/sf_notifier/helpers.py
index 295c566..e9c221b 100644
--- a/sf_notifier/helpers.py
+++ b/sf_notifier/helpers.py
@@ -13,10 +13,18 @@
 #    License for the specific language governing permissions and limitations
 #    under the License.
 
+import os
+
 
 RESOLVED_STATUSES = ('UP', 'OK', 'resolved')
 
 
+def create_file(file_path):
+    if not os.path.exists(file_path):
+        with open(file_path, 'w+'):
+            return file_path
+
+
 def _format_subject(alert):
     subject = alert['annotations']['summary']
     host = alert['labels'].get('host')
diff --git a/sf_notifier/salesforce/client.py b/sf_notifier/salesforce/client.py
index 51b1b43..95f324d 100644
--- a/sf_notifier/salesforce/client.py
+++ b/sf_notifier/salesforce/client.py
@@ -13,10 +13,13 @@
 #    License for the specific language governing permissions and limitations
 #    under the License.
 
+import fcntl
 import hashlib
 import logging
 import os
+import time
 import uuid
+from contextlib import contextmanager
 
 from cachetools import TTLCache
 
@@ -42,7 +45,7 @@
 }
 
 CONFIG_FIELD_MAP = {
-    'auth_url': 'instance',
+    'auth_url': 'instance_url',
     'username': 'username',
     'password': 'password',
     'organization_id': 'organizationId',
@@ -51,20 +54,33 @@
 }
 
 ALLOWED_HASHING = ('md5', 'sha256')
+SESSION_FILE = '/tmp/session'
 
 logger = logging.getLogger(__name__)
 
 
+@contextmanager
+def flocked(fd):
+    try:
+        fcntl.flock(fd, fcntl.LOCK_EX | fcntl.LOCK_NB)
+        yield
+    except IOError:
+        logger.info('Waiting for session file 5 seconds...')
+        time.sleep(5)
+    finally:
+        fcntl.flock(fd, fcntl.LOCK_UN)
+
+
 def sf_auth_retry(method):
     def wrapper(self, *args, **kwargs):
         try:
             return method(self, *args, **kwargs)
         except sf_exceptions.SalesforceExpiredSession:
             logger.warning('Salesforce session expired.')
-            self._auth()
+            self.auth()
         except RequestsConnectionError:
             logger.error('Salesforce connection error.')
-            self._auth()
+            self.auth()
         return method(self, *args, **kwargs)
     return wrapper
 
@@ -81,12 +97,13 @@
             'sf_error_count': Counter('sf_error_count', 'sf-notifier'),
             'sf_request_count': Counter('sf_request_count', 'sf-notifier')
         }
-        self.session = Session()
         self.config = self._validate_config(config)
         self.hash_func = self._hash_func()
         self.environment = self.config.pop('environment_id')
-        self._auth()
         self._registered_alerts = TTLCache(maxsize=2048, ttl=300)
+        self.sf = None
+        self.session = Session()
+        self.auth()
 
     @staticmethod
     def _hash_func():
@@ -119,17 +136,78 @@
                 raise SfNotifierError(msg)
         return kwargs
 
-    def _auth(self):
-        kwargs = {'session': self.session}
-        kwargs.update(self.config)
+    def _auth(self, config):
         try:
-            self.sf = Salesforce(**kwargs)
+            config.update({'session': self.session})
+            self.sf = Salesforce(**config)
         except sf_exceptions.SalesforceAuthenticationFailed:
             logger.error('Salesforce authentication failure.')
             self.metrics['sf_auth_ok'].set(0)
-            return
+            return False
+
         logger.info('Salesforce authentication successful.')
         self.metrics['sf_auth_ok'].set(1)
+        return True
+
+    def _load_session(self, session_file):
+        lines = session_file.readlines()
+
+        if lines == []:
+            return
+        return lines[0]
+
+    def _refresh_ready(self, saved_session):
+        if saved_session is None:
+            logger.info('Current session is None.')
+            return True
+
+        if self.sf is None:
+            return False
+
+        if self.sf.session_id == saved_session:
+            return True
+        return False
+
+    def _reuse_session(self, saved_session):
+        logger.info('Reusing session id from file.')
+        # limit params to avoid login request
+        config = {
+            'session_id': saved_session,
+            'instance_url': self.config['instance_url']
+        }
+        return self._auth(config)
+
+    def _acquire_session(self):
+        # only one worker at a time can check session_file
+        auth_success = False
+
+        with open(SESSION_FILE, 'r+') as session_file:
+            with flocked(session_file):
+                logger.info('Successfully locked session file for refresh.')
+
+                saved_session = self._load_session(session_file)
+
+                if self._refresh_ready(saved_session):
+                    logger.info('Attepmting to refresh session.')
+
+                    if self._auth(self.config):
+                        auth_success = True
+                        session_file.truncate(0)
+                        session_file.seek(0)
+                        session_file.write(self.sf.session_id)
+                        logger.info('Refreshed session successfully.')
+                    else:
+                        logger.error('Failed to refresh session.')
+                else:
+                    logger.info('Not refreshing. Reusing session.')
+                    auth_success = self._reuse_session(saved_session)
+
+        return auth_success
+
+    def auth(self):
+        auth_ok = self._acquire_session()
+        while auth_ok is False:
+            auth_ok = self._acquire_session()
 
     def _get_alert_id(self, labels):
         alert_id_data = ''
diff --git a/sf_notifier/server.py b/sf_notifier/server.py
index e7d6787..4d3ddc0 100644
--- a/sf_notifier/server.py
+++ b/sf_notifier/server.py
@@ -22,8 +22,8 @@
 
 from requests.exceptions import ConnectionError as RequestsConnectionError
 
-from sf_notifier.helpers import alert_fields_and_action
-from sf_notifier.salesforce.client import SalesforceClient
+from sf_notifier.helpers import alert_fields_and_action, create_file
+from sf_notifier.salesforce.client import SESSION_FILE, SalesforceClient
 
 from simple_salesforce.exceptions import SalesforceError
 
@@ -39,6 +39,8 @@
     '/metrics': make_wsgi_app()
 })
 
+
+create_file(SESSION_FILE)
 sf_cli = SalesforceClient(settings.SF_CONFIG)
 
 
diff --git a/sf_notifier/tests/test_client.py b/sf_notifier/tests/test_client.py
index c6a2a2e..059ea24 100644
--- a/sf_notifier/tests/test_client.py
+++ b/sf_notifier/tests/test_client.py
@@ -32,7 +32,7 @@
             'SANDBOX_ENABLED': True
         },
         {
-            'instance': 'instance_xxx',
+            'instance_url': 'instance_xxx',
             'username': 'username_xxx',
             'password': 'password_xxx',
             'organizationId': 'org_xxx',
@@ -49,7 +49,7 @@
             'ENVIRONMENT_ID': 'env_xxx'
         },
         {
-            'instance': 'instance_xxx',
+            'instance_url': 'instance_xxx',
             'username': 'username_xxx',
             'password': 'password_xxx',
             'organizationId': 'org_xxx',