| # Author: Alex Savatieiev (osavatieiev@mirantis.com; a.savex@gmail.com) |
| # Copyright 2019-2022 Mirantis, Inc. |
| import atexit |
| import grp |
| import os |
| import pwd |
| import time |
| import tempfile |
| |
# Default strftime-style pattern used by get_file_info_fd() when the caller
# does not pass ``time_format``.
_default_time_format = "%Y-%m-%d %H:%M:%S.%f%z"
# Registry of temp files made by create_temp_file_with_content():
# maps hash(content) -> path; emptied by _cleanup_temp_files().
_temp_files = {}
| |
| |
def remove_file(filename):
    """Delete the file located at ``filename``.

    Any ``OSError`` raised by the operating system (for example when the
    path does not exist) is propagated to the caller.
    """
    os.unlink(filename)
| |
| |
def write_str_to_file(filename, _str):
    """Create or truncate ``filename`` and write the text ``_str`` into it.

    The text is written as-is; no trailing newline is added.
    """
    with open(filename, mode='w') as target:
        target.write(_str)
| |
| |
def append_str_to_file(filename, _str):
    """Append the text ``_str`` to the end of ``filename``.

    The file is created when it does not exist; no newline is added.
    """
    with open(filename, mode='a') as target:
        target.write(_str)
| |
| |
def write_lines_to_file(filename, source_list):
    """Overwrite ``filename`` with the strings of ``source_list``.

    Items are joined with a newline and one more newline terminates the
    output, so an empty ``source_list`` yields a file holding a single
    newline character.
    """
    with open(filename, mode='w') as target:
        text = "\n".join(source_list)
        target.write(text + "\n")
| |
| |
def append_lines_to_file(filename, source_list):
    """Append the strings of ``source_list`` to ``filename``.

    Items are joined with a newline and the appended chunk is terminated
    by one more newline. The chunk is built before the file is opened.
    """
    chunk = "\n".join(source_list) + "\n"
    with open(filename, mode='a') as target:
        target.write(chunk)
| |
| |
def append_line_to_file(filename, _str):
    """Append ``_str`` followed by a newline to ``filename``.

    The file is created when it does not exist.
    """
    with open(filename, mode='a') as target:
        line = _str + '\n'
        target.write(line)
| |
| |
def read_file(filename):
    """Return the whole content of ``filename`` as ``bytes``.

    The file is opened in binary mode, so no decoding or newline
    translation takes place.
    """
    with open(filename, mode='rb') as source:
        return source.read()
| |
| |
def read_file_as_lines(filename):
    """Read the text file ``filename`` into a list of lines.

    Every line has its trailing whitespace removed (the line terminator
    as well as trailing spaces and tabs); leading whitespace is kept.
    """
    with open(filename, mode='rt') as source:
        return [raw.rstrip() for raw in source]
| |
| |
def get_file_info_fd(fd, time_format=_default_time_format):
    """Collect ``os.fstat`` metadata of an open file into a dictionary.

    Args:
        fd: open file object; only its ``fileno()`` method is used.
        time_format: strftime pattern applied to the three timestamps,
            which are rendered in UTC.

    Returns:
        dict with the keys ``fd``, ``mode`` (octal string of the permission
        bits), ``device`` (hex string), ``inode``, ``hard_links``,
        ``owner_id``, ``owner_name``, ``owner_group_name``,
        ``owner_group_id``, ``size``, ``access_time``,
        ``modification_time`` and ``creation_time``.

    Raises:
        KeyError: when the owning uid or gid has no entry in the passwd or
            group database.
    """
    # time.strftime() does not implement the %f directive used by the
    # default format; datetime.strftime() does, so format through datetime.
    from datetime import datetime, timezone

    def format_time(unixtime):
        moment = datetime.fromtimestamp(unixtime, tz=timezone.utc)
        return moment.strftime(time_format)

    fileno = fd.fileno()
    # Use the named attributes: the tuple form of the stat result truncates
    # timestamps to whole seconds, which would always render %f as zeros.
    info = os.fstat(fileno)

    return {
        'fd': fileno,
        'mode': oct(info.st_mode & 0o777),
        'device': hex(info.st_dev),
        'inode': info.st_ino,
        'hard_links': info.st_nlink,
        'owner_id': info.st_uid,
        'owner_name': pwd.getpwuid(info.st_uid).pw_name,
        'owner_group_name': grp.getgrgid(info.st_gid).gr_name,
        'owner_group_id': info.st_gid,
        'size': info.st_size,
        'access_time': format_time(info.st_atime),
        'modification_time': format_time(info.st_mtime),
        # Filled from st_ctime, which on Unix is the time of the last
        # metadata change rather than the creation time.
        'creation_time': format_time(info.st_ctime),
    }
| |
| |
def get_gzipped_file(url, timeout=None):
    """Download a gzip-compressed resource and return its unpacked bytes.

    Args:
        url: location of the gzip-compressed resource.
        timeout: passed to ``requests.get``; the default ``None`` waits
            without a limit, which is what this function always did.

    Returns:
        The decompressed payload as ``bytes``.

    Raises:
        requests.exceptions.HTTPError: the server answered with a 4xx/5xx
            status.
        OSError: the response body is not valid gzip data.
    """
    # Imported here so the module stays importable without `requests`.
    from io import BytesIO
    from requests import get
    import gzip

    response = get(url, timeout=timeout)
    # Fail on HTTP errors right away; otherwise an error page would be fed
    # to the gzip decoder and surface as a misleading "not a gzipped file".
    response.raise_for_status()
    with gzip.GzipFile(fileobj=BytesIO(response.content)) as gz:
        return gz.read()
| |
| |
def ensure_folder_exists(_folder):
    """Make sure the directory ``_folder`` exists, creating it if needed.

    Missing parent directories are created as well. Creation is attempted
    first and an already existing directory is detected from the resulting
    error, so a concurrent creator cannot make this call fail.

    Args:
        _folder: path of the directory to ensure.

    Returns:
        A human-readable status string telling whether the folder was
        created or already existed.

    Raises:
        FileExistsError: ``_folder`` exists but is not a directory.
    """
    try:
        os.makedirs(_folder)
    except FileExistsError:
        # Something is already there; only a real directory satisfies
        # the "folder exists" contract.
        if not os.path.isdir(_folder):
            raise
        return "... folder exists at '{}'".format(_folder)
    return "... folder '{}' created".format(_folder)
| |
| |
def ensure_folder_removed(_folder):
    """Remove the directory ``_folder`` when it is present.

    Removal uses ``os.rmdir``, so errors such as a non-empty directory
    propagate as ``OSError``.

    Returns:
        A human-readable status string telling whether the folder was
        removed or was not there to begin with.
    """
    if not os.path.exists(_folder):
        return "... folder '{}' not exists".format(_folder)

    os.rmdir(_folder)
    return "... folder '{}' removed".format(_folder)
| |
| |
def _cleanup_temp_files():
    """Delete every file tracked in ``_temp_files`` and reset the registry.

    Removal is best-effort: an ``OSError`` for one file (for instance when
    it is already gone) is ignored and the remaining files are still
    processed.
    """
    global _temp_files
    for path in _temp_files.values():
        try:
            os.remove(path)
        except OSError:
            # Nothing useful can be done about a file that will not go away.
            continue
    _temp_files = {}
| |
| |
def create_temp_file_with_content(content, mode=None):
    """Return the path of a temporary file holding ``content``.

    Files are remembered per ``hash(content)``: asking again for the same
    content returns the path created earlier instead of writing a new
    file. All remembered files are removed at interpreter exit.

    Args:
        content: ``str`` (written as its ``encode()`` result) or a
            bytes-like object.
        mode: optional permission bits applied with ``os.chmod`` when the
            file is first created; a falsy value keeps the permissions
            chosen by ``tempfile.mkstemp``. It is not re-applied when an
            existing file is reused.

    Returns:
        Path of the temporary file.
    """
    if not _temp_files:
        atexit.register(_cleanup_temp_files)
    # Because we may change context several times, try to remember files we
    # created and reuse them at a small memory cost.
    content_key = hash(content)
    if content_key in _temp_files:
        return _temp_files[content_key]

    # Encode before touching the filesystem so that a failing encode does
    # not leave an empty file behind.
    payload = content.encode() if isinstance(content, str) else content

    # new file, create it
    handle, name = tempfile.mkstemp()
    _temp_files[content_key] = name
    # mkstemp() hands back an already opened descriptor; wrap it so it is
    # closed once the payload is written instead of leaking it.
    with os.fdopen(handle, 'wb') as fd:
        fd.write(payload)

    # set mode for the file
    if mode:
        os.chmod(name, mode)

    return name