import functools
import logging
from typing import Any, Callable, Dict, List

from django.core.cache import cache
from django.db.models import Model
from parse import parse
Anna Arhipova | 7cdcc85 | 2023-11-15 18:20:45 +0100 | [diff] [blame] | 7 | |
| 8 | |
def parse_title(test_name):
    """
    Shorten a full test identifier to a compact title.

    :param test_name: raw test identifier string
    :return:
        - the name itself (after the bracket fix-up below) when it starts
          with "==";
        - "<class_name>.<test_title>" for names starting with ".setUp" or
          ".tearDown" that match "<test_title>(<dotted path>.<class_name>)";
        - "<test_title>[<id>]" for names that match
          "<dotted path>.<class_name>.<test_title>[<id>]";
        - the first three dot-separated tokens joined by "." when the
          expected pattern does not match.
    """
    # Sometimes id can be without the closing ] symbol
    if "[" in test_name and "]" not in test_name:
        test_name += "]"

    if test_name.startswith("=="):
        return test_name

    tokens = test_name.split(".")
    # return file_name.test_class.test_name in other complicated cases
    fallback = ".".join(tokens[:3])
    # All dot-separated tokens except the last two are consumed by
    # anonymous "{}" fields of the parse format.
    prefix = "{}." * (len(tokens) - 2)

    if test_name.startswith((".setUp", ".tearDown")):
        r = parse("{test_title}(" + prefix + "{class_name})", test_name)
        # parse() returns None when the format does not match; fall back
        # instead of failing on subscripting None.
        if r is None:
            return fallback
        return f"{r['class_name']}.{r['test_title']}".strip()

    r = parse(prefix + "{class_name}.{test_title}[{id}]", test_name)
    if r is None:
        return fallback
    return f"{r['test_title']}[{r['id']}]"
Anna Arhipova | 7cdcc85 | 2023-11-15 18:20:45 +0100 | [diff] [blame] | 29 | |
| 30 | |
def short_names_for_dict(_dict):
    """
    Re-key a dict by the shortened titles produced by ``parse_title``.

    When several original keys shorten to the same title, an already
    stored value is overwritten only if it is falsy or equals
    "No result found".

    :param _dict: dict keyed by full test names
    :return: new dict keyed by shortened titles
    """
    shortened = {}
    for full_name, value in _dict.items():
        title = parse_title(full_name)
        current = shortened.get(title)
        # Replace only those keys which are absent in the dict or empty
        # (defined as "No result found")
        if current == "No result found" or not current:
            shortened[title] = value
    return shortened
| 40 | |
| 41 | |
def get_dict_diff(
    dict1: dict, dict2: dict, compare_by_key=None
) -> Dict[str, List]:
    """
    Collect the entries on which two dicts disagree.

    :param dict1: first dict
    :param dict2: second dict
    :param compare_by_key: when truthy, values are treated as dicts and
        only their ``compare_by_key`` items are compared (a missing outer
        key counts as an empty dict); otherwise whole values are compared
    :return: dict mapping every differing key, visited in sorted order, to
        ``[dict1.get(key), dict2.get(key)]``
    """
    differences = {}
    for key in sorted(set(dict1) | set(dict2)):
        if compare_by_key:
            left = dict1.get(key, {}).get(compare_by_key)
            right = dict2.get(key, {}).get(compare_by_key)
        else:
            left = dict1.get(key)
            right = dict2.get(key)
        if left == right:
            continue
        # Always report the full values, even when only one item of them
        # was compared.
        differences[key] = [dict1.get(key), dict2.get(key)]
    return differences
| 59 | |
| 60 | |
Anna Arhipova | 32dd8ce | 2024-01-20 15:30:47 +0100 | [diff] [blame] | 61 | def replace_all(text: str, olds: str, new: str) -> str: |
| 62 | r = text |
| 63 | for _s in olds: |
| 64 | r = r.replace(_s, new) |
| 65 | return r |
| 66 | |
| 67 | |
def cached(
    timeout: int = None,
    condition_for_endless_cache: Callable = lambda x: False,
) -> Callable:
    """
    Build a decorator that keeps a function's results in the Django cache.

    The cache key is built from the function name and the string form of
    the positional and keyword arguments, with the characters
    ``{}()'" .,:`` replaced by underscores.

    A cached value of ``None`` is treated as a miss, so a function that
    returns ``None`` is called again on every invocation.

    :param timeout: (in seconds) used according to
        https://docs.djangoproject.com/en/4.2/topics/cache/#basic-usage
    :param condition_for_endless_cache: Callable that should return a
        boolean. It checks the result of the function: if the result meets
        the condition, the endless timeout (``None``) is set; otherwise
        the provided ``timeout`` is used

    :return: decorator
    """

    def decorator(func: Callable) -> Callable:
        # Keep the name, docstring and other metadata of the decorated
        # function on the wrapper.
        @functools.wraps(func)
        def wrapper(*args, **kwargs) -> Any:
            cache_key = f"{func.__name__}_{args}_{kwargs}"
            cache_key = replace_all(cache_key, "{}()'\" .,:", "_")

            cached_value = cache.get(cache_key)
            if cached_value is not None:
                print(f"{func.__name__} hit")
                return cached_value

            print(f"{func.__name__} MISS")
            result = func(*args, **kwargs)
            _timeout = (
                None if condition_for_endless_cache(result) else timeout
            )
            cache.set(cache_key, result, timeout=_timeout)
            return result

        return wrapper

    return decorator
| 112 | |
| 113 | |
class DBHandler(logging.Handler):
    """
    Logging handler that appends every formatted record, wrapped in a
    coloured HTML ``<a>`` element, to ``storage.text`` and then calls
    ``storage.save()``.
    """

    def __init__(self, storage: Model) -> None:
        """
        :param storage: object exposing a ``text`` string attribute and a
            ``save()`` method (annotated as a Django ``Model``)
        """
        super().__init__()
        self.storage = storage

    def emit(self, record: logging.LogRecord) -> None:
        """
        Append the formatted record to the storage text and save it.

        ERROR records are rendered red, DEBUG records grey and all other
        levels black. Failures while formatting or saving are passed to
        ``handleError`` so that logging does not break the calling code.

        :param record: log record to store
        """
        try:
            msg = self.format(record)
            if record.levelno == logging.ERROR:
                color = "red"
            elif record.levelno == logging.DEBUG:
                color = "grey"
            else:
                color = "black"
            self.storage.text += f"<a style='color:{color}'>{msg} </a>\n"
            self.storage.save()
            print(self.storage.text)
        except RecursionError:
            # Same convention as the stdlib handlers: never swallow it.
            raise
        except Exception:
            self.handleError(record)
| 130 | |
| 131 | |
def DBlogger(name: str, storage: Model) -> logging.Logger:
    """
    Return the logger ``name`` set to DEBUG level with a ``DBHandler``
    writing to ``storage`` attached.

    ``logging.getLogger`` returns the same logger object for the same
    name, so a handler is added only when no ``DBHandler`` bound to this
    very ``storage`` object is attached yet; repeated calls therefore do
    not duplicate every message in the storage. Handlers bound to other
    storage objects are left untouched.

    :param name: logger name
    :param storage: object the handler writes to (see ``DBHandler``)
    :return: configured logger
    """
    _log = logging.getLogger(name)
    _log.setLevel(logging.DEBUG)

    already_attached = any(
        isinstance(handler, DBHandler) and handler.storage is storage
        for handler in _log.handlers
    )
    if not already_attached:
        formatter = logging.Formatter(
            "[%(asctime)s] %(message)s", "%d %b %H:%M:%S"
        )
        db_h = DBHandler(storage)
        db_h.setFormatter(formatter)
        _log.addHandler(db_h)
    return _log
| 142 | |
| 143 | |
# Nothing is executed when this module is run directly as a script.
if __name__ == "__main__":
    pass