Adam Tengler | 3e96b11 | 2017-09-05 15:51:06 +0000 | [diff] [blame] | 1 | # -*- coding: utf-8 -*- |
| 2 | ''' |
| 3 | Saltgraph engine that catches the returns of state runs, parses them |
| 4 | and stores them in a flat database of the latest Salt resource runs. |
| 5 | ''' |
| 6 | |
| 7 | # Import python libs |
| 8 | from __future__ import absolute_import |
| 9 | import datetime |
| 10 | import json |
| 11 | import logging |
| 12 | |
| 13 | # Import salt libs |
| 14 | import salt.utils.event |
| 15 | |
| 16 | # Import third party libs |
| 17 | try: |
| 18 | import psycopg2 |
| 19 | import psycopg2.extras |
| 20 | HAS_POSTGRES = True |
| 21 | except ImportError: |
| 22 | HAS_POSTGRES = False |
| 23 | |
| 24 | __virtualname__ = 'saltgraph' |
| 25 | log = logging.getLogger(__name__) |
| 26 | |
| 27 | |
def __virtual__():
    '''
    Tell the Salt loader whether this engine can be loaded.

    Returns the engine's virtual name when psycopg2 was imported
    successfully, otherwise a ``(False, reason)`` tuple.
    '''
    if HAS_POSTGRES:
        return __virtualname__

    reason = 'Could not import saltgraph engine. python-psycopg2 is not installed.'
    return False, reason
| 32 | |
| 33 | |
def _get_conn(options=None):
    '''
    Return a postgres connection.

    options
        Optional dict of connection settings. Recognised keys and the
        values used when a key is missing: ``host`` (``127.0.0.1``),
        ``user`` (``salt``), ``passwd`` (``salt``), ``db`` (``salt``) and
        ``port`` (``5432``). ``None`` or an empty dict means "all defaults".
    '''
    # A None default avoids sharing one mutable dict object between calls.
    options = options or {}

    return psycopg2.connect(
        host=options.get('host', '127.0.0.1'),
        user=options.get('user', 'salt'),
        password=options.get('passwd', 'salt'),
        database=options.get('db', 'salt'),
        port=options.get('port', 5432))
| 50 | |
| 51 | |
| 52 | def _close_conn(conn): |
| 53 | ''' |
| 54 | Close the Postgres connection |
| 55 | ''' |
| 56 | conn.commit() |
| 57 | conn.close() |
| 58 | |
| 59 | |
def _get_lowstate_data(options=None):
    '''
    Refresh the ``salt_resources`` table from the minions' lowstate.

    Runs ``state.show_lowstate`` on all minions through ``saltutil.cmd``
    and upserts one row per returned lowstate chunk, keyed by
    ``<minion>|<__id__>``. New rows get the status ``unknown``; the status
    of already existing rows is left untouched. When the request returned
    anything, the ``lowstate_data`` row of ``salt_resources_meta`` is
    upserted as well, which refreshes its ``alter_time``.

    options
        Optional dict of connection settings passed to ``_get_conn``.
    '''
    low_sql = '''INSERT INTO salt_resources
                 (id, resource_id, host, service, module, fun, status)
                 VALUES (%s, %s, %s, %s, %s, %s, %s)
                 ON CONFLICT (id) DO UPDATE
                 SET resource_id = excluded.resource_id,
                     host = excluded.host,
                     service = excluded.service,
                     module = excluded.module,
                     fun = excluded.fun,
                     alter_time = current_timestamp'''

    meta_sql = '''INSERT INTO salt_resources_meta
                  (id, options)
                  VALUES (%s, %s)
                  ON CONFLICT (id) DO UPDATE
                  SET options = excluded.options,
                      alter_time = current_timestamp'''

    # The remote call runs before the database is touched so that no
    # connection is held open while waiting for the minions.
    try:
        # you can only do this on the Salt Master's minion
        lowstate_req = __salt__['saltutil.cmd'](
            '*',
            'state.show_lowstate',
            **{'timeout': 15, 'concurrent': True, 'queue': True})
    except Exception:  # best effort: a failed request means "no data"
        log.exception('Saltgraph engine could not fetch lowstate data')
        lowstate_req = {}

    if not lowstate_req:
        # Nothing came back, so there is nothing to store.
        return

    conn = _get_conn(options or {})
    try:
        cur = conn.cursor()

        for minion, lowstate_ret in lowstate_req.items():
            if not isinstance(lowstate_ret, dict) or lowstate_ret.get('retcode') != 0:
                continue
            for resource in lowstate_ret.get('ret') or []:
                if not isinstance(resource, dict):
                    # Not a lowstate chunk (presumably an error message).
                    continue
                resource_id = resource.get('__id__')
                cur.execute(
                    low_sql, (
                        '%s|%s' % (minion, resource_id),
                        resource_id,
                        minion,
                        resource.get('__sls__'),
                        resource.get('state') if 'state' in resource else resource.get('module'),
                        resource.get('fun'),
                        'unknown',
                    )
                )
                # Committed row by row, so rows written before a later
                # failure are kept.
                conn.commit()

        cur.execute(meta_sql, ('lowstate_data', '{}'))
        conn.commit()
    finally:
        # Closing also discards anything left uncommitted after an error.
        conn.close()
| 117 | |
| 118 | |
def _up_to_date(options=None):
    '''
    Tell whether the stored lowstate data is less than one day old.

    Reads ``alter_time`` of the ``lowstate_data`` row in
    ``salt_resources_meta`` and compares it with the current UTC time.

    options
        Optional dict of connection settings passed to ``_get_conn``.

    Returns ``True`` when the row exists and its ``alter_time`` is less
    than one day in the past, ``False`` otherwise (including when the row
    or its timestamp is missing).
    '''
    conn = _get_conn(options or {})
    try:
        cur = conn.cursor()
        cur.execute('SELECT alter_time FROM salt_resources_meta WHERE id = %s', ('lowstate_data',))
        row = cur.fetchone()
        conn.commit()
    finally:
        conn.close()

    if not row or row[0] is None:
        return False

    altered = row[0]
    offset = altered.utcoffset()
    if offset is not None:
        # Timezone-aware value: shift it to UTC before dropping tzinfo,
        # otherwise the comparison with utcnow() is off by the offset.
        altered = altered.replace(tzinfo=None) - offset
    # A naive value is taken as-is, i.e. assumed to already be in UTC.

    age = datetime.datetime.utcnow() - altered
    return age < datetime.timedelta(days=1)
| 145 | |
| 146 | |
def _update_resources(event, options):
    '''
    Store the results of a state run in the ``salt_resources`` table.

    For every state result in ``event['return']`` whose key
    ``<event id>|<__id__>`` already exists in ``salt_resources``, the row's
    ``status``, ``last_ret`` and ``alter_time`` are updated. ``status`` is
    ``success`` or ``failed`` depending on the truthiness of the result,
    or ``unknown`` when the result is ``None``/missing. Results for
    resources that are not in the table are ignored.

    event
        Event data dict; the ``id`` and ``return`` keys are read.
    options
        Dict of connection settings passed to ``_get_conn``.
    '''
    returned = event.get('return')
    if not returned or not isinstance(returned, dict):
        # No per-state results to record. NOTE(review): a non-dict return
        # is presumably an error report of a run that never executed.
        return

    resource_sql = '''UPDATE salt_resources SET (status, last_ret, alter_time) = (%s, %s, current_timestamp)
                      WHERE id = %s'''
    minion_id = event.get('id')

    conn = _get_conn(options)
    try:
        cur = conn.cursor()
        cur.execute('SELECT id FROM salt_resources')
        # A set keeps the membership test below cheap for large tables.
        known_ids = {row[0] for row in cur.fetchall()}

        for resource in returned.values():
            if not isinstance(resource, dict):
                continue
            rid = '%s|%s' % (minion_id, resource.get('__id__'))
            if rid not in known_ids:
                continue

            result = resource.get('result')
            if result is None:
                status = 'unknown'
            else:
                status = 'success' if result else 'failed'

            cur.execute(resource_sql, (status, repr(resource), rid))
            # Committed per row, so updates made before a later failure
            # are kept.
            conn.commit()

        conn.commit()
    finally:
        # Closing also discards anything left uncommitted after an error.
        conn.close()
| 179 | |
| 180 | |
def _is_text(value):
    '''
    Return True when ``value`` is a text string (Python 2 or 3).
    '''
    return isinstance(value, (type(''), type(u'')))


def _is_test_run(fun_args):
    '''
    Return True when the function arguments request test mode, either as a
    ``test=true`` string (any letter case) or as a ``test`` entry of a
    keyword-argument dict.
    '''
    for arg in fun_args:
        if _is_text(arg):
            if arg.lower() == 'test=true':
                return True
        elif isinstance(arg, dict):
            if str(arg.get('test')).lower() == 'true':
                return True
    return False


def start(host='salt', user='salt', password='salt', database='salt', port=5432, **kwargs):
    '''
    Listen to events and parse Salt state returns

    Events whose ``fun`` is ``state.sls``, ``state.apply`` or
    ``state.highstate`` and which were not run in test mode are written to
    the database. The lowstate data is refreshed first when one of the
    function arguments starts with ``reclass`` or when the stored data is
    not up to date. This function loops forever.

    host, user, password, database, port
        Postgres connection settings.
    kwargs
        Accepted for compatibility with the engine configuration; unused.
    '''
    if __opts__['__role'] == 'master':
        event_bus = salt.utils.event.get_master_event(
            __opts__,
            __opts__['sock_dir'],
            listen=True)
    else:
        event_bus = salt.utils.event.get_event(
            'minion',
            transport=__opts__['transport'],
            opts=__opts__,
            sock_dir=__opts__['sock_dir'],
            listen=True)
    log.debug('Saltgraph engine started')

    # Loop invariants, built once instead of on every event.
    supported_funcs = ('state.sls', 'state.apply', 'state.highstate')
    options = {
        'host': host,
        'user': user,
        'passwd': password,
        'db': database,
        'port': port
    }

    while True:
        event = event_bus.get_event()
        if not event or event.get('fun', None) not in supported_funcs:
            continue

        # fun_args may be missing, None, or contain non-string entries.
        fun_args = event.get('fun_args') or []
        if _is_test_run(fun_args):
            continue

        try:
            is_reclass = any(
                _is_text(arg) and arg.startswith('reclass') for arg in fun_args)
            if is_reclass or not _up_to_date(options):
                _get_lowstate_data(options)

            _update_resources(event, options)
        except Exception:
            # One bad event or a database hiccup must not stop the
            # listener; record it and keep processing events.
            log.exception('Saltgraph engine failed to process an event')
| 217 | |