blob: 73ea55fd166468dcb3e74155771028f4efbdc0a8 [file] [log] [blame]
Anna Arhipovaa843b462024-07-06 17:37:00 +02001import logging
stavrovska28772bc2024-05-22 09:33:50 +02002from typing import Any, Callable, Dict, List
3
Anna Arhipova32dd8ce2024-01-20 15:30:47 +01004from django.core.cache import cache
Anna Arhipovaa843b462024-07-06 17:37:00 +02005from django.db.models import Model
stavrovska28772bc2024-05-22 09:33:50 +02006from parse import parse
Anna Arhipova7cdcc852023-11-15 18:20:45 +01007
8
9def parse_title(test_name):
10 # Sometimes id can be without the closing ] symbol
11 if "[" in test_name and "]" not in test_name:
12 test_name += "]"
13 token_count = test_name.split(".").__len__()
14
15 if test_name.startswith("=="):
16 return test_name
17
18 if test_name.startswith(".setUp") or test_name.startswith(".tearDown"):
19 fmt = "{test_title}(" + "{}." * (token_count - 2) + "{class_name})"
20 r = parse(fmt, test_name)
21 return f"{r['class_name']}.{r['test_title']}".strip()
22 try:
23 fmt = "{}." * (token_count - 2) + "{class_name}.{test_title}[{id}]"
24 r = parse(fmt, test_name)
25 return f"{r['test_title']}[{r['id']}]"
26 except TypeError:
27 # return file_name.test_class.test_name in other complicated cases
stavrovska28772bc2024-05-22 09:33:50 +020028 return ".".join(test_name.split(".")[:3])
Anna Arhipova7cdcc852023-11-15 18:20:45 +010029
30
31def short_names_for_dict(_dict):
32 __dict = {}
33 for _k in _dict.keys():
34 __k = parse_title(_k)
35 # Replace only those keys which are absent in the dict or empty
36 # (defined as "No result found")
37 if __dict.get(__k) == "No result found" or not __dict.get(__k):
38 __dict[__k] = _dict[_k]
39 return __dict
40
41
stavrovska28772bc2024-05-22 09:33:50 +020042def get_dict_diff(
43 dict1: dict, dict2: dict, compare_by_key=None
44) -> Dict[str, List]:
Anna Arhipova7cdcc852023-11-15 18:20:45 +010045 all_keys = sorted(set(list(dict1.keys()) + list(dict2.keys())))
46
47 result = dict()
48 for k in all_keys:
49 if compare_by_key:
50 if dict1.get(k, {}).get(compare_by_key) == dict2.get(k, {}).get(
stavrovska28772bc2024-05-22 09:33:50 +020051 compare_by_key
52 ):
Anna Arhipova7cdcc852023-11-15 18:20:45 +010053 continue
54 else:
55 if dict1.get(k) == dict2.get(k):
56 continue
57 result[k] = [dict1.get(k), dict2.get(k)]
58 return result
59
60
Anna Arhipova32dd8ce2024-01-20 15:30:47 +010061def replace_all(text: str, olds: str, new: str) -> str:
62 r = text
63 for _s in olds:
64 r = r.replace(_s, new)
65 return r
66
67
stavrovska28772bc2024-05-22 09:33:50 +020068def cached(
69 timeout: int = None,
70 condition_for_endless_cache: Callable = lambda x: False,
71) -> Callable:
Anna Arhipova14cdf8a2024-02-06 15:43:15 +010072 """
73 :param timeout: (in seconds) usage accordingly
74 https://docs.djangoproject.com/en/4.2/topics/cache/#basic-usage
75 :param condition_for_endless_cache: Callable should return boolean.
76 Checks a result of function. If Result meets requirements of condition
77 then the endless timeout will be set. Or it will use provided timeout
78 otherwise
79
80 :return: decorator
81 """
stavrovska28772bc2024-05-22 09:33:50 +020082
Anna Arhipova32dd8ce2024-01-20 15:30:47 +010083 def decorator(func: Callable) -> Callable:
84 def wrapper(*args, **kwargs) -> Any:
stavrovska28772bc2024-05-22 09:33:50 +020085 cache_key = f"{func.__name__}_{args}_{kwargs}"
86 cache_key = replace_all(cache_key, "{}()'\" .,:", "_")
Anna Arhipova14cdf8a2024-02-06 15:43:15 +010087 cached_value = cache.get(cache_key)
88 if cached_value is None:
Anna Arhipova32dd8ce2024-01-20 15:30:47 +010089 print(f"{func.__name__} MISS")
90 result = func(*args, **kwargs)
stavrovska28772bc2024-05-22 09:33:50 +020091 _timeout = (
92 None if condition_for_endless_cache(result) else timeout
93 )
Anna Arhipova32dd8ce2024-01-20 15:30:47 +010094
Anna Arhipova14cdf8a2024-02-06 15:43:15 +010095 cache.set(cache_key, result, timeout=_timeout)
96 return result
Anna Arhipova32dd8ce2024-01-20 15:30:47 +010097 print(f"{func.__name__} hit")
98
99 # # FIXME Assert to test the caching mechanism
100 # _result = func(*args, **kwargs)
101 # for d in difflib.ndiff(str(result), str(_result)):
102 # if not d.startswith(" "):
103 # print(d)
104 # assert result == _result
105 # # ENDFIXME
106
Anna Arhipova14cdf8a2024-02-06 15:43:15 +0100107 return cached_value
stavrovska28772bc2024-05-22 09:33:50 +0200108
Anna Arhipova32dd8ce2024-01-20 15:30:47 +0100109 return wrapper
stavrovska28772bc2024-05-22 09:33:50 +0200110
Anna Arhipova32dd8ce2024-01-20 15:30:47 +0100111 return decorator
112
113
Anna Arhipovaa843b462024-07-06 17:37:00 +0200114class DBHandler(logging.Handler):
115
116 def __init__(self, storage: Model) -> None:
117 logging.Handler.__init__(self)
118 self.storage = storage
119
120 def emit(self, record: logging.LogRecord) -> None:
121 msg = self.format(record)
122 color = "black"
123 if record.levelno == logging.ERROR:
124 color = "red"
125 if record.levelno == logging.DEBUG:
126 color = "grey"
127 self.storage.text += f"<a style='color:{color}'>{msg} </a>\n"
128 self.storage.save()
129 print(self.storage.text)
130
131
132def DBlogger(name: str, storage: Model) -> logging.Logger:
133 _log = logging.getLogger(name)
134 _log.setLevel(logging.DEBUG)
135 formatter = logging.Formatter(
136 "[%(asctime)s] %(message)s", "%d %b %H:%M:%S"
137 )
138 db_h = DBHandler(storage)
139 db_h.setFormatter(formatter)
140 _log.addHandler(db_h)
141 return _log
142
143
Anna Arhipova7cdcc852023-11-15 18:20:45 +0100144if __name__ == "__main__":
145 pass