blob: 9d9c534819521cc90c1585b5bdb7f7322bd1c526 [file] [log] [blame]
Alex3bc95f62020-03-05 17:00:04 -06001import json
2import os
3
4from tests.test_base import tests_dir
5
6
7# Prepare fake filenames and files
8_res_dir = os.path.join(tests_dir, 'res')
Alex9a4ad212020-10-01 18:04:25 -05009_fake_kube_config_path = os.path.join(_res_dir, "_fake_kube_config.env")
Alex3bc95f62020-03-05 17:00:04 -060010
11
12# preload file from res
13def _load_from_res(_filename, mode='rt'):
14 fake_file_path = os.path.join(_res_dir, _filename)
15 _patch_buf = []
16 with open(fake_file_path, mode) as _f:
17 _patch_buf = _f.read()
18
19 return _patch_buf
20
21
22_fakepage_template = _load_from_res(os.path.join(_res_dir, "_fakepage.html"))
23_fakepage_empty = _load_from_res(os.path.join(_res_dir, "_fakeempty.html"))
24_fake_keys = json.loads(_load_from_res("_fake_keys.json"))
25_fake_pkg_versions = _load_from_res("_fake_pkg_versions.json")
26_fake_network_data = _load_from_res("_fake_net_data.json")
27
28
29def _prepare_result_for_target(_tgt, result=True):
30 # prepare True answer for target node if we have it in fakes list
31 _nodes = _fake_keys["return"]["minions"]
32 _m = {}
33 if _tgt == "*":
34 for _n in _nodes:
35 _m[_n] = result
36 elif _tgt in _nodes:
37 # single target
38 _m[_tgt] = result
39 elif " or " in _tgt:
40 # compund
41 _t_list = _tgt.split(" or ")
42 for _t in _t_list:
43 _m[_t] = result
44 return _m
45
46
47class MockResponse:
48 def __init__(self, _buffer, status_code):
49 if _buffer is None:
50 self.content = _buffer
51 self.text = _buffer
52 self.json = _buffer
53 elif isinstance(_buffer, bytes):
54 self.content = _buffer
55 self.text = None
56 self._json = None
57 elif isinstance(_buffer, dict):
58 _dump = json.dumps(_buffer)
59 self.content = _dump.encode('utf-8')
60 self.text = _dump
61 self._json = _buffer
62 else:
63 self.content = _buffer.encode('utf-8')
64 self.text = _buffer
65 self._json = None
66
67 self.status_code = status_code
68 self.reason = "OK" if self.status_code == 200 else "FAIL"
69
70 def content(self):
71 return self.content
72
73 def text(self):
74 return self.text
75
76 def json(self):
77 if not self._json:
78 try:
79 _j = json.loads(self.text)
80 except Exception:
81 raise Exception("Failed to create json {}".format(self.text))
82 return _j
83 else:
84 return self._json
85
86 def reason(self):
87 return self.reason
88
89 def ok(self):
90 return True if self.status_code == 200 else False
91
92 def cookies(self):
93 return None
94
95
96def mocked_salt_post(*args, **kwargs):
97 _rest_handle = args[0].split('/', 3)[3]
98 if _rest_handle == "login":
99 # return fake token
100 _fake_token = {
101 "return":
102 [
103 {
104 "perms": [
105 ".*",
106 "@local",
107 "@wheel",
108 "@runner",
109 "@jobs"
110 ],
111 "start": 0,
112 "token": "faketoken",
113 "expire": 0,
114 "user": "salt",
115 "eauth": "pam"
116 }
117 ]
118 }
119
120 return MockResponse(_fake_token, 200)
121 elif not _rest_handle and "json" in kwargs:
122 # handle functions
123 _funs = kwargs["json"]
124 if isinstance(_funs, list):
125 if len(_funs) > 1:
126 raise Exception("Multiple commands in sale requiest")
127 else:
128 _f = _funs[0]
129 _t = _f["tgt"]
130 _a = _f["arg"] if "arg" in _f else ""
131 _f = _f["fun"]
132 if _f == "test.ping":
133 # prepare answer to ping
134 _val = _prepare_result_for_target(_t)
135 return MockResponse({"return": [_val]}, 200)
136 elif _f == "pillar.get":
137 # pillar get response, preload data
138 _j = json.loads(_load_from_res("_fake_pillars.json"))
139 _result = {"return": []}
140 if _t in _j.keys():
141 # target is single
142 _j = _j[_t]
143 _r = {_t: _j[_a]} if _a in _j else {}
144 else:
145 # target is a compound
146 _t_list = _t.split(" or ")
147 _r = {}
148 for _t in _t_list:
149 _val = _j[_t][_a] if _a in _j[_t] else {}
150 _r[_t] = _val
151 _result["return"].append(_r)
152 return MockResponse(_result, 200)
153 elif _f == "cmd.run":
154 # determine which script is called
155 _args = _a.split()
156 if _args[0] == "python" and _args[1].endswith("pkg_versions.py"):
157 _val = _prepare_result_for_target(_t, _fake_pkg_versions)
158 elif _args[0] == "python" and _args[1].endswith("ifs_data.py"):
159 _val = _prepare_result_for_target(_t, _fake_network_data)
160 elif _args[0] == "uname":
161 _val = _prepare_result_for_target(_t, "FakeLinux")
162 elif _args[0] == "lscpu":
163 _val = _prepare_result_for_target(
164 _t,
165 _load_from_res("_fake_lscpu.txt")
166 )
167 elif _args[0] == "free":
168 _val = _prepare_result_for_target(
169 _t,
170 "Mem: 1.9G 1.4G 84M 22M 524M 343M"
171 )
172 elif _args[0] == "df":
173 _val = _prepare_result_for_target(
174 _t,
175 _load_from_res("_fake_df.txt")
176 )
177 elif _args[0] == "service":
178 _val = _prepare_result_for_target(
179 _t,
180 _load_from_res("_fake_service_status.txt")
181 )
182 elif _args[0] == "virsh":
183 _val = _prepare_result_for_target(
184 _t,
185 _load_from_res("_fake_kvm_instances.txt")
186 )
187 elif _args[0] == "cat" and \
188 _args[1].endswith("/proc/net/softnet_stat;"):
189 _val = _prepare_result_for_target(
190 _t,
191 _load_from_res("_fake_softnet_stats.txt")
192 )
193 return MockResponse({"return": [_val]}, 200)
194 elif _f in ["file.mkdir", "file.touch", "file.write", "cp.get_file"]:
195 _val = _prepare_result_for_target(_t)
196 return MockResponse({"return": [_val]}, 200)
197
198 return MockResponse(None, 404)
199
200
201def mocked_salt_get(*args, **kwargs):
202 _rest_handle = args[0].split('/', 3)[3]
203 if _rest_handle == "keys":
204 # return list of minions
205 _fake_keys = _load_from_res("_fake_keys.json")
206 return MockResponse(_fake_keys, 200)
207 elif _rest_handle == "minions":
208 # list of minions
209 _list = _load_from_res("_fake_minions.json")
210 return MockResponse(_list, 200)
211 return MockResponse(None, 404)
212
213
214def mocked_package_get(*args, **kwargs):
215 # fake page _placeholder_
216 _placeholder = "_placeholder_"
217 _type = "_type_"
218 # fake domain
219 _url = "http://fakedomain.com"
220 # folders list and file
221 _folders = [
222 "2099.0.0",
223 "ubuntu",
224 "dists",
225 "trusty",
226 "main",
227 "binary-amd64"
228 ]
229 _file = "Packages.gz"
230
231 # if this is a fakedomain for mirrors
232 if args[0].startswith(_url):
233 # cut url
234 _u = args[0].replace(_url, "")
235 # detect folder
236 _split_res = _u.rsplit('/', 2)
237 if len(_split_res) > 2 and _u[-1] != '/':
238 _current_page = _u.rsplit('/', 2)[2]
239 else:
240 _current_page = _u.rsplit('/', 2)[1]
241 # if this is main index page, take first
242 if len(_current_page) == 0:
243 # initial folder
244 _p = _fakepage_template
245 _p = _p.replace(_placeholder, _folders[0] + "/")
246 _p = _p.replace(_type, "-")
247 # return fake page
248 return MockResponse(_p, 200)
249 # index in array
250 elif _current_page in _folders:
251 # simulate folder walk
252 _ind = _folders.index(_current_page)
253 # get next one
254 if _ind+1 < len(_folders):
255 # folder
256 _p = _fakepage_template
257 _p = _p.replace(_placeholder, _folders[_ind+1] + "/")
258 _p = _p.replace(_type, "-")
259 else:
260 # file
261 _p = _fakepage_template
262 _p = _p.replace(_placeholder, _file)
263 # type is detected as '-' for folder
264 # and <number> for file
265 _p = _p.replace(_type, "999")
266 # supply next fake page
267 return MockResponse(_p, 200)
268 elif _current_page == _file:
269 # just package.gz file
270 # preload file
271 _gzfile = _load_from_res("Packages.gz", mode='rb')
272 return MockResponse(_gzfile, 200)
273 elif _current_page == "hotfix" or _current_page == "update":
274 return MockResponse(_fakepage_empty, 200)
275
276 return MockResponse(None, 404)
277
278
279_shell_salt_path = "cfg_checker.common.salt_utils.shell"
280
281
282def mocked_shell(*args, **kwargs):
283 _args = args[0].split()
284 # _fake_salt_response = ["cfg01.fakedomain.com"]
285 _args = _args[1:] if _args[0] == "sudo" else _args
286 if _args[0].startswith("salt-call"):
287 # local calls
288 _json = {"local": None}
289 if _args[-1].startswith("_param:salt_api_password"):
290 _json["local"] = "fakepassword"
291 return json.dumps(_json)
292
293 return "emptyfakeresponse"