Salt PostgreSQL returner for obtaining visualization data and associated Salt module
diff --git a/_modules/saltresource.py b/_modules/saltresource.py
new file mode 100644
index 0000000..f9d0987
--- /dev/null
+++ b/_modules/saltresource.py
@@ -0,0 +1,247 @@
+from __future__ import absolute_import
+# Let's not allow PyLint complain about string substitution
+# pylint: disable=W1321,E1321
+
+# Import python libs
+import logging
+
+# Import Salt libs
+import salt.returners
+
+# Import third party libs
+try:
+ import psycopg2
+ import psycopg2.extras
+ HAS_POSTGRES = True
+except ImportError:
+ HAS_POSTGRES = False
+
+LOG = logging.getLogger(__name__)
+
+
+def __virtual__():
+ if not HAS_POSTGRES:
+ return False, 'Could not import saltresource module; psycopg2 is not installed.'
+ return 'saltresource'
+
+
+def _get_options(ret=None):
+ '''
+ Get the postgres options from salt.
+ '''
+ attrs = {'host': 'host',
+ 'user': 'user',
+ 'passwd': 'passwd',
+ 'db': 'db',
+ 'port': 'port'}
+
+ _options = salt.returners.get_returner_options('returner.postgres_graph_db',
+ ret,
+ attrs,
+ __salt__=__salt__,
+ __opts__=__opts__)
+ return _options
+
+
+def _get_conn(ret=None):
+ '''
+ Return a postgres connection.
+ '''
+ _options = _get_options(ret)
+
+ host = _options.get('host')
+ user = _options.get('user')
+ passwd = _options.get('passwd')
+ datab = _options.get('db')
+ port = _options.get('port')
+
+ return psycopg2.connect(
+ host=host,
+ user=user,
+ password=passwd,
+ database=datab,
+ port=port)
+
+
+def _close_conn(conn):
+ '''
+ Close the Postgres connection
+ '''
+ conn.commit()
+ conn.close()
+
+
+def graph_data(*args, **kwargs):
+ '''
+ Returns graph data for visualization app
+
+ CLI Examples:
+
+ .. code-block:: bash
+
+ salt '*' saltresource.graph_data
+
+ '''
+ conn = _get_conn()
+ cur_dict = conn.cursor(cursor_factory=psycopg2.extras.DictCursor)
+
+ cur_dict.execute('SELECT host, service, status FROM salt_resources')
+ resources_db = [dict(res) for res in cur_dict]
+ db_dict = {}
+
+ for resource in resources_db:
+ host = resource.get('host')
+ service = '.'.join(resource.get('service').split('.')[:2])
+ status = resource.get('status')
+
+ if db_dict.get(host, None):
+ if db_dict[host].get(service, None):
+ service_data = db_dict[host][service]
+ service_data.append(status)
+ else:
+ db_dict[host][service] = [status]
+ else:
+ db_dict[host] = {service: []}
+
+ graph = []
+ for host, services in db_dict.items():
+ for service, statuses in services.items():
+ status = 'unknown'
+ if 'failed' in statuses:
+ status = 'failed'
+ elif 'success' in statuses and not ('failed' in statuses or 'unknown' in statuses):
+ status = 'success'
+ datum = {'host': host, 'service': service, 'status': status}
+ graph.append(datum)
+
+ _close_conn(conn)
+
+ return {'graph': graph}
+
+
+def host_data(host, **kwargs):
+ '''
+ Returns data describing single host
+
+ CLI Examples:
+
+ .. code-block:: bash
+
+ salt-call saltresource.host_data '<minion_id>'
+
+ '''
+ conn = _get_conn()
+ cur_dict = conn.cursor(cursor_factory=psycopg2.extras.DictCursor)
+
+ sql = 'SELECT host, service, resource_id, last_ret, status FROM salt_resources WHERE host=%s'
+ cur_dict.execute(sql, (host,))
+ resources_db = [dict(res) for res in cur_dict]
+ db_dict = {}
+
+ for resource in resources_db:
+ host = resource.get('host')
+ service = '.'.join(resource.get('service').split('.')[:2])
+ status = resource.get('status')
+
+ if db_dict.get(host, None):
+ if db_dict[host].get(service, None):
+ service_data = db_dict[host][service]
+ service_data.append(status)
+ else:
+ db_dict[host][service] = [status]
+ else:
+ db_dict[host] = {service: []}
+
+ graph = []
+
+ for host, services in db_dict.items():
+ for service, statuses in services.items():
+ status = 'unknown'
+ if 'failed' in statuses:
+ status = 'failed'
+ elif 'success' in statuses and not ('failed' in statuses or 'unknown' in statuses):
+ status = 'success'
+ resources = [{'service': r.get('service', ''), 'resource_id': r.get('resource_id', ''), 'last_ret': r.get('last_ret', None), 'status': r.get('status', '')}
+ for r
+ in resources_db
+ if r.get('service', '').startswith(service)]
+ datum = {'host': host, 'service': service, 'status': status, 'resources': resources}
+ graph.append(datum)
+
+ _close_conn(conn)
+
+ return {'graph': graph}
+
+
+def sync_db(*args, **kwargs):
+ conn = _get_conn()
+ cur = conn.cursor()
+
+ resources_sql = '''
+ CREATE TABLE IF NOT EXISTS salt_resources (
+ id varchar(255) NOT NULL UNIQUE,
+ resource_id varchar(255) NOT NULL,
+ host varchar(255) NOT NULL,
+ service varchar(255) NOT NULL,
+ module varchar(50) NOT NULL,
+ fun varchar(50) NOT NULL,
+ status varchar(50) NOT NULL,
+ options json NULL,
+ last_ret text NULL,
+ alter_time TIMESTAMP WITH TIME ZONE DEFAULT now()
+ );
+ '''
+ cur.execute(resources_sql)
+ conn.commit()
+
+ resources_meta_sql = '''
+ CREATE TABLE IF NOT EXISTS salt_resources_meta (
+ id varchar(255) NOT NULL UNIQUE,
+ options json NULL,
+ alter_time TIMESTAMP WITH TIME ZONE DEFAULT now()
+ );
+ '''
+ cur.execute(resources_meta_sql)
+ _close_conn(conn)
+
+ return True
+
+
+def flush_db(*args, **kwargs):
+ conn = _get_conn()
+ cur = conn.cursor()
+ result = True
+
+ resources_sql = 'DELETE FROM salt_resources'
+ try:
+ cur.execute(resources_sql)
+ conn.commit()
+ except Exception as e:
+ LOG.warning(repr(e))
+ result = False
+
+ resources_meta_sql = 'DELETE FROM salt_resources_meta'
+ try:
+ cur.execute(resources_meta_sql)
+ _close_conn(conn)
+ except Exception as e:
+ LOG.warning(repr(e))
+ result = False
+
+ return result
+
+
+def destroy_db(*args, **kwargs):
+ conn = _get_conn()
+ cur = conn.cursor()
+
+ resources_sql = 'DROP TABLE IF EXISTS salt_resources;'
+ cur.execute(resources_sql)
+ conn.commit()
+
+ resources_meta_sql = 'DROP TABLE IF EXISTS salt_resources_meta;'
+ cur.execute(resources_meta_sql)
+ _close_conn(conn)
+
+ return True
+
diff --git a/_returners/postgres_graph_db.py b/_returners/postgres_graph_db.py
new file mode 100644
index 0000000..989c020
--- /dev/null
+++ b/_returners/postgres_graph_db.py
@@ -0,0 +1,319 @@
+# -*- coding: utf-8 -*-
+'''
+Return data to a postgresql graph server
+
+.. note::
+ Creates database of all Salt resources which are to be run on
+ all minions and then updates their last known state during state
+ file runs. It can't function as master nor minion external cache.
+
+:maintainer: None
+:maturity: New
+:depends: psycopg2
+:platform: all
+
+To enable this returner the minion will need the psycopg2 installed and
+the following values configured in the minion or master config:
+
+.. code-block:: yaml
+
+ returner.postgres_graph_db.host: 'salt'
+ returner.postgres_graph_db.user: 'salt'
+ returner.postgres_graph_db.passwd: 'salt'
+ returner.postgres_graph_db.db: 'salt'
+ returner.postgres_graph_db.port: 5432
+
+Alternative configuration values can be used by prefacing the configuration.
+Any values not found in the alternative configuration will be pulled from
+the default location:
+
+.. code-block:: yaml
+
+ alternative.returner.postgres_graph_db.host: 'salt'
+ alternative.returner.postgres_graph_db.user: 'salt'
+ alternative.returner.postgres_graph_db.passwd: 'salt'
+ alternative.returner.postgres_graph_db.db: 'salt'
+ alternative.returner.postgres_graph_db.port: 5432
+
+Running the following commands as the postgres user should create the database
+correctly:
+
+.. code-block:: sql
+ psql << EOF
+ CREATE ROLE salt WITH LOGIN;
+ ALTER ROLE salt WITH PASSWORD 'salt';
+ CREATE DATABASE salt WITH OWNER salt;
+ EOF
+
+ psql -h localhost -U salt << EOF
+ --
+ -- Table structure for table 'salt_resources'
+ --
+
+ DROP TABLE IF EXISTS salt_resources;
+ CREATE TABLE salt_resources (
+ id varchar(255) NOT NULL UNIQUE,
+ resource_id varchar(255) NOT NULL,
+ host varchar(255) NOT NULL,
+ service varchar(255) NOT NULL,
+ module varchar(50) NOT NULL,
+ fun varchar(50) NOT NULL,
+ status varchar(50) NOT NULL,
+ options json NULL,
+ last_ret text NULL,
+ alter_time TIMESTAMP WITH TIME ZONE DEFAULT now()
+ );
+
+ --
+ -- Table structure for table 'salt_resources_meta'
+ --
+
+ DROP TABLE IF EXISTS salt_resources_meta;
+ CREATE TABLE salt_resources_meta (
+ id varchar(255) NOT NULL UNIQUE,
+ options json NULL,
+ alter_time TIMESTAMP WITH TIME ZONE DEFAULT now()
+ );
+ EOF
+
+Required python modules: psycopg2
+
+To use the postgres_graph_db returner, append '--return postgres_graph_db' to the salt command.
+
+.. code-block:: bash
+
+ salt '*' test.ping --return postgres_graph_db
+
+To use the alternative configuration, append '--return_config alternative' to the salt command.
+
+.. versionadded:: 2015.5.0
+
+.. code-block:: bash
+
+ salt '*' test.ping --return postgres_graph_db --return_config alternative
+
+To override individual configuration items, append --return_kwargs '{"key:": "value"}' to the salt command.
+
+.. versionadded:: 2016.3.0
+
+.. code-block:: bash
+
+ salt '*' test.ping --return postgres_graph_db --return_kwargs '{"db": "another-salt"}'
+
+'''
+from __future__ import absolute_import
+# Let's not allow PyLint complain about string substitution
+# pylint: disable=W1321,E1321
+
+# Import python libs
+import datetime
+import json
+import logging
+
+# Import Salt libs
+import salt.utils.jid
+import salt.returners
+
+# Import third party libs
+try:
+ import psycopg2
+ import psycopg2.extras
+ HAS_POSTGRES = True
+except ImportError:
+ HAS_POSTGRES = False
+
+__virtualname__ = 'postgres_graph_db'
+LOG = logging.getLogger(__name__)
+
+
+def __virtual__():
+ if not HAS_POSTGRES:
+ return False, 'Could not import postgres returner; psycopg2 is not installed.'
+ return __virtualname__
+
+
+def _get_options(ret=None):
+ '''
+ Get the postgres options from salt.
+ '''
+ attrs = {'host': 'host',
+ 'user': 'user',
+ 'passwd': 'passwd',
+ 'db': 'db',
+ 'port': 'port'}
+
+ _options = salt.returners.get_returner_options('returner.{0}'.format(__virtualname__),
+ ret,
+ attrs,
+ __salt__=__salt__,
+ __opts__=__opts__)
+ return _options
+
+
+def _get_conn(ret=None):
+ '''
+ Return a postgres connection.
+ '''
+ _options = _get_options(ret)
+
+ host = _options.get('host')
+ user = _options.get('user')
+ passwd = _options.get('passwd')
+ datab = _options.get('db')
+ port = _options.get('port')
+
+ return psycopg2.connect(
+ host=host,
+ user=user,
+ password=passwd,
+ database=datab,
+ port=port)
+
+
+def _close_conn(conn):
+ '''
+ Close the Postgres connection
+ '''
+ conn.commit()
+ conn.close()
+
+
+def _get_lowstate_data():
+ '''
+ TODO: document this method
+ '''
+ conn = _get_conn()
+ cur = conn.cursor()
+
+ try:
+ # you can only do this on Salt Masters minion
+ lowstate_req = __salt__['saltutil.cmd']('*', 'state.show_lowstate', **{'timeout': 15, 'concurrent': True, 'queue': True})
+ except:
+ lowstate_req = {}
+
+ for minion, lowstate_ret in lowstate_req.items():
+ if lowstate_ret.get('retcode') != 0:
+ continue
+ for resource in lowstate_ret.get('ret', []):
+ low_sql = '''INSERT INTO salt_resources
+ (id, resource_id, host, service, module, fun, status)
+ VALUES (%s, %s, %s, %s, %s, %s, %s)
+ ON CONFLICT (id) DO UPDATE
+ SET resource_id = excluded.resource_id,
+ host = excluded.host,
+ service = excluded.service,
+ module = excluded.module,
+ fun = excluded.fun,
+ alter_time = current_timestamp'''
+
+ rid = "%s|%s" % (minion, resource.get('__id__'))
+
+ cur.execute(
+ low_sql, (
+ rid,
+ resource.get('__id__'),
+ minion,
+ resource.get('__sls__'),
+ resource.get('state') if 'state' in resource else resource.get('module'),
+ resource.get('fun'),
+ 'unknown'
+ )
+ )
+
+ conn.commit()
+
+ if lowstate_req:
+ meta_sql = '''INSERT INTO salt_resources_meta
+ (id, options)
+ VALUES (%s, %s)
+ ON CONFLICT (id) DO UPDATE
+ SET options = excluded.options,
+ alter_time = current_timestamp'''
+
+ cur.execute(
+ meta_sql, (
+ 'lowstate_data',
+ '{}'
+ )
+ )
+
+ _close_conn(conn)
+
+
+def _up_to_date():
+ '''
+ TODO: document this method
+ '''
+ conn = _get_conn()
+ cur = conn.cursor()
+ #cur_dict = conn.cursor(cursor_factory=psycopg2.extras.DictCursor)
+
+ ret = False
+
+ # if lowstate data are older than 1 day, refresh them
+ cur.execute('SELECT alter_time FROM salt_resources_meta WHERE id = %s', ('lowstate_data',))
+ alter_time = cur.fetchone()
+
+ if alter_time:
+ now = datetime.datetime.utcnow()
+ day = datetime.timedelta(days=1)
+ time_diff = now - alter_time[0].replace(tzinfo=None)
+ if time_diff < day:
+ ret = True
+ else:
+ skip = False
+
+ _close_conn(conn)
+
+ return ret
+
+
+def _update_resources(ret):
+ '''
+ TODO: document this method
+ '''
+ conn = _get_conn(ret)
+ cur = conn.cursor()
+
+ cur.execute('SELECT id FROM salt_resources')
+ resources_db = [res[0] for res in cur.fetchall()]
+ resources = ret.get('return', {}).values()
+
+ for resource in resources:
+ rid = '%s|%s' % (ret.get('id'), resource.get('__id__'))
+ if rid in resources_db:
+ status = 'unknown'
+ if resource.get('result', None) is not None:
+ status = 'success' if resource.get('result') else 'failed'
+
+ resource_sql = '''UPDATE salt_resources SET (status, last_ret, alter_time) = (%s, %s, current_timestamp)
+ WHERE id = %s'''
+
+ cur.execute(
+ resource_sql, (
+ status,
+ repr(resource),
+ rid
+ )
+ )
+
+ conn.commit()
+
+ _close_conn(conn)
+
+
+def returner(ret):
+ '''
+ Return data to a postgres server
+ '''
+ #LOG.warning('RET: %s' % repr(ret))
+ supported_funcs = ['state.sls', 'state.apply', 'state.highstate']
+ test = 'test=true' in [arg.lower() for arg in ret.get('fun_args', [])]
+
+ if ret.get('fun') in supported_funcs and not test:
+ is_reclass = [arg for arg in ret.get('fun_args', []) if arg.startswith('reclass')]
+ if is_reclass or not _up_to_date():
+ _get_lowstate_data()
+
+ _update_resources(ret)
+