blob: 23b28bbe1123ed4b380782d364a7eb6088fb9b6b [file] [log] [blame]
# Author: Alex Savatieiev (osavatieiev@mirantis.com; a.savex@gmail.com)
# Copyright 2019-2022 Mirantis, Inc.
Alexd9fd85e2019-05-16 16:58:24 -05003import os
4import tarfile as tarfile
5import tempfile
6
7from cfg_checker.common import logger_cli
8from cfg_checker.common.exception import ConfigException
9
10
class TGZFile(object):
    """Small wrapper around a tar archive used as a flat file store.

    A new archive is created gzip-compressed and seeded with a single label
    member (``_labelname``) so that it is never empty. The label member is
    hidden from ``list_files``/``has_file``/``get_file``.
    """

    # path to the archive on disk; set by __init__
    basefile = None
    # name of the member that holds the archive label
    _labelname = "labelfile"

    def __init__(self, _filepath, label=None):
        """Open an existing archive or create a new labelled one.

        :param _filepath: path to the archive
        :param label: text stored in the label member when the archive is
            created; a default text is used when empty or None
        :raises ConfigException: if the path exists but is not a file, or
            is a file that is not a TAR stream
        """
        if not os.path.exists(_filepath):
            # The archive does not exist yet: create it with a simple
            # label member so that the archive is not empty
            if not label:
                label = "MCP Checker TGZ file"

            with tarfile.open(_filepath, "w:gz") as tgz:
                self._write_entry(
                    tgz,
                    self._labelname,
                    label.encode('utf-8')
                )
            logger_cli.debug("... created file '{}'".format(_filepath))
            self.basefile = _filepath

        elif not os.path.isfile(_filepath):
            # path exists, and it is not a file
            raise ConfigException(
                "Supplied path of '{}' is not a file".format(
                    _filepath
                )
            )
        elif not tarfile.is_tarfile(_filepath):
            # file exists, and it is not a tar file
            raise ConfigException(
                "Supplied file of '{}' is not a TAR stream".format(
                    _filepath
                )
            )
        else:
            self.basefile = _filepath

    @staticmethod
    def _write_entry(tgz, name, payload):
        """Append one entry called 'name' to the writable archive 'tgz'.

        When 'payload' (bytes) is given it becomes the member content;
        when it is None the on-disk path 'name' is added instead.
        """
        if payload is None:
            tgz.add(name, arcname=name)
            return

        # Stage the bytes in a temp file so tarfile can stat it for
        # size and timestamps
        with tempfile.TemporaryFile() as _tmp:
            _tmp.write(payload)
            _tmp.flush()
            _tmp.seek(0)
            _info = tgz.gettarinfo(arcname=name, fileobj=_tmp)
            # temp files are created owner-only; store a regular file mode
            _info.mode = 0o644
            tgz.addfile(_info, fileobj=_tmp)

    @staticmethod
    def _find_member(tgz, name):
        """Return the member matching 'name', or None.

        An exact member name wins; otherwise the basename is compared,
        which is how list_files() reports members stored under a path.
        The last matching entry is used, same as TarFile.getmember().
        """
        _members = tgz.getmembers()
        for _member in reversed(_members):
            if _member.name == name:
                return _member
        for _member in reversed(_members):
            if _member.name.rsplit('/', 1)[-1] == name:
                return _member
        return None

    def get_file(self, name, decode=False):
        """Return the content of member 'name'.

        :param name: member name as reported by list_files()
        :param decode: when True, return text decoded as utf-8 instead
            of bytes
        :return: member content, or None if the archive has no such
            member or the member has no data stream (e.g. a directory)
        """
        if not self.has_file(name):
            return None

        with tarfile.open(self.basefile, "r:*") as tgz:
            _member = self._find_member(tgz, name)
            if _member is None:
                return None
            _stream = tgz.extractfile(_member)
            if _stream is None:
                # not a regular file or link, nothing to read
                return None
            _data = _stream.read()

        if decode:
            return _data.decode('utf-8')
        return _data

    def add_file(self, name, buf=None, replace=False):
        """Add or replace a member and rebuild the archive as tar.gz.

        :param name: member name; when 'buf' is empty this is also the
            on-disk path that is added to the archive
        :param buf: content to store (str is encoded as utf-8, bytes are
            stored as is); when empty or None, path 'name' is used
        :param replace: overwrite a member with the same name
        :return: True if the archive was rebuilt, False if there was
            nothing to do (member already present and 'replace' not set,
            or neither a buffer nor an existing path was supplied)
        """
        with tarfile.open(self.basefile) as _src:
            _exists = name in _src.getnames()
        if not replace and (_exists or name in self.list_files()):
            # file exists and replace flag is not set
            return False
        if not buf and not os.path.exists(name):
            # Nothing to do: no buffer or file to add
            return False

        if buf:
            if isinstance(buf, (bytes, bytearray)):
                _payload = bytes(buf)
            else:
                _payload = buf.encode('utf-8')
            _size = len(_payload)
        else:
            # no buffer: content comes from the path on disk
            _payload = None
            _size = os.path.getsize(name)

        _a = "replace" if replace else "add"
        logger_cli.debug("... about to {} '{}' ({:.2f}MB) -> '{}'".format(
            _a,
            name,
            float(_size)/1024/1024,
            self.basefile
        ))

        # Stream members into a staged archive instead of unpacking to
        # disk; the original archive is only overwritten after the new
        # one has been built completely
        logger_cli.debug("... rebuilding archive")
        with tempfile.TemporaryFile() as _staging:
            with tarfile.open(self.basefile) as _src, \
                    tarfile.open(fileobj=_staging, mode="w:gz") as _dst:
                _written = False
                for _member in _src.getmembers():
                    if _member.name == name:
                        # put new content where the old member was
                        if not _written:
                            self._write_entry(_dst, name, _payload)
                            _written = True
                        continue
                    if _member.isreg():
                        _stream = _src.extractfile(_member)
                    else:
                        _stream = None
                    _dst.addfile(_member, _stream)
                if not _written:
                    self._write_entry(_dst, name, _payload)

            _staging.seek(0)
            with open(self.basefile, "wb") as _out:
                for _chunk in iter(lambda: _staging.read(1024 * 1024), b""):
                    _out.write(_chunk)
        return True

    def list_files(self):
        """Return member names with any leading path stripped.

        The first entry equal to the label name is left out of the result.
        """
        with tarfile.open(self.basefile, "r:*") as tgz:
            # filter filenames only, skip path
            _names = [_n.rsplit('/', 1)[-1] for _n in tgz.getnames()]
        # remove label file from output
        if self._labelname in _names:
            _names.remove(self._labelname)
        return _names

    def has_file(self, name):
        """Return True if 'name' is among list_files(), False otherwise."""
        _found = name in self.list_files()
        if _found:
            logger_cli.debug("... '{}' has '{}'".format(self.basefile, name))
        else:
            logger_cli.debug("... '{}' lacks '{}'".format(self.basefile, name))
        return _found