# blob: 8b1652a4f3bcebb8142d978293dea1c51bd3e862 [file] [log] [blame]
# Author: Alex Savatieiev (osavatieiev@mirantis.com; a.savex@gmail.com)
# Copyright 2019-2022 Mirantis, Inc.
import atexit
import grp
import os
import pwd
import time
import tempfile
# Default strftime pattern for the timestamps reported by get_file_info_fd.
_default_time_format = "%Y-%m-%d %H:%M:%S.%f%z"
# Registry of temp files made by create_temp_file_with_content: maps a
# content-derived key to the file path; emptied by _cleanup_temp_files.
_temp_files = {}
def remove_file(filename):
    """Delete the file located at ``filename``.

    Any ``OSError`` raised by the underlying call (for example when the
    path does not exist) is propagated to the caller.
    """
    os.unlink(filename)
def write_str_to_file(filename, _str):
    """Create or truncate ``filename`` and store ``_str`` in it as text."""
    with open(filename, mode='w') as target:
        target.write(_str)
def append_str_to_file(filename, _str):
    """Add ``_str`` to the end of ``filename`` without a trailing newline.

    The file is created when it does not exist yet.
    """
    with open(filename, mode='a') as target:
        target.write(_str)
def write_lines_to_file(filename, source_list):
    """Overwrite ``filename`` with the strings of ``source_list``.

    Items are separated by a newline and a final newline is always
    appended, so an empty list produces a file holding a single newline.
    """
    with open(filename, mode='w') as target:
        body = "\n".join(source_list)
        target.write(body + "\n")
def append_lines_to_file(filename, source_list):
    """Append the strings of ``source_list`` to ``filename``.

    Items are separated by a newline and a final newline is always
    appended. The text is assembled before the file is opened.
    """
    payload = "\n".join(source_list) + "\n"
    with open(filename, mode='a') as target:
        target.write(payload)
def append_line_to_file(filename, _str):
    """Append ``_str`` followed by a newline to ``filename``."""
    with open(filename, mode='a') as target:
        terminated = _str + '\n'
        target.write(terminated)
def read_file(filename):
    """Return the whole content of ``filename`` as ``bytes``."""
    with open(filename, mode='rb') as source:
        return source.read()
def read_file_as_lines(filename):
    """Read ``filename`` as text and return its lines as a list.

    Every line has all trailing whitespace removed, not only the line
    terminator.
    """
    with open(filename, mode='rt') as source:
        return [row.rstrip() for row in source]
def get_file_info_fd(fd, time_format=_default_time_format):
    """Collect ``fstat`` metadata for an open file object.

    :param fd: open file object; only its ``fileno()`` method is used
    :param time_format: ``strftime`` pattern applied to the three
        timestamps, which are rendered in UTC; ``%f`` (microseconds)
        is supported
    :return: dict with the keys ``fd``, ``mode``, ``device``, ``inode``,
        ``hard_links``, ``owner_id``, ``owner_name``,
        ``owner_group_name``, ``owner_group_id``, ``size``,
        ``access_time``, ``modification_time`` and ``creation_time``
    :raises KeyError: when the owner uid or gid has no entry in the
        passwd/group database
    :raises OSError: when ``os.fstat`` fails
    """
    # local import: datetime is needed by this function only
    from datetime import datetime, timezone

    def format_time(nanoseconds):
        # time.strftime() does not implement %f, which the default format
        # uses, so the timestamp is rendered through datetime.strftime().
        # Integer nanoseconds are split by hand to avoid float rounding.
        seconds, remainder = divmod(nanoseconds, 10 ** 9)
        stamp = datetime.fromtimestamp(seconds, tz=timezone.utc)
        stamp = stamp.replace(microsecond=remainder // 1000)
        return stamp.strftime(time_format)

    fileno = fd.fileno()
    info = os.fstat(fileno)
    return {
        'fd': fileno,
        # permission bits only, file type bits are masked out
        'mode': oct(info.st_mode & 0o777),
        'device': hex(info.st_dev),
        'inode': info.st_ino,
        'hard_links': info.st_nlink,
        'owner_id': info.st_uid,
        'owner_name': pwd.getpwuid(info.st_uid).pw_name,
        'owner_group_name': grp.getgrgid(info.st_gid).gr_name,
        'owner_group_id': info.st_gid,
        'size': info.st_size,
        'access_time': format_time(info.st_atime_ns),
        'modification_time': format_time(info.st_mtime_ns),
        # key name kept for compatibility; the value is st_ctime
        'creation_time': format_time(info.st_ctime_ns)
    }
def get_gzipped_file(url):
    """Download ``url`` and return the gunzipped payload as ``bytes``.

    The whole response body is held in memory before decompression.
    """
    # local imports, used by this function only
    import gzip
    from io import BytesIO
    from requests import get

    response = get(url)
    compressed = BytesIO(response.content)
    with gzip.GzipFile(fileobj=compressed) as archive:
        return archive.read()
def ensure_folder_exists(_folder):
    """Make sure a folder is present at ``_folder``, creating it if needed.

    Missing parent folders are created too, and a folder that shows up
    between the existence check and the creation is tolerated instead of
    raising ``FileExistsError``.

    :param _folder: path of the folder
    :return: human-readable status message
    :raises OSError: when the folder cannot be created
    """
    if os.path.exists(_folder):
        # any existing path is reported as present, as before
        return "... folder exists at '{}'".format(_folder)
    # it does not exist, create it together with missing parents
    os.makedirs(_folder, exist_ok=True)
    return "... folder '{}' created".format(_folder)
def ensure_folder_removed(_folder):
    """Remove the folder at ``_folder`` when something exists there.

    Removal uses ``os.rmdir``, so a non-empty folder raises ``OSError``.

    :param _folder: path of the folder
    :return: human-readable status message
    """
    if not os.path.exists(_folder):
        return "... folder '{}' not exists".format(_folder)
    os.rmdir(_folder)
    return "... folder '{}' removed".format(_folder)
def _cleanup_temp_files():
    """Delete every registered temp file and reset the registry.

    Removal is best effort: a file that cannot be deleted is skipped.
    """
    global _temp_files
    for stale_path in _temp_files.values():
        try:
            os.remove(stale_path)
        except OSError:
            # already gone or not removable, move on to the next one
            continue
    _temp_files = {}
def create_temp_file_with_content(content, mode=None):
    """Return the path of a temp file holding ``content``, creating it once.

    Created files are remembered in the module-level ``_temp_files``
    registry, so a repeated request for the same content and mode reuses
    the existing file. Registered files are deleted at interpreter exit.

    :param content: ``str`` (encoded with the ``str.encode()`` default
        codec) or bytes-like data; must be hashable
    :param mode: optional permission bits applied with ``os.chmod``;
        a falsy value keeps the permissions set by ``mkstemp``
    :return: path of the temp file
    :raises OSError: when the file cannot be created or written
    """
    if not _temp_files:
        atexit.register(_cleanup_temp_files)
    # Because we may change context several times, try to remember files we
    # created and reuse them at a small memory cost. The mode is part of the
    # key so a cached file is never handed out with other permissions than
    # the ones requested.
    content_key = (hash(content), mode)
    if content_key in _temp_files:
        return _temp_files[content_key]
    # encode first, so bad content fails before any file is created
    data = content.encode() if isinstance(content, str) else content
    # new file, create it
    handle, name = tempfile.mkstemp()
    try:
        # fdopen takes over the descriptor opened by mkstemp and closes it
        # when the with block ends; reopening by name would leak it
        with os.fdopen(handle, 'wb') as fd:
            fd.write(data)
        # set mode for the file
        if mode:
            os.chmod(name, mode)
    except Exception:
        # do not keep or cache a half-written file
        os.remove(name)
        raise
    _temp_files[content_key] = name
    return name