blob: 02874493f79d2c948f00647e3311ace333a91496 [file] [log] [blame]
Adam Tengler3e96b112017-09-05 15:51:06 +00001# -*- coding: utf-8 -*-
2'''
3Saltgraph engine for catching returns of state runs, parsing them
4and passing them to flat database of latest Salt resource runs.
5'''
6
7# Import python libs
8from __future__ import absolute_import
9import datetime
10import json
11import logging
12
13# Import salt libs
14import salt.utils.event
15
16# Import third party libs
17try:
18 import psycopg2
19 import psycopg2.extras
20 HAS_POSTGRES = True
21except ImportError:
22 HAS_POSTGRES = False
23
24__virtualname__ = 'saltgraph'
25log = logging.getLogger(__name__)
26
27
28def __virtual__():
29 if not HAS_POSTGRES:
30 return False, 'Could not import saltgraph engine. python-psycopg2 is not installed.'
31 return __virtualname__
32
33
34def _get_conn(options={}):
35 '''
36 Return a postgres connection.
37 '''
38 host = options.get('host', '127.0.0.1')
39 user = options.get('user', 'salt')
40 passwd = options.get('passwd', 'salt')
41 datab = options.get('db', 'salt')
42 port = options.get('port', 5432)
43
44 return psycopg2.connect(
45 host=host,
46 user=user,
47 password=passwd,
48 database=datab,
49 port=port)
50
51
52def _close_conn(conn):
53 '''
54 Close the Postgres connection
55 '''
56 conn.commit()
57 conn.close()
58
59
60def _get_lowstate_data(options={}):
61 '''
62 TODO: document this method
63 '''
64 conn = _get_conn(options)
65 cur = conn.cursor()
66
67 try:
68 # you can only do this on Salt Masters minion
69 lowstate_req = __salt__['saltutil.cmd']('*', 'state.show_lowstate', **{'timeout': 15, 'concurrent': True, 'queue': True})
70 except:
71 lowstate_req = {}
72
73 for minion, lowstate_ret in lowstate_req.items():
74 if lowstate_ret.get('retcode') != 0:
75 continue
76 for resource in lowstate_ret.get('ret', []):
77 low_sql = '''INSERT INTO salt_resources
78 (id, resource_id, host, service, module, fun, status)
79 VALUES (%s, %s, %s, %s, %s, %s, %s)
80 ON CONFLICT (id) DO UPDATE
81 SET resource_id = excluded.resource_id,
82 host = excluded.host,
83 service = excluded.service,
84 module = excluded.module,
85 fun = excluded.fun,
86 alter_time = current_timestamp'''
87
88 rid = "%s|%s" % (minion, resource.get('__id__'))
89 cur.execute(
90 low_sql, (
91 rid,
92 resource.get('__id__'),
93 minion,
94 resource.get('__sls__'),
95 resource.get('state') if 'state' in resource else resource.get('module'),
96 resource.get('fun'),
97 'unknown'
98 )
99 )
100 conn.commit()
101
102 if lowstate_req:
103 meta_sql = '''INSERT INTO salt_resources_meta
104 (id, options)
105 VALUES (%s, %s)
106 ON CONFLICT (id) DO UPDATE
107 SET options = excluded.options,
108 alter_time = current_timestamp'''
109
110 cur.execute(
111 meta_sql, (
112 'lowstate_data',
113 '{}'
114 )
115 )
116 _close_conn(conn)
117
118
119def _up_to_date(options={}):
120 '''
121 TODO: document this method
122 '''
123 conn = _get_conn(options)
124 cur = conn.cursor()
125 #cur_dict = conn.cursor(cursor_factory=psycopg2.extras.DictCursor)
126
127 ret = False
128
129 # if lowstate data are older than 1 day, refresh them
130 cur.execute('SELECT alter_time FROM salt_resources_meta WHERE id = %s', ('lowstate_data',))
131 alter_time = cur.fetchone()
132
133 if alter_time:
134 now = datetime.datetime.utcnow()
135 day = datetime.timedelta(days=1)
136 time_diff = now - alter_time[0].replace(tzinfo=None)
137 if time_diff < day:
138 ret = True
139 else:
140 skip = False
141
142 _close_conn(conn)
143
144 return ret
145
146
147def _update_resources(event, options):
148 '''
149 TODO: document this method
150 '''
151 conn = _get_conn(options)
152 cur = conn.cursor()
153
154 cur.execute('SELECT id FROM salt_resources')
155 resources_db = [res[0] for res in cur.fetchall()]
156 resources = event.get('return', {}).values()
157
158 for resource in resources:
159 rid = '%s|%s' % (event.get('id'), resource.get('__id__'))
160 if rid in resources_db:
161 status = 'unknown'
162 if resource.get('result', None) is not None:
163 status = 'success' if resource.get('result') else 'failed'
164
165 resource_sql = '''UPDATE salt_resources SET (status, last_ret, alter_time) = (%s, %s, current_timestamp)
166 WHERE id = %s'''
167
168 cur.execute(
169 resource_sql, (
170 status,
171 repr(resource),
172 rid
173 )
174 )
175
176 conn.commit()
177
178 _close_conn(conn)
179
180
181def start(host='salt', user='salt', password='salt', database='salt', port=5432, **kwargs):
182 '''
183 Listen to events and parse Salt state returns
184 '''
185 if __opts__['__role'] == 'master':
186 event_bus = salt.utils.event.get_master_event(
187 __opts__,
188 __opts__['sock_dir'],
189 listen=True)
190 else:
191 event_bus = salt.utils.event.get_event(
192 'minion',
193 transport=__opts__['transport'],
194 opts=__opts__,
195 sock_dir=__opts__['sock_dir'],
196 listen=True)
197 log.debug('Saltgraph engine started')
198
199 while True:
200 event = event_bus.get_event()
201 supported_funcs = ['state.sls', 'state.apply', 'state.highstate']
202 if event and event.get('fun', None) in supported_funcs:
203 test = 'test=true' in [arg.lower() for arg in event.get('fun_args', [])]
204 if not test:
205 options = {
206 'host': host,
207 'user': user,
208 'passwd': password,
209 'db': database,
210 'port': port
211 }
212 is_reclass = [arg for arg in event.get('fun_args', []) if arg.startswith('reclass')]
213 if is_reclass or not _up_to_date(options):
214 _get_lowstate_data(options)
215
216 _update_resources(event, options)
217