Salt PostgreSQL returner for obtaining visualization data and associated Salt module
diff --git a/_modules/saltresource.py b/_modules/saltresource.py
new file mode 100644
index 0000000..f9d0987
--- /dev/null
+++ b/_modules/saltresource.py
@@ -0,0 +1,247 @@
+from __future__ import absolute_import
+# Let's not allow PyLint complain about string substitution
+# pylint: disable=W1321,E1321
+
+# Import python libs
+import logging
+
+# Import Salt libs
+import salt.returners
+
+# Import third party libs
+try:
+    import psycopg2
+    import psycopg2.extras
+    HAS_POSTGRES = True
+except ImportError:
+    HAS_POSTGRES = False
+
+LOG = logging.getLogger(__name__)
+
+
+def __virtual__():
+    if not HAS_POSTGRES:
+        return False, 'Could not import saltresource module; psycopg2 is not installed.'
+    return 'saltresource'
+
+
+def _get_options(ret=None):
+    '''
+    Get the postgres options from salt.
+    '''
+    attrs = {'host': 'host',
+             'user': 'user',
+             'passwd': 'passwd',
+             'db': 'db',
+             'port': 'port'}
+
+    _options = salt.returners.get_returner_options('returner.postgres_graph_db',
+                                                   ret,
+                                                   attrs,
+                                                   __salt__=__salt__,
+                                                   __opts__=__opts__)
+    return _options
+
+
+def _get_conn(ret=None):
+    '''
+    Return a postgres connection.
+    '''
+    _options = _get_options(ret)
+
+    host = _options.get('host')
+    user = _options.get('user')
+    passwd = _options.get('passwd')
+    datab = _options.get('db')
+    port = _options.get('port')
+
+    return psycopg2.connect(
+            host=host,
+            user=user,
+            password=passwd,
+            database=datab,
+            port=port)
+
+
+def _close_conn(conn):
+    '''
+    Close the Postgres connection
+    '''
+    conn.commit()
+    conn.close()
+
+
+def graph_data(*args, **kwargs):
+    '''
+    Returns graph data for visualization app
+
+    CLI Examples:
+
+    .. code-block:: bash
+
+        salt '*' saltresource.graph_data
+ 
+    '''
+    conn = _get_conn()
+    cur_dict = conn.cursor(cursor_factory=psycopg2.extras.DictCursor)
+
+    cur_dict.execute('SELECT host, service, status FROM salt_resources')
+    resources_db = [dict(res) for res in cur_dict]
+    db_dict = {}
+
+    for resource in resources_db:
+        host = resource.get('host')
+        service = '.'.join(resource.get('service').split('.')[:2])
+        status = resource.get('status')
+
+        if db_dict.get(host, None):
+            if db_dict[host].get(service, None):
+                service_data = db_dict[host][service]
+                service_data.append(status)
+            else:
+                db_dict[host][service] = [status]
+        else:
+            db_dict[host] = {service: []}
+
+    graph = []
+    for host, services in db_dict.items():
+        for service, statuses in services.items():
+            status = 'unknown'
+            if 'failed' in statuses:
+                status = 'failed'
+            elif 'success' in statuses and not ('failed' in statuses or 'unknown' in statuses):
+                status = 'success'
+            datum = {'host': host, 'service': service, 'status': status}
+            graph.append(datum)
+
+    _close_conn(conn)
+
+    return {'graph': graph}
+
+
+def host_data(host, **kwargs):
+    '''
+    Returns data describing single host
+
+    CLI Examples:
+
+    .. code-block:: bash
+
+        salt-call saltresource.host_data '<minion_id>'
+ 
+    '''
+    conn = _get_conn()
+    cur_dict = conn.cursor(cursor_factory=psycopg2.extras.DictCursor)
+
+    sql = 'SELECT host, service, resource_id, last_ret, status FROM salt_resources WHERE host=%s'
+    cur_dict.execute(sql, (host,))
+    resources_db = [dict(res) for res in cur_dict]
+    db_dict = {}
+
+    for resource in resources_db:
+        host = resource.get('host')
+        service = '.'.join(resource.get('service').split('.')[:2])
+        status = resource.get('status')
+
+        if db_dict.get(host, None):
+            if db_dict[host].get(service, None):
+                service_data = db_dict[host][service]
+                service_data.append(status)
+            else:
+                db_dict[host][service] = [status]
+        else:
+            db_dict[host] = {service: []}
+
+    graph = []
+
+    for host, services in db_dict.items():
+        for service, statuses in services.items():
+            status = 'unknown'
+            if 'failed' in statuses:
+                status = 'failed'
+            elif 'success' in statuses and not ('failed' in statuses or 'unknown' in statuses):
+                status = 'success'
+            resources = [{'service': r.get('service', ''), 'resource_id': r.get('resource_id', ''), 'last_ret': r.get('last_ret', None), 'status': r.get('status', '')}
+                         for r
+                         in resources_db
+                         if r.get('service', '').startswith(service)]
+            datum = {'host': host, 'service': service, 'status': status, 'resources': resources}
+            graph.append(datum)
+
+    _close_conn(conn)
+
+    return {'graph': graph}
+
+
+def sync_db(*args, **kwargs):
+    conn = _get_conn()
+    cur = conn.cursor()
+
+    resources_sql = '''
+      CREATE TABLE IF NOT EXISTS salt_resources (
+        id            varchar(255) NOT NULL UNIQUE,
+        resource_id   varchar(255) NOT NULL,
+        host          varchar(255) NOT NULL,
+        service       varchar(255) NOT NULL,
+        module        varchar(50) NOT NULL,
+        fun           varchar(50) NOT NULL,
+        status        varchar(50) NOT NULL,
+        options       json NULL,
+        last_ret      text NULL,
+        alter_time    TIMESTAMP WITH TIME ZONE DEFAULT now()
+      );
+    '''
+    cur.execute(resources_sql)
+    conn.commit()
+
+    resources_meta_sql = '''
+      CREATE TABLE IF NOT EXISTS salt_resources_meta (
+        id           varchar(255) NOT NULL UNIQUE,
+        options      json NULL,
+        alter_time   TIMESTAMP WITH TIME ZONE DEFAULT now()
+      );
+    '''
+    cur.execute(resources_meta_sql)
+    _close_conn(conn)
+
+    return True
+
+
+def flush_db(*args, **kwargs):
+    conn = _get_conn()
+    cur = conn.cursor()
+    result = True
+
+    resources_sql = 'DELETE FROM salt_resources'
+    try:
+        cur.execute(resources_sql)
+        conn.commit()
+    except Exception as e:
+        LOG.warning(repr(e))
+        result = False
+
+    resources_meta_sql = 'DELETE FROM salt_resources_meta'
+    try:
+        cur.execute(resources_meta_sql)
+        _close_conn(conn)
+    except Exception as e:
+        LOG.warning(repr(e))
+        result = False
+
+    return result
+
+
+def destroy_db(*args, **kwargs):
+    conn = _get_conn()
+    cur = conn.cursor()
+
+    resources_sql = 'DROP TABLE IF EXISTS salt_resources;'
+    cur.execute(resources_sql)
+    conn.commit()
+
+    resources_meta_sql = 'DROP TABLE IF EXISTS salt_resources_meta;'
+    cur.execute(resources_meta_sql)
+    _close_conn(conn)
+
+    return True
+
diff --git a/_returners/postgres_graph_db.py b/_returners/postgres_graph_db.py
new file mode 100644
index 0000000..989c020
--- /dev/null
+++ b/_returners/postgres_graph_db.py
@@ -0,0 +1,319 @@
+# -*- coding: utf-8 -*-
+'''
+Return data to a postgresql graph server
+
+.. note::
+    Creates database of all Salt resources which are to be run on
+    all minions and then updates their last known state during state
+    file runs. It can't function as master nor minion external cache.
+
+:maintainer:    None
+:maturity:      New
+:depends:       psycopg2
+:platform:      all
+
+To enable this returner the minion will need the psycopg2 installed and
+the following values configured in the minion or master config:
+
+.. code-block:: yaml
+
+    returner.postgres_graph_db.host: 'salt'
+    returner.postgres_graph_db.user: 'salt'
+    returner.postgres_graph_db.passwd: 'salt'
+    returner.postgres_graph_db.db: 'salt'
+    returner.postgres_graph_db.port: 5432
+
+Alternative configuration values can be used by prefacing the configuration.
+Any values not found in the alternative configuration will be pulled from
+the default location:
+
+.. code-block:: yaml
+
+    alternative.returner.postgres_graph_db.host: 'salt'
+    alternative.returner.postgres_graph_db.user: 'salt'
+    alternative.returner.postgres_graph_db.passwd: 'salt'
+    alternative.returner.postgres_graph_db.db: 'salt'
+    alternative.returner.postgres_graph_db.port: 5432
+
+Running the following commands as the postgres user should create the database
+correctly:
+
+.. code-block:: sql
+    psql << EOF
+    CREATE ROLE salt WITH LOGIN;
+    ALTER ROLE salt WITH PASSWORD 'salt';
+    CREATE DATABASE salt WITH OWNER salt;
+    EOF
+    
+    psql -h localhost -U salt << EOF
+    --
+    -- Table structure for table 'salt_resources'
+    --
+    
+    DROP TABLE IF EXISTS salt_resources;
+    CREATE TABLE salt_resources (
+      id            varchar(255) NOT NULL UNIQUE,
+      resource_id   varchar(255) NOT NULL,
+      host          varchar(255) NOT NULL,
+      service       varchar(255) NOT NULL,
+      module        varchar(50) NOT NULL,
+      fun           varchar(50) NOT NULL,
+      status        varchar(50) NOT NULL,
+      options       json NULL,
+      last_ret      text NULL,
+      alter_time    TIMESTAMP WITH TIME ZONE DEFAULT now()
+    );
+    
+    --
+    -- Table structure for table 'salt_resources_meta'
+    --
+    
+    DROP TABLE IF EXISTS salt_resources_meta;
+    CREATE TABLE salt_resources_meta (
+      id           varchar(255) NOT NULL UNIQUE,
+      options      json NULL,
+      alter_time   TIMESTAMP WITH TIME ZONE DEFAULT now()
+    );
+    EOF
+
+Required python modules: psycopg2
+
+To use the postgres_graph_db returner, append '--return postgres_graph_db' to the salt command.
+
+.. code-block:: bash
+
+    salt '*' test.ping --return postgres_graph_db
+
+To use the alternative configuration, append '--return_config alternative' to the salt command.
+
+.. versionadded:: 2015.5.0
+
+.. code-block:: bash
+
+    salt '*' test.ping --return postgres_graph_db --return_config alternative
+
+To override individual configuration items, append --return_kwargs '{"key:": "value"}' to the salt command.
+
+.. versionadded:: 2016.3.0
+
+.. code-block:: bash
+
+    salt '*' test.ping --return postgres_graph_db --return_kwargs '{"db": "another-salt"}'
+
+'''
+from __future__ import absolute_import
+# Let's not allow PyLint complain about string substitution
+# pylint: disable=W1321,E1321
+
+# Import python libs
+import datetime
+import json
+import logging
+
+# Import Salt libs
+import salt.utils.jid
+import salt.returners
+
+# Import third party libs
+try:
+    import psycopg2
+    import psycopg2.extras
+    HAS_POSTGRES = True
+except ImportError:
+    HAS_POSTGRES = False
+
+__virtualname__ = 'postgres_graph_db'
+LOG = logging.getLogger(__name__)
+
+
+def __virtual__():
+    if not HAS_POSTGRES:
+        return False, 'Could not import postgres returner; psycopg2 is not installed.'
+    return __virtualname__
+
+
+def _get_options(ret=None):
+    '''
+    Get the postgres options from salt.
+    '''
+    attrs = {'host': 'host',
+             'user': 'user',
+             'passwd': 'passwd',
+             'db': 'db',
+             'port': 'port'}
+
+    _options = salt.returners.get_returner_options('returner.{0}'.format(__virtualname__),
+                                                   ret,
+                                                   attrs,
+                                                   __salt__=__salt__,
+                                                   __opts__=__opts__)
+    return _options
+
+
+def _get_conn(ret=None):
+    '''
+    Return a postgres connection.
+    '''
+    _options = _get_options(ret)
+
+    host = _options.get('host')
+    user = _options.get('user')
+    passwd = _options.get('passwd')
+    datab = _options.get('db')
+    port = _options.get('port')
+
+    return psycopg2.connect(
+            host=host,
+            user=user,
+            password=passwd,
+            database=datab,
+            port=port)
+
+
+def _close_conn(conn):
+    '''
+    Close the Postgres connection
+    '''
+    conn.commit()
+    conn.close()
+
+
+def _get_lowstate_data():
+    '''
+    TODO: document this method
+    '''
+    conn = _get_conn()
+    cur = conn.cursor()
+
+    try:
+        # you can only do this on Salt Masters minion
+        lowstate_req = __salt__['saltutil.cmd']('*', 'state.show_lowstate', **{'timeout': 15, 'concurrent': True, 'queue': True})
+    except:
+        lowstate_req = {}
+    
+    for minion, lowstate_ret in lowstate_req.items():
+        if lowstate_ret.get('retcode') != 0:
+            continue
+        for resource in lowstate_ret.get('ret', []):
+            low_sql = '''INSERT INTO salt_resources
+                         (id, resource_id, host, service, module, fun, status)
+                         VALUES (%s, %s, %s, %s, %s, %s, %s)
+                         ON CONFLICT (id) DO UPDATE
+                           SET resource_id = excluded.resource_id,
+                               host = excluded.host,
+                               service = excluded.service,
+                               module = excluded.module,
+                               fun = excluded.fun,
+                               alter_time = current_timestamp'''
+
+            rid = "%s|%s" % (minion, resource.get('__id__'))
+
+            cur.execute(
+                low_sql, (
+                    rid,
+                    resource.get('__id__'),
+                    minion,
+                    resource.get('__sls__'),
+                    resource.get('state') if 'state' in resource else resource.get('module'),
+                    resource.get('fun'),
+                    'unknown'
+                )
+            )
+
+            conn.commit()
+
+    if lowstate_req:
+        meta_sql = '''INSERT INTO salt_resources_meta
+                      (id, options)
+                      VALUES (%s, %s)
+                      ON CONFLICT (id) DO UPDATE
+                        SET options = excluded.options,
+                            alter_time = current_timestamp'''
+
+        cur.execute(
+            meta_sql, (
+                'lowstate_data',
+                '{}'
+            )
+        )
+
+    _close_conn(conn)
+
+
+def _up_to_date():
+    '''
+    TODO: document this method
+    '''
+    conn = _get_conn()
+    cur = conn.cursor()
+    #cur_dict = conn.cursor(cursor_factory=psycopg2.extras.DictCursor)
+
+    ret = False
+
+    # if lowstate data are older than 1 day, refresh them
+    cur.execute('SELECT alter_time FROM salt_resources_meta WHERE id = %s', ('lowstate_data',))
+    alter_time = cur.fetchone()
+
+    if alter_time:
+        now = datetime.datetime.utcnow()
+        day = datetime.timedelta(days=1)
+        time_diff = now - alter_time[0].replace(tzinfo=None)
+        if time_diff < day:
+            ret = True
+    else:
+        skip = False
+
+    _close_conn(conn)
+
+    return ret
+
+
+def _update_resources(ret):
+    '''
+    TODO: document this method
+    '''
+    conn = _get_conn(ret)
+    cur = conn.cursor()
+
+    cur.execute('SELECT id FROM salt_resources')
+    resources_db = [res[0] for res in cur.fetchall()]
+    resources = ret.get('return', {}).values()
+
+    for resource in resources:
+        rid = '%s|%s' % (ret.get('id'), resource.get('__id__'))
+        if rid in resources_db:
+            status = 'unknown'
+            if resource.get('result', None) is not None:
+                status = 'success' if resource.get('result') else 'failed'
+
+            resource_sql = '''UPDATE salt_resources SET (status, last_ret, alter_time) = (%s, %s, current_timestamp)
+                                WHERE id = %s'''
+
+            cur.execute(
+                resource_sql, (
+                    status,
+                    repr(resource),
+                    rid
+                )
+            )
+
+            conn.commit()
+
+    _close_conn(conn)
+
+
+def returner(ret):
+    '''
+    Return data to a postgres server
+    '''
+    #LOG.warning('RET: %s' % repr(ret))
+    supported_funcs = ['state.sls', 'state.apply', 'state.highstate']
+    test = 'test=true' in [arg.lower() for arg in ret.get('fun_args', [])]
+
+    if ret.get('fun') in supported_funcs and not test:
+        is_reclass = [arg for arg in ret.get('fun_args', []) if arg.startswith('reclass')]
+        if is_reclass or not _up_to_date():
+            _get_lowstate_data()
+
+        _update_resources(ret)
+