blob: 863def59929eb5d03279c942f57fe6296d48a657 [file] [log] [blame]
Alex3bc95f62020-03-05 17:00:04 -06001import json
2import os
3
4from tests.test_base import tests_dir
5
6
# Directory holding the canned resource files that the fakes are built from
_res_dir = os.path.join(tests_dir, "res")
9
10
# preload file from res
def _load_from_res(_filename, mode='rt'):
    """Return the whole content of a file stored in the resource folder.

    :param _filename: name of the file, joined onto ``_res_dir``
    :param mode: mode handed to ``open()``; the default reads text
    :return: everything ``read()`` yields for the opened file
    """
    resource_path = os.path.join(_res_dir, _filename)
    with open(resource_path, mode) as resource:
        return resource.read()
19
20
# Resources preloaded once at import time.
# Only bare file names are passed: _load_from_res() already prefixes them
# with _res_dir, so joining the folder here as well would duplicate it
# whenever _res_dir happens to be a relative path.
_fakepage_template = _load_from_res("_fakepage.html")
_fakepage_empty = _load_from_res("_fakeempty.html")
# parsed, since the minion names are looked up in it
_fake_keys = json.loads(_load_from_res("_fake_keys.json"))
# kept as raw text
_fake_pkg_versions = _load_from_res("_fake_pkg_versions.json")
_fake_network_data = _load_from_res("_fake_net_data.json")
26
27
def _prepare_result_for_target(_tgt, result=True):
    """Build the per-node answer dict for a target expression.

    :param _tgt: ``"*"``, a single node name, or names joined by ``" or "``
    :param result: value stored for every matched node
    :return: dict of node name -> ``result``; empty if nothing matched
    """
    known_nodes = _fake_keys["return"]["minions"]

    # wildcard: every node from the fakes list
    if _tgt == "*":
        return {node: result for node in known_nodes}

    # single target that is present in the fakes list
    if _tgt in known_nodes:
        return {_tgt: result}

    # compound target: each part is answered as is
    if " or " in _tgt:
        return {node: result for node in _tgt.split(" or ")}

    return {}
44
45
class MockResponse:
    """Canned HTTP response object handed out by the mocked calls.

    NOTE(review): the attribute set looks like it mirrors a ``requests``
    response -- confirm against the code under test.

    ``content``, ``text`` and ``reason`` are set as instance attributes in
    ``__init__`` and therefore shadow the methods of the same name: on an
    instance they are plain values, not callables. ``json``, ``ok`` and
    ``cookies`` stay callable.
    """

    def __init__(self, _buffer, status_code):
        """Fill the response from a buffer.

        :param _buffer: ``None`` (no body), ``bytes`` (binary body),
                        ``dict`` (serialized to JSON) or ``str`` (text body)
        :param status_code: HTTP status code to report
        """
        if _buffer is None:
            self.content = None
            self.text = None
            # Must be ``_json``: assigning ``json`` here would replace the
            # json() method with None and leave ``_json`` undefined.
            self._json = None
        elif isinstance(_buffer, bytes):
            self.content = _buffer
            self.text = None
            self._json = None
        elif isinstance(_buffer, dict):
            _dump = json.dumps(_buffer)
            self.content = _dump.encode('utf-8')
            self.text = _dump
            self._json = _buffer
        else:
            self.content = _buffer.encode('utf-8')
            self.text = _buffer
            self._json = None

        self.status_code = status_code
        self.reason = "OK" if self.status_code == 200 else "FAIL"

    # Shadowed on instances by the attribute set in __init__; kept so the
    # class-level names stay as they were.
    def content(self):
        return self.content

    # Shadowed on instances by the attribute set in __init__.
    def text(self):
        return self.text

    def json(self):
        """Return the body as parsed JSON.

        :return: the dict given at construction, or ``text`` parsed as JSON
        :raises Exception: when ``text`` cannot be parsed (including the
                           cases where there is no text at all)
        """
        if self._json:
            return self._json
        try:
            return json.loads(self.text)
        except Exception:
            raise Exception("Failed to create json {}".format(self.text))

    # Shadowed on instances by the attribute set in __init__.
    def reason(self):
        return self.reason

    def ok(self):
        """Return True only for status code 200."""
        return self.status_code == 200

    def cookies(self):
        """No cookies are simulated."""
        return None
93
94
def _fake_cmd_run_output(_cmd):
    """Pick the canned output for a ``cmd.run`` command line.

    :param _cmd: command line string taken from the ``arg`` of the request
    :return: fake output for the command, or None if it is not simulated
    """
    _args = _cmd.split()
    if not _args:
        return None
    _exe = _args[0]
    # second token is the script/file name for "python" and "cat" calls
    _operand = _args[1] if len(_args) > 1 else ""

    if _exe == "python":
        # determine which script is called
        if _operand.endswith("pkg_versions.py"):
            return _fake_pkg_versions
        if _operand.endswith("ifs_data.py"):
            return _fake_network_data
        return None
    if _exe == "uname":
        return "FakeLinux"
    if _exe == "free":
        return "Mem: 1.9G 1.4G 84M 22M 524M 343M"
    if _exe == "cat":
        if _operand.endswith("/proc/net/softnet_stat;"):
            return _load_from_res("_fake_softnet_stats.txt")
        return None

    # commands answered straight from a resource file
    _file_backed = {
        "lscpu": "_fake_lscpu.txt",
        "df": "_fake_df.txt",
        "service": "_fake_service_status.txt",
        "virsh": "_fake_kvm_instances.txt",
    }
    if _exe in _file_backed:
        return _load_from_res(_file_backed[_exe])
    return None


def mocked_salt_post(*args, **kwargs):
    """Fake POST handler for the salt REST calls.

    The REST handle is whatever follows the third ``/`` of the URL given as
    the first positional argument. ``login`` yields a fake token. An empty
    handle with a ``json`` keyword holding a one-element list is treated as
    a function call (``test.ping``, ``pillar.get``, ``cmd.run`` and a few
    ``file.*``/``cp.get_file`` calls). Anything else, including a
    ``cmd.run`` command that is not simulated, yields a 404 response.

    :param args: ``args[0]`` is the request URL
    :param kwargs: ``json`` may carry the list with the function call
    :return: ``MockResponse``
    :raises Exception: when the ``json`` list holds more than one command
    """
    _rest_handle = args[0].split('/', 3)[3]
    if _rest_handle == "login":
        # return fake token
        _fake_token = {
            "return":
            [
                {
                    "perms": [
                        ".*",
                        "@local",
                        "@wheel",
                        "@runner",
                        "@jobs"
                    ],
                    "start": 0,
                    "token": "faketoken",
                    "expire": 0,
                    "user": "salt",
                    "eauth": "pam"
                }
            ]
        }

        return MockResponse(_fake_token, 200)

    if not _rest_handle and "json" in kwargs:
        # handle functions
        _funs = kwargs["json"]
        # an empty list carries no command and ends up as 404 below
        if isinstance(_funs, list) and _funs:
            if len(_funs) > 1:
                raise Exception("Multiple commands in sale requiest")
            _call = _funs[0]
            _t = _call["tgt"]
            _a = _call["arg"] if "arg" in _call else ""
            _fun = _call["fun"]
            if _fun == "test.ping":
                # prepare answer to ping
                _val = _prepare_result_for_target(_t)
                return MockResponse({"return": [_val]}, 200)
            elif _fun == "pillar.get":
                # pillar get response, preload data
                _pillars = json.loads(_load_from_res("_fake_pillars.json"))
                if _t in _pillars:
                    # target is single
                    _node = _pillars[_t]
                    _r = {_t: _node[_a]} if _a in _node else {}
                else:
                    # target is a compound
                    _r = {}
                    for _name in _t.split(" or "):
                        _node = _pillars[_name]
                        _r[_name] = _node[_a] if _a in _node else {}
                return MockResponse({"return": [_r]}, 200)
            elif _fun == "cmd.run":
                _output = _fake_cmd_run_output(_a)
                # Commands that are not simulated fall through to the 404
                # below instead of failing on an unbound result.
                if _output is not None:
                    _val = _prepare_result_for_target(_t, _output)
                    return MockResponse({"return": [_val]}, 200)
            elif _fun in ["file.mkdir", "file.touch", "file.write",
                          "cp.get_file"]:
                _val = _prepare_result_for_target(_t)
                return MockResponse({"return": [_val]}, 200)

    return MockResponse(None, 404)
198
199
def mocked_salt_get(*args, **kwargs):
    """Fake GET handler for the salt REST calls.

    The part of the URL (``args[0]``) after its third ``/`` selects the
    resource file whose text is returned with status 200; any other handle
    gets an empty 404 response.
    """
    # REST handle -> resource file with the canned answer
    _answers = {
        "keys": "_fake_keys.json",        # list of minions
        "minions": "_fake_minions.json",  # list of minions
    }
    _handle = args[0].split('/', 3)[3]
    if _handle not in _answers:
        return MockResponse(None, 404)
    return MockResponse(_load_from_res(_answers[_handle]), 200)
211
212
def mocked_package_get(*args, **kwargs):
    """Fake GET handler simulating a folder walk on a package mirror.

    For URLs on the fake domain, the last path element selects the answer:
    the index page links to the first folder, each folder page links to the
    next folder, the last folder links to ``Packages.gz``, and requesting
    that file returns the binary resource. ``hotfix`` and ``update`` return
    the empty page. Everything else yields a 404 response.

    :param args: ``args[0]`` is the request URL
    :return: ``MockResponse``
    """
    # fake page _placeholder_
    _placeholder = "_placeholder_"
    _type = "_type_"
    # fake domain
    _url = "http://fakedomain.com"
    # folders list and file
    _folders = [
        "2099.0.0",
        "ubuntu",
        "dists",
        "trusty",
        "main",
        "binary-amd64"
    ]
    _file = "Packages.gz"

    def _render_listing(_entry, _size):
        # type is detected as '-' for folder and <number> for file
        _page = _fakepage_template.replace(_placeholder, _entry)
        return _page.replace(_type, _size)

    # only the fakedomain for mirrors is served
    if not args[0].startswith(_url):
        return MockResponse(None, 404)

    # cut url
    _u = args[0].replace(_url, "")
    # detect folder
    _parts = _u.rsplit('/', 2)
    if len(_parts) < 2:
        # No '/' left after the domain (bare domain URL): there is no path
        # element to index, so serve the main index page.
        _current_page = ""
    elif len(_parts) > 2 and not _u.endswith('/'):
        _current_page = _parts[2]
    else:
        _current_page = _parts[1]

    # if this is main index page, take first folder
    if len(_current_page) == 0:
        return MockResponse(_render_listing(_folders[0] + "/", "-"), 200)

    if _current_page in _folders:
        # simulate folder walk: link to the next folder, or to the file
        # once the last folder is reached
        _next = _folders.index(_current_page) + 1
        if _next < len(_folders):
            _p = _render_listing(_folders[_next] + "/", "-")
        else:
            _p = _render_listing(_file, "999")
        return MockResponse(_p, 200)

    if _current_page == _file:
        # just package.gz file, preloaded as bytes
        return MockResponse(_load_from_res("Packages.gz", mode='rb'), 200)

    if _current_page == "hotfix" or _current_page == "update":
        return MockResponse(_fakepage_empty, 200)

    return MockResponse(None, 404)
276
277
# Dotted path of the shell helper in salt_utils.
# NOTE(review): presumably the patch target replaced by mocked_shell in the
# tests -- confirm against the test cases.
_shell_salt_path = "cfg_checker.common.salt_utils.shell"


def mocked_shell(*args, **kwargs):
    """Fake shell runner.

    A leading ``sudo`` is dropped. ``salt-call`` commands get a JSON string
    of the form ``{"local": ...}``, where the value is ``"fakepassword"``
    when the last token starts with ``_param:salt_api_password`` and null
    otherwise. Any other command, including an empty one, gets the fixed
    string ``"emptyfakeresponse"``.

    :param args: ``args[0]`` is the command line string
    :return: fake command output as ``str``
    """
    _args = args[0].split()
    # guard the lookups: an empty command or a bare "sudo" leaves no tokens
    if _args and _args[0] == "sudo":
        _args = _args[1:]
    if _args and _args[0].startswith("salt-call"):
        # local calls
        _json = {"local": None}
        if _args[-1].startswith("_param:salt_api_password"):
            _json["local"] = "fakepassword"
        return json.dumps(_json)

    return "emptyfakeresponse"
292 return "emptyfakeresponse"