Merge "Salt Engine for graph data processing"
diff --git a/_engines/saltgraph.py b/_engines/saltgraph.py
new file mode 100644
index 0000000..0287449
--- /dev/null
+++ b/_engines/saltgraph.py
@@ -0,0 +1,217 @@
+# -*- coding: utf-8 -*-
+'''
+Saltgraph engine for catching returns of state runs, parsing them
+and passing them to a flat database of latest Salt resource runs.
+'''
+
+# Import python libs
+from __future__ import absolute_import
+import datetime
+import json
+import logging
+
+# Import salt libs
+import salt.utils.event
+
+# Import third party libs
+try:
+    import psycopg2
+    import psycopg2.extras
+    HAS_POSTGRES = True
+except ImportError:
+    HAS_POSTGRES = False
+
+__virtualname__ = 'saltgraph'  # value __virtual__() returns when psycopg2 is importable
+log = logging.getLogger(__name__)  # module-level logger, named after this module
+
+
+def __virtual__():
+    '''Expose the engine as ``saltgraph`` only when psycopg2 was imported.'''
+    return __virtualname__ if HAS_POSTGRES else (
+        False, 'Could not import saltgraph engine. python-psycopg2 is not installed.')
+
+
+def _get_conn(options=None):
+    '''
+    Return a new postgres connection built from ``options``.
+
+    Recognised keys (with defaults): host (127.0.0.1), user (salt),
+    passwd (salt), db (salt) and port (5432). Missing keys, or
+    ``options`` being None, fall back to those defaults.
+    '''
+    # None instead of a shared mutable ``{}`` default argument
+    options = options or {}
+    return psycopg2.connect(
+        host=options.get('host', '127.0.0.1'),
+        user=options.get('user', 'salt'),
+        password=options.get('passwd', 'salt'),
+        database=options.get('db', 'salt'),
+        port=options.get('port', 5432))
+
+
+def _close_conn(conn):
+    '''Commit pending work on ``conn``; close it even if the commit fails.'''
+    try:
+        conn.commit()
+    finally:
+        conn.close()
+
+
+def _get_lowstate_data(options=None):
+    '''
+    Refresh ``salt_resources`` with the lowstate of every minion.
+
+    Runs ``state.show_lowstate`` on ``*`` via ``saltutil.cmd`` and upserts
+    one row per lowstate chunk, keyed ``<minion>|<__id__>``. New rows start
+    as ``unknown``; rows that already exist keep their status. When the
+    command returned data, the ``lowstate_data`` row of
+    ``salt_resources_meta`` is upserted, which refreshes its alter_time.
+    '''
+    resource_upsert = '''INSERT INTO salt_resources
+                         (id, resource_id, host, service, module, fun, status)
+                         VALUES (%s, %s, %s, %s, %s, %s, %s)
+                         ON CONFLICT (id) DO UPDATE
+                           SET resource_id = excluded.resource_id,
+                               host = excluded.host,
+                               service = excluded.service,
+                               module = excluded.module,
+                               fun = excluded.fun,
+                               alter_time = current_timestamp'''
+    meta_row_sql = '''INSERT INTO salt_resources_meta
+                      (id, options)
+                      VALUES (%s, %s)
+                      ON CONFLICT (id) DO UPDATE
+                        SET options = excluded.options,
+                            alter_time = current_timestamp'''
+
+    conn = _get_conn(options)
+    try:  # the finally below closes conn even when a query fails
+        cur = conn.cursor()
+        try:
+            # you can only do this on the Salt master's minion
+            lowstate_req = __salt__['saltutil.cmd'](
+                '*', 'state.show_lowstate', timeout=15, concurrent=True, queue=True)
+        except Exception:  # best effort: log the failure and store nothing
+            log.exception('Saltgraph engine failed to fetch lowstate data')
+            lowstate_req = {}
+        for minion, lowstate_ret in lowstate_req.items():
+            if lowstate_ret.get('retcode') != 0:
+                continue
+            for resource in lowstate_ret.get('ret', []):
+                res_id = resource.get('__id__')
+                cur.execute(resource_upsert, (
+                    '%s|%s' % (minion, res_id),
+                    res_id,
+                    minion,
+                    resource.get('__sls__'),
+                    resource.get('state') if 'state' in resource else resource.get('module'),
+                    resource.get('fun'),
+                    'unknown'))
+                conn.commit()
+
+        if lowstate_req:
+            cur.execute(meta_row_sql, ('lowstate_data', '{}'))
+        conn.commit()
+    finally:
+        conn.close()
+
+
+def _up_to_date(options=None):
+    '''
+    Return True when the stored lowstate data is less than a day old.
+
+    Reads alter_time of the ``lowstate_data`` row in
+    ``salt_resources_meta``; a missing row counts as out of date.
+    '''
+    conn = _get_conn(options)
+    try:
+        cur = conn.cursor()
+        cur.execute('SELECT alter_time FROM salt_resources_meta WHERE id = %s', ('lowstate_data',))
+        row = cur.fetchone()
+        conn.commit()
+    finally:
+        conn.close()
+
+    if not row or row[0] is None:
+        return False
+
+    stamp = row[0]
+    offset = stamp.utcoffset()
+    stamp = stamp.replace(tzinfo=None)
+    if offset is not None:
+        # aware value: shift to UTC before comparing with the naive utcnow()
+        stamp -= offset
+    return datetime.datetime.utcnow() - stamp < datetime.timedelta(days=1)
+
+
+def _update_resources(event, options):
+    '''
+    Record the outcome of a state run on the matching ``salt_resources`` rows.
+
+    Every state result in ``event['return']`` whose ``<minion>|<__id__>``
+    key is already stored gets its status (success, failed, or unknown
+    when the result is None), the repr of the result and alter_time
+    updated. A ``return`` that is not a dict is ignored.
+    '''
+    resource_sql = '''UPDATE salt_resources SET (status, last_ret, alter_time) = (%s, %s, current_timestamp)
+                                WHERE id = %s'''
+    # only a dict has .values(); any other payload is skipped instead of raising
+    returned = event.get('return', {})
+    resources = returned.values() if isinstance(returned, dict) else []
+
+    conn = _get_conn(options)
+    try:
+        cur = conn.cursor()
+        cur.execute('SELECT id FROM salt_resources')
+        known_ids = set(res[0] for res in cur.fetchall())  # set: O(1) membership tests
+        for resource in resources:
+            rid = '%s|%s' % (event.get('id'), resource.get('__id__'))
+            if rid not in known_ids:
+                continue
+            status = 'unknown'
+            if resource.get('result') is not None:
+                status = 'success' if resource.get('result') else 'failed'
+            cur.execute(resource_sql, (status, repr(resource), rid))
+            conn.commit()
+        conn.commit()
+    finally:
+        conn.close()
+
+
+def start(host='salt', user='salt', password='salt', database='salt', port=5432, **kwargs):
+    '''
+    Listen to events and parse Salt state returns
+
+    Returns of state.sls, state.apply and state.highstate that were not run
+    with test=true update the resource rows in the postgres database
+    reached via host, user, password, database and port. Lowstate data is
+    reloaded first when it is out of date or an argument starts with
+    ``reclass``. Loops forever.
+    '''
+    if __opts__['__role'] == 'master':
+        event_bus = salt.utils.event.get_master_event(
+            __opts__, __opts__['sock_dir'], listen=True)
+    else:
+        event_bus = salt.utils.event.get_event(
+            'minion', transport=__opts__['transport'], opts=__opts__,
+            sock_dir=__opts__['sock_dir'], listen=True)
+    log.debug('Saltgraph engine started')
+
+    options = {'host': host, 'user': user, 'passwd': password, 'db': database, 'port': port}
+    supported_funcs = ('state.sls', 'state.apply', 'state.highstate')
+    string_types = (str, type(u''))  # str and, on Python 2, unicode
+
+    while True:
+        event = event_bus.get_event()
+        if not event or event.get('fun') not in supported_funcs:
+            continue
+        fun_args = event.get('fun_args') or []
+        # keep string arguments only: .lower()/.startswith() would raise on anything else
+        str_args = [arg for arg in fun_args if isinstance(arg, string_types)]
+        # skip runs that were invoked with test=true
+        if any(arg.lower() == 'test=true' for arg in str_args):
+            continue
+        if any(arg.startswith('reclass') for arg in str_args) or not _up_to_date(options):
+            _get_lowstate_data(options)
+        _update_resources(event, options)
+