Alex | 0989ecf | 2022-03-29 13:43:21 -0500 | [diff] [blame] | 1 | # Author: Alex Savatieiev (osavatieiev@mirantis.com; a.savex@gmail.com) |
| 2 | # Copyright 2019-2022 Mirantis, Inc. |
Alex | 3bc95f6 | 2020-03-05 17:00:04 -0600 | [diff] [blame] | 3 | import json |
| 4 | import os |
| 5 | |
| 6 | from tests.test_base import tests_dir |
| 7 | |
| 8 | |
| 9 | # Prepare fake filenames and files |
| 10 | _res_dir = os.path.join(tests_dir, 'res') |
Alex | 9a4ad21 | 2020-10-01 18:04:25 -0500 | [diff] [blame] | 11 | _fake_kube_config_path = os.path.join(_res_dir, "_fake_kube_config.env") |
Alex | 3bc95f6 | 2020-03-05 17:00:04 -0600 | [diff] [blame] | 12 | |
| 13 | |
| 14 | # preload file from res |
| 15 | def _load_from_res(_filename, mode='rt'): |
| 16 | fake_file_path = os.path.join(_res_dir, _filename) |
| 17 | _patch_buf = [] |
| 18 | with open(fake_file_path, mode) as _f: |
| 19 | _patch_buf = _f.read() |
| 20 | |
| 21 | return _patch_buf |
| 22 | |
| 23 | |
| 24 | _fakepage_template = _load_from_res(os.path.join(_res_dir, "_fakepage.html")) |
| 25 | _fakepage_empty = _load_from_res(os.path.join(_res_dir, "_fakeempty.html")) |
| 26 | _fake_keys = json.loads(_load_from_res("_fake_keys.json")) |
| 27 | _fake_pkg_versions = _load_from_res("_fake_pkg_versions.json") |
| 28 | _fake_network_data = _load_from_res("_fake_net_data.json") |
| 29 | |
| 30 | |
| 31 | def _prepare_result_for_target(_tgt, result=True): |
| 32 | # prepare True answer for target node if we have it in fakes list |
| 33 | _nodes = _fake_keys["return"]["minions"] |
| 34 | _m = {} |
| 35 | if _tgt == "*": |
| 36 | for _n in _nodes: |
| 37 | _m[_n] = result |
| 38 | elif _tgt in _nodes: |
| 39 | # single target |
| 40 | _m[_tgt] = result |
| 41 | elif " or " in _tgt: |
| 42 | # compund |
| 43 | _t_list = _tgt.split(" or ") |
| 44 | for _t in _t_list: |
| 45 | _m[_t] = result |
| 46 | return _m |
| 47 | |
| 48 | |
| 49 | class MockResponse: |
| 50 | def __init__(self, _buffer, status_code): |
| 51 | if _buffer is None: |
| 52 | self.content = _buffer |
| 53 | self.text = _buffer |
| 54 | self.json = _buffer |
| 55 | elif isinstance(_buffer, bytes): |
| 56 | self.content = _buffer |
| 57 | self.text = None |
| 58 | self._json = None |
| 59 | elif isinstance(_buffer, dict): |
| 60 | _dump = json.dumps(_buffer) |
| 61 | self.content = _dump.encode('utf-8') |
| 62 | self.text = _dump |
| 63 | self._json = _buffer |
| 64 | else: |
| 65 | self.content = _buffer.encode('utf-8') |
| 66 | self.text = _buffer |
| 67 | self._json = None |
| 68 | |
| 69 | self.status_code = status_code |
| 70 | self.reason = "OK" if self.status_code == 200 else "FAIL" |
| 71 | |
| 72 | def content(self): |
| 73 | return self.content |
| 74 | |
| 75 | def text(self): |
| 76 | return self.text |
| 77 | |
| 78 | def json(self): |
| 79 | if not self._json: |
| 80 | try: |
| 81 | _j = json.loads(self.text) |
| 82 | except Exception: |
| 83 | raise Exception("Failed to create json {}".format(self.text)) |
| 84 | return _j |
| 85 | else: |
| 86 | return self._json |
| 87 | |
| 88 | def reason(self): |
| 89 | return self.reason |
| 90 | |
| 91 | def ok(self): |
| 92 | return True if self.status_code == 200 else False |
| 93 | |
| 94 | def cookies(self): |
| 95 | return None |
| 96 | |
| 97 | |
| 98 | def mocked_salt_post(*args, **kwargs): |
| 99 | _rest_handle = args[0].split('/', 3)[3] |
| 100 | if _rest_handle == "login": |
| 101 | # return fake token |
| 102 | _fake_token = { |
| 103 | "return": |
| 104 | [ |
| 105 | { |
| 106 | "perms": [ |
| 107 | ".*", |
| 108 | "@local", |
| 109 | "@wheel", |
| 110 | "@runner", |
| 111 | "@jobs" |
| 112 | ], |
| 113 | "start": 0, |
| 114 | "token": "faketoken", |
| 115 | "expire": 0, |
| 116 | "user": "salt", |
| 117 | "eauth": "pam" |
| 118 | } |
| 119 | ] |
| 120 | } |
| 121 | |
| 122 | return MockResponse(_fake_token, 200) |
| 123 | elif not _rest_handle and "json" in kwargs: |
| 124 | # handle functions |
| 125 | _funs = kwargs["json"] |
| 126 | if isinstance(_funs, list): |
| 127 | if len(_funs) > 1: |
| 128 | raise Exception("Multiple commands in sale requiest") |
| 129 | else: |
| 130 | _f = _funs[0] |
| 131 | _t = _f["tgt"] |
| 132 | _a = _f["arg"] if "arg" in _f else "" |
| 133 | _f = _f["fun"] |
| 134 | if _f == "test.ping": |
| 135 | # prepare answer to ping |
| 136 | _val = _prepare_result_for_target(_t) |
| 137 | return MockResponse({"return": [_val]}, 200) |
| 138 | elif _f == "pillar.get": |
| 139 | # pillar get response, preload data |
| 140 | _j = json.loads(_load_from_res("_fake_pillars.json")) |
| 141 | _result = {"return": []} |
| 142 | if _t in _j.keys(): |
| 143 | # target is single |
| 144 | _j = _j[_t] |
| 145 | _r = {_t: _j[_a]} if _a in _j else {} |
| 146 | else: |
| 147 | # target is a compound |
| 148 | _t_list = _t.split(" or ") |
| 149 | _r = {} |
| 150 | for _t in _t_list: |
| 151 | _val = _j[_t][_a] if _a in _j[_t] else {} |
| 152 | _r[_t] = _val |
| 153 | _result["return"].append(_r) |
| 154 | return MockResponse(_result, 200) |
| 155 | elif _f == "cmd.run": |
| 156 | # determine which script is called |
| 157 | _args = _a.split() |
| 158 | if _args[0] == "python" and _args[1].endswith("pkg_versions.py"): |
| 159 | _val = _prepare_result_for_target(_t, _fake_pkg_versions) |
| 160 | elif _args[0] == "python" and _args[1].endswith("ifs_data.py"): |
| 161 | _val = _prepare_result_for_target(_t, _fake_network_data) |
| 162 | elif _args[0] == "uname": |
| 163 | _val = _prepare_result_for_target(_t, "FakeLinux") |
| 164 | elif _args[0] == "lscpu": |
| 165 | _val = _prepare_result_for_target( |
| 166 | _t, |
| 167 | _load_from_res("_fake_lscpu.txt") |
| 168 | ) |
| 169 | elif _args[0] == "free": |
| 170 | _val = _prepare_result_for_target( |
| 171 | _t, |
| 172 | "Mem: 1.9G 1.4G 84M 22M 524M 343M" |
| 173 | ) |
| 174 | elif _args[0] == "df": |
| 175 | _val = _prepare_result_for_target( |
| 176 | _t, |
| 177 | _load_from_res("_fake_df.txt") |
| 178 | ) |
| 179 | elif _args[0] == "service": |
| 180 | _val = _prepare_result_for_target( |
| 181 | _t, |
| 182 | _load_from_res("_fake_service_status.txt") |
| 183 | ) |
| 184 | elif _args[0] == "virsh": |
| 185 | _val = _prepare_result_for_target( |
| 186 | _t, |
| 187 | _load_from_res("_fake_kvm_instances.txt") |
| 188 | ) |
| 189 | elif _args[0] == "cat" and \ |
| 190 | _args[1].endswith("/proc/net/softnet_stat;"): |
| 191 | _val = _prepare_result_for_target( |
| 192 | _t, |
| 193 | _load_from_res("_fake_softnet_stats.txt") |
| 194 | ) |
| 195 | return MockResponse({"return": [_val]}, 200) |
| 196 | elif _f in ["file.mkdir", "file.touch", "file.write", "cp.get_file"]: |
| 197 | _val = _prepare_result_for_target(_t) |
| 198 | return MockResponse({"return": [_val]}, 200) |
| 199 | |
| 200 | return MockResponse(None, 404) |
| 201 | |
| 202 | |
| 203 | def mocked_salt_get(*args, **kwargs): |
| 204 | _rest_handle = args[0].split('/', 3)[3] |
| 205 | if _rest_handle == "keys": |
| 206 | # return list of minions |
| 207 | _fake_keys = _load_from_res("_fake_keys.json") |
| 208 | return MockResponse(_fake_keys, 200) |
| 209 | elif _rest_handle == "minions": |
| 210 | # list of minions |
| 211 | _list = _load_from_res("_fake_minions.json") |
| 212 | return MockResponse(_list, 200) |
| 213 | return MockResponse(None, 404) |
| 214 | |
| 215 | |
| 216 | def mocked_package_get(*args, **kwargs): |
| 217 | # fake page _placeholder_ |
| 218 | _placeholder = "_placeholder_" |
| 219 | _type = "_type_" |
| 220 | # fake domain |
| 221 | _url = "http://fakedomain.com" |
| 222 | # folders list and file |
| 223 | _folders = [ |
| 224 | "2099.0.0", |
| 225 | "ubuntu", |
| 226 | "dists", |
| 227 | "trusty", |
| 228 | "main", |
| 229 | "binary-amd64" |
| 230 | ] |
| 231 | _file = "Packages.gz" |
| 232 | |
| 233 | # if this is a fakedomain for mirrors |
| 234 | if args[0].startswith(_url): |
| 235 | # cut url |
| 236 | _u = args[0].replace(_url, "") |
| 237 | # detect folder |
| 238 | _split_res = _u.rsplit('/', 2) |
| 239 | if len(_split_res) > 2 and _u[-1] != '/': |
| 240 | _current_page = _u.rsplit('/', 2)[2] |
| 241 | else: |
| 242 | _current_page = _u.rsplit('/', 2)[1] |
| 243 | # if this is main index page, take first |
| 244 | if len(_current_page) == 0: |
| 245 | # initial folder |
| 246 | _p = _fakepage_template |
| 247 | _p = _p.replace(_placeholder, _folders[0] + "/") |
| 248 | _p = _p.replace(_type, "-") |
| 249 | # return fake page |
| 250 | return MockResponse(_p, 200) |
| 251 | # index in array |
| 252 | elif _current_page in _folders: |
| 253 | # simulate folder walk |
| 254 | _ind = _folders.index(_current_page) |
| 255 | # get next one |
| 256 | if _ind+1 < len(_folders): |
| 257 | # folder |
| 258 | _p = _fakepage_template |
| 259 | _p = _p.replace(_placeholder, _folders[_ind+1] + "/") |
| 260 | _p = _p.replace(_type, "-") |
| 261 | else: |
| 262 | # file |
| 263 | _p = _fakepage_template |
| 264 | _p = _p.replace(_placeholder, _file) |
| 265 | # type is detected as '-' for folder |
| 266 | # and <number> for file |
| 267 | _p = _p.replace(_type, "999") |
| 268 | # supply next fake page |
| 269 | return MockResponse(_p, 200) |
| 270 | elif _current_page == _file: |
| 271 | # just package.gz file |
| 272 | # preload file |
| 273 | _gzfile = _load_from_res("Packages.gz", mode='rb') |
| 274 | return MockResponse(_gzfile, 200) |
| 275 | elif _current_page == "hotfix" or _current_page == "update": |
| 276 | return MockResponse(_fakepage_empty, 200) |
| 277 | |
| 278 | return MockResponse(None, 404) |
| 279 | |
| 280 | |
| 281 | _shell_salt_path = "cfg_checker.common.salt_utils.shell" |
| 282 | |
| 283 | |
| 284 | def mocked_shell(*args, **kwargs): |
| 285 | _args = args[0].split() |
| 286 | # _fake_salt_response = ["cfg01.fakedomain.com"] |
| 287 | _args = _args[1:] if _args[0] == "sudo" else _args |
| 288 | if _args[0].startswith("salt-call"): |
| 289 | # local calls |
| 290 | _json = {"local": None} |
| 291 | if _args[-1].startswith("_param:salt_api_password"): |
| 292 | _json["local"] = "fakepassword" |
| 293 | return json.dumps(_json) |
| 294 | |
| 295 | return "emptyfakeresponse" |