blob: afb6112e1c544f65e4ddb45fedb3306717f99336 [file] [log] [blame]
Alex0989ecf2022-03-29 13:43:21 -05001# Author: Alex Savatieiev (osavatieiev@mirantis.com; a.savex@gmail.com)
2# Copyright 2019-2022 Mirantis, Inc.
Alex3bc95f62020-03-05 17:00:04 -06003import json
4import os
5
6from tests.test_base import tests_dir
7
8
# Prepare fake filenames and files
# Folder with the canned fixture files, located under the tests directory
_res_dir = os.path.join(tests_dir, 'res')
# Path of the fake kube config env file inside the fixture folder
_fake_kube_config_path = os.path.join(_res_dir, "_fake_kube_config.env")
Alex3bc95f62020-03-05 17:00:04 -060012
13
14# preload file from res
def _load_from_res(_filename, mode='rt'):
    """Return the whole content of a fixture file from the resource folder.

    :param _filename: file name, resolved against ``_res_dir``
    :param mode: mode handed to ``open()``; the default reads text,
                 pass ``'rb'`` to get bytes
    :return: everything ``read()`` returns for the opened file
    """
    _fixture_path = os.path.join(_res_dir, _filename)
    with open(_fixture_path, mode) as _handle:
        return _handle.read()
22
23
# Fixtures preloaded once at import time.
# _load_from_res() joins the given name with _res_dir itself, so only bare
# file names are passed here; joining with _res_dir a second time would
# produce a doubled path whenever _res_dir is relative.
_fakepage_template = _load_from_res("_fakepage.html")
_fakepage_empty = _load_from_res("_fakeempty.html")
# parsed right away: the minion list is read from it when faking results
_fake_keys = json.loads(_load_from_res("_fake_keys.json"))
# kept as raw text: returned verbatim as fake command output
_fake_pkg_versions = _load_from_res("_fake_pkg_versions.json")
_fake_network_data = _load_from_res("_fake_net_data.json")
29
30
def _prepare_result_for_target(_tgt, result=True):
    """Build a ``{node: result}`` mapping for the nodes matched by a target.

    :param _tgt: ``"*"`` for every fake minion, the name of a single fake
                 minion, or a compound of names joined with ``" or "``
    :param result: value stored for every matched node
    :return: dict keyed by node name; empty when the target matches nothing
    """
    _known_minions = _fake_keys["return"]["minions"]
    if _tgt == "*":
        # every minion from the fakes list
        _selected = _known_minions
    elif _tgt in _known_minions:
        # single target
        _selected = [_tgt]
    elif " or " in _tgt:
        # compound target, names are taken as given
        _selected = _tgt.split(" or ")
    else:
        _selected = []
    return {_name: result for _name in _selected}
47
48
class MockResponse:
    """Minimal fake of an HTTP response object returned by the mocked calls.

    Provides ``content``, ``text``, ``status_code`` and ``reason`` as plain
    attributes, ``ok`` and ``cookies`` as read-only properties and
    ``json()`` as a method.
    """

    def __init__(self, _buffer, status_code):
        """Fill the response from a payload of one of the supported kinds.

        :param _buffer: ``None`` (no body), ``bytes`` (binary body, no
                        text), ``dict`` (serialized to JSON) or ``str``
        :param status_code: HTTP status code to report
        """
        # Parsed payload; known upfront only when a dict was given.
        # Always set, so json() works for every kind of payload.
        self._json = None
        if _buffer is None:
            self.content = None
            self.text = None
        elif isinstance(_buffer, bytes):
            self.content = _buffer
            self.text = None
        elif isinstance(_buffer, dict):
            self.text = json.dumps(_buffer)
            self.content = self.text.encode('utf-8')
            self._json = _buffer
        else:
            self.content = _buffer.encode('utf-8')
            self.text = _buffer

        self.status_code = status_code
        self.reason = "OK" if self.status_code == 200 else "FAIL"

    def json(self):
        """Return the payload as a parsed JSON object.

        :raises Exception: when there is no text or it is not valid JSON
        """
        if self._json is not None:
            return self._json
        try:
            return json.loads(self.text)
        except Exception as _err:
            raise Exception(
                "Failed to create json {}".format(self.text)
            ) from _err

    @property
    def ok(self):
        """True only for status code 200.

        A property, so that attribute-style ``response.ok`` checks see the
        real boolean instead of an always-truthy bound method.
        """
        return self.status_code == 200

    @property
    def cookies(self):
        """The fake response carries no cookies."""
        return None
96
97
def _fake_cmd_run_output(_cmdline):
    """Pick the canned output for a ``cmd.run`` command line.

    :param _cmdline: command line string passed as the salt function arg
    :return: fake output for the command, or None if it is not a command
             this mock knows about
    """
    _args = _cmdline.split()
    if not _args:
        return None
    _cmd = _args[0]
    # second word is only needed to tell scripts/files apart
    _arg1 = _args[1] if len(_args) > 1 else ""

    if _cmd == "python" and _arg1.endswith("pkg_versions.py"):
        return _fake_pkg_versions
    if _cmd == "python" and _arg1.endswith("ifs_data.py"):
        return _fake_network_data
    if _cmd == "uname":
        return "FakeLinux"
    if _cmd == "free":
        return "Mem: 1.9G 1.4G 84M 22M 524M 343M"
    if _cmd == "cat" and _arg1.endswith("/proc/net/softnet_stat;"):
        return _load_from_res("_fake_softnet_stats.txt")

    # commands answered with the content of a fixture file,
    # loaded only when the command is actually requested
    _outputs = {
        "lscpu": "_fake_lscpu.txt",
        "df": "_fake_df.txt",
        "service": "_fake_service_status.txt",
        "virsh": "_fake_kvm_instances.txt",
    }
    if _cmd in _outputs:
        return _load_from_res(_outputs[_cmd])
    return None


def mocked_salt_post(*args, **kwargs):
    """Fake for a POST request to the salt API.

    ``args[0]`` is the request url; the part after the host selects the
    action. ``login`` returns a fake token. An empty path together with a
    ``json`` keyword holding a one-item list runs the fake salt function
    described by that item (keys ``tgt``, ``fun`` and optional ``arg``).

    :return: MockResponse with status 200 and a ``{"return": [...]}``
             payload for handled requests, MockResponse(None, 404) for
             anything this mock does not handle
    :raises Exception: when more than one function is sent in one request
    """
    # url looks like <scheme>://<host>/<handle>; a url without any path
    # is treated the same as one with an empty handle
    _parts = args[0].split('/', 3)
    _rest_handle = _parts[3] if len(_parts) > 3 else ""
    if _rest_handle == "login":
        # return fake token
        _fake_token = {
            "return":
            [
                {
                    "perms": [
                        ".*",
                        "@local",
                        "@wheel",
                        "@runner",
                        "@jobs"
                    ],
                    "start": 0,
                    "token": "faketoken",
                    "expire": 0,
                    "user": "salt",
                    "eauth": "pam"
                }
            ]
        }
        return MockResponse(_fake_token, 200)

    # handle functions: only a non-empty list posted to the root is served
    _funs = kwargs.get("json")
    if _rest_handle or not isinstance(_funs, list) or not _funs:
        return MockResponse(None, 404)
    if len(_funs) > 1:
        raise Exception("Multiple commands in sale requiest")

    _call = _funs[0]
    _t = _call["tgt"]
    _a = _call["arg"] if "arg" in _call else ""
    _f = _call["fun"]

    if _f == "test.ping":
        # prepare answer to ping
        _val = _prepare_result_for_target(_t)
        return MockResponse({"return": [_val]}, 200)

    if _f == "pillar.get":
        # pillar get response, preload data
        _pillars = json.loads(_load_from_res("_fake_pillars.json"))
        if _t in _pillars:
            # target is single
            _node = _pillars[_t]
            _r = {_t: _node[_a]} if _a in _node else {}
        else:
            # target is a compound; every node in it must be in the fixture
            _r = {}
            for _name in _t.split(" or "):
                _node = _pillars[_name]
                _r[_name] = _node[_a] if _a in _node else {}
        return MockResponse({"return": [_r]}, 200)

    if _f == "cmd.run":
        # determine which script is called
        _output = _fake_cmd_run_output(_a)
        if _output is not None:
            _val = _prepare_result_for_target(_t, _output)
            return MockResponse({"return": [_val]}, 200)
        # unknown command: fall through to 404 instead of failing on an
        # unset result

    elif _f in ["file.mkdir", "file.touch", "file.write", "cp.get_file"]:
        _val = _prepare_result_for_target(_t)
        return MockResponse({"return": [_val]}, 200)

    return MockResponse(None, 404)
201
202
def mocked_salt_get(*args, **kwargs):
    """Fake for a GET request to the salt API.

    ``args[0]`` is the request url; the part after the host selects the
    fixture to return: ``keys`` and ``minions`` are served from the
    resource folder.

    :return: MockResponse with the raw fixture text and status 200, or
             MockResponse(None, 404) for any other url
    """
    # url looks like <scheme>://<host>/<handle>; a url without any path
    # has no handle and ends up as 404 instead of an IndexError
    _parts = args[0].split('/', 3)
    _rest_handle = _parts[3] if len(_parts) > 3 else ""

    # handle -> fixture file; files are read on every call
    _fixtures = {
        "keys": "_fake_keys.json",
        "minions": "_fake_minions.json",
    }
    _fixture_name = _fixtures.get(_rest_handle)
    if _fixture_name is None:
        return MockResponse(None, 404)
    return MockResponse(_load_from_res(_fixture_name), 200)
214
215
def mocked_package_get(*args, **kwargs):
    """Fake for a GET request to a package mirror.

    Simulates walking a folder chain on ``http://fakedomain.com``: the
    index page links to the first folder, every folder page links to the
    next one, and the last folder lists ``Packages.gz``. Requesting that
    file returns the binary fixture; ``hotfix`` and ``update`` pages are
    empty.

    :return: MockResponse with status 200 for the pages above,
             MockResponse(None, 404) for everything else
    """
    # fake page _placeholder_
    _placeholder = "_placeholder_"
    _type = "_type_"
    # fake domain
    _url = "http://fakedomain.com"
    # folders list and file
    _folders = [
        "2099.0.0",
        "ubuntu",
        "dists",
        "trusty",
        "main",
        "binary-amd64"
    ]
    _file = "Packages.gz"

    # only the fakedomain for mirrors is served
    if not args[0].startswith(_url):
        return MockResponse(None, 404)

    def _listing(_entry, _kind):
        # type is detected as '-' for folder and <number> for file
        _page = _fakepage_template.replace(_placeholder, _entry)
        return _page.replace(_type, _kind)

    # cut url
    _u = args[0].replace(_url, "")
    # Current page is the last path segment, ignoring one trailing slash.
    # An empty path (url equal to the domain) yields an empty name and is
    # served as the main index rather than failing on a missing segment.
    if _u.endswith('/'):
        _u = _u[:-1]
    _current_page = _u.rsplit('/', 1)[-1]

    if len(_current_page) == 0:
        # main index page, links to the initial folder
        return MockResponse(_listing(_folders[0] + "/", "-"), 200)
    if _current_page in _folders:
        # simulate folder walk: link to the next folder, or to the file
        # once the last folder is reached
        _next = _folders.index(_current_page) + 1
        if _next < len(_folders):
            _page = _listing(_folders[_next] + "/", "-")
        else:
            _page = _listing(_file, "999")
        return MockResponse(_page, 200)
    if _current_page == _file:
        # just package.gz file, loaded as bytes
        return MockResponse(_load_from_res("Packages.gz", mode='rb'), 200)
    if _current_page in ("hotfix", "update"):
        return MockResponse(_fakepage_empty, 200)

    return MockResponse(None, 404)
279
280
# Dotted path to the ``shell`` name in cfg_checker's salt utilities module
# NOTE(review): presumably the patch target that mocked_shell stands in
# for - confirm against the test cases that use it
_shell_salt_path = "cfg_checker.common.salt_utils.shell"
282
283
def mocked_shell(*args, **kwargs):
    """Fake for running a shell command; ``args[0]`` is the command line.

    A leading ``sudo`` is ignored. ``salt-call`` commands answer with a
    JSON string ``{"local": <value>}`` where the value is
    ``"fakepassword"`` when the last word asks for
    ``_param:salt_api_password`` and ``null`` otherwise.

    :return: JSON string for ``salt-call`` commands,
             ``"emptyfakeresponse"`` for anything else, including an
             empty command line
    """
    _args = args[0].split()
    if _args and _args[0] == "sudo":
        _args = _args[1:]
    # nothing left to run (empty line or bare "sudo")
    if not _args:
        return "emptyfakeresponse"

    if _args[0].startswith("salt-call"):
        # local calls
        _json = {"local": None}
        if _args[-1].startswith("_param:salt_api_password"):
            _json["local"] = "fakepassword"
        return json.dumps(_json)

    return "emptyfakeresponse"
295 return "emptyfakeresponse"