blob: 5be00e25dec176b8a6bdc0ba77e5982227827fdc [file] [log] [blame]
import io
import os
import shutil
import tarfile as tarfile
import tempfile
import time

from cfg_checker.common import logger_cli
from cfg_checker.common.exception import ConfigException
7
8
class TGZFile(object):
    """Named-file store kept inside a single TAR archive on disk.

    A new archive is created gzip-compressed and seeded with a small label
    member. The label member is hidden from ``list_files()``,
    ``has_file()`` and ``get_file()``.

    Lookups by name match the last path component of a member, i.e. a
    member stored as ``dir/report.json`` is addressed as ``report.json``.
    """

    # Path of the archive on disk; assigned by __init__
    basefile = None
    # Name of the label member written into a freshly created archive
    _labelname = "labelfile"

    def __init__(self, _filepath, label=None):
        """Open the archive at ``_filepath``, creating it when missing.

        :param _filepath: path of the archive
        :param label: text stored in the label member of a newly created
            archive; a default text is used when empty or None
        :raises ConfigException: if the path exists but is not a regular
            file, or is a file that is not a TAR stream
        """
        if not os.path.exists(_filepath):
            # If the archive not exists, create it with
            # a simple labelfile so the archive is not empty
            if not label:
                label = "MCP Checker TGZ file"
            with tarfile.open(_filepath, "w:gz") as tgz:
                self._write_member(
                    tgz,
                    self._labelname,
                    label.encode('utf-8')
                )
            logger_cli.debug("... created file '{}'".format(_filepath))
        elif not os.path.isfile(_filepath):
            # if path exists, and it is not a file
            raise ConfigException(
                "Supplied path of '{}' is not a file".format(
                    _filepath
                )
            )
        elif not tarfile.is_tarfile(_filepath):
            # if file exists, and it is not a tar file
            raise ConfigException(
                "Supplied file of '{}' is not a TAR stream".format(
                    _filepath
                )
            )
        self.basefile = _filepath

    @staticmethod
    def _write_member(tgz, name, data):
        """Append bytes ``data`` to the open archive ``tgz`` as ``name``."""
        _info = tarfile.TarInfo(name)
        _info.size = len(data)
        _info.mtime = int(time.time())
        tgz.addfile(_info, io.BytesIO(bytes(data)))

    def _visible(self, names):
        """Strip paths from member ``names`` and drop the label member."""
        _names = [_n.rsplit('/', 1)[-1] for _n in names]
        if self._labelname in _names:
            _names.remove(self._labelname)
        return _names

    def _find_member(self, tgz, name):
        """Return the TarInfo addressed by ``name`` or None.

        An exact member name wins over a match on the last path component;
        within each kind the last occurrence in the archive is used. The
        label member is never returned.
        """
        if name == self._labelname:
            return None
        _exact = None
        _by_basename = None
        for _member in tgz.getmembers():
            if _member.name == name:
                _exact = _member
            elif _member.name.rsplit('/', 1)[-1] == name:
                _by_basename = _member
        return _exact if _exact is not None else _by_basename

    def get_file(self, name, decode=False):
        """Return the content of member ``name``.

        :param name: member name as reported by ``list_files()``
        :param decode: when True return the content decoded as UTF-8 text,
            otherwise return raw bytes
        :return: bytes or str; None when there is no such member or the
            member is not a regular file
        """
        # single pass: look the member up and read it from the same handle
        with tarfile.open(self.basefile, "r:*") as tgz:
            _member = self._find_member(tgz, name)
            if _member is None:
                logger_cli.debug(
                    "... '{}' lacks '{}'".format(self.basefile, name)
                )
                return None
            logger_cli.debug(
                "... '{}' has '{}'".format(self.basefile, name)
            )
            # directories, links etc. carry no readable payload
            if not _member.isreg():
                return None
            _tgzitem = tgz.extractfile(_member)
            if _tgzitem is None:
                return None
            _data = _tgzitem.read()
        return _data.decode('utf-8') if decode else _data

    def add_file(self, name, buf=None, replace=False):
        """Store a file in the archive as member ``name``.

        The content comes from ``buf`` when it is non-empty (text is
        encoded as UTF-8, bytes are stored as-is); otherwise the regular
        file found at path ``name`` on disk is read and stored.

        The archive is rebuilt into a temporary file placed next to it and
        then swapped in, so a failure leaves the existing archive intact.
        The rebuilt archive is always gzip-compressed.

        :param name: member name (and the disk path when ``buf`` is empty)
        :param buf: str or bytes content, or None/empty to read from disk
        :param replace: overwrite an existing member instead of refusing
        :return: True when the archive was rewritten; False when the
            member already exists and ``replace`` is not set, or when
            there is neither a buffer nor a file on disk to add
        """
        with tarfile.open(self.basefile, "r:*") as _r:
            _names = _r.getnames()
        if not replace and (name in _names or name in self._visible(_names)):
            # file exists and replace flag is not set
            return False

        # collect the payload
        if buf:
            if isinstance(buf, (bytes, bytearray)):
                _data = bytes(buf)
            else:
                _data = buf.encode('utf-8')
        elif os.path.isfile(name):
            with open(name, "rb") as _src:
                _data = _src.read()
        else:
            # Nothing to do: no buffer or file to add
            return False

        _a = "replace" if replace else "add"
        logger_cli.debug("... about to {} '{}' ({:.2f}MB) -> '{}'".format(
            _a,
            name,
            float(len(_data))/1024/1024,
            self.basefile
        ))

        # temp file lives beside the archive so the final swap is a rename
        _dir = os.path.dirname(os.path.abspath(self.basefile))
        _fd, _tmppath = tempfile.mkstemp(suffix=".tmp", dir=_dir)
        os.close(_fd)
        logger_cli.debug("... created temp file '{}'".format(_tmppath))
        try:
            logger_cli.debug("... rebuilding archive")
            with tarfile.open(self.basefile, "r:*") as _old, \
                    tarfile.open(_tmppath, "w:gz") as _new:
                _written = False
                for _member in _old.getmembers():
                    if _member.name == name:
                        # put new content where the old member was and
                        # drop any further duplicates of it
                        if not _written:
                            self._write_member(_new, name, _data)
                            _written = True
                        continue
                    _fobj = None
                    if _member.isreg():
                        _fobj = _old.extractfile(_member)
                    _new.addfile(_member, _fobj)
                if not _written:
                    self._write_member(_new, name, _data)
            # mkstemp creates a private file; keep the archive's own mode
            shutil.copymode(self.basefile, _tmppath)
            os.replace(_tmppath, self.basefile)
        finally:
            # only still present when the rebuild or the swap failed
            if os.path.exists(_tmppath):
                os.remove(_tmppath)
        return True

    def list_files(self):
        """Return member names without paths, excluding the label member."""
        with tarfile.open(self.basefile, "r:*") as tgz:
            _names = tgz.getnames()
        return self._visible(_names)

    def has_file(self, name):
        """Return True when ``name`` is among ``list_files()``."""
        if name in self.list_files():
            logger_cli.debug("... '{}' has '{}'".format(self.basefile, name))
            return True
        else:
            logger_cli.debug("... '{}' lacks '{}'".format(self.basefile, name))
            return False