blob: 754f0deac098425767ac41c63dff15dc2189394a [file] [log] [blame]
Alexd9fd85e2019-05-16 16:58:24 -05001import os
2import tarfile as tarfile
3import tempfile
4
5from cfg_checker.common import logger_cli
6from cfg_checker.common.exception import ConfigException
7
8
class TGZFile(object):
    """Wrapper around a tar archive on disk that is used as a flat file store.

    The archive is created gzip-compressed when it does not exist yet and is
    seeded with a small label member so that it is never empty. The label
    member is hidden from ``list_files()`` (and therefore from ``has_file()``
    and ``get_file()``).
    """
    # Path of the archive this instance works on; assigned by __init__
    basefile = None
    # Name of the label member written into every newly created archive
    _labelname = "labelfile"

    def __init__(self, _filepath, label=None):
        """Open the archive at ``_filepath``, creating it when missing.

        :param _filepath: path of the archive to use
        :param label: text stored in the label member of a newly created
            archive; a default text is used when empty or None
        :raises ConfigException: when the path exists but is not a regular
            file, or is a file that is not a tar archive
        """
        if not os.path.exists(_filepath):
            # The archive does not exist: create it with a simple label
            # member so the resulting archive is non-empty
            if not label:
                label = "MCP Checker TGZ file"
            with tarfile.open(_filepath, "w:gz") as tgz:
                self._add_bytes(tgz, self._labelname, label.encode('utf-8'))
            logger_cli.debug("... created file '{}'".format(_filepath))
        elif not os.path.isfile(_filepath):
            # path exists, and it is not a file
            raise ConfigException(
                "Supplied path of '{}' is not a file".format(
                    _filepath
                )
            )
        elif not tarfile.is_tarfile(_filepath):
            # file exists, and it is not a tar file
            raise ConfigException(
                "Supplied file of '{}' is not a TAR stream".format(
                    _filepath
                )
            )
        self.basefile = _filepath

    @staticmethod
    def _add_bytes(tgz, arcname, data):
        """Append ``data`` (bytes) to the open archive ``tgz`` as ``arcname``."""
        with tempfile.TemporaryFile() as _tempfile:
            _tempfile.write(data)
            _tempfile.flush()
            _tempfile.seek(0)
            # member metadata (size, mtime, ...) is taken from the temp file
            _info = tgz.gettarinfo(arcname=arcname, fileobj=_tempfile)
            tgz.addfile(_info, fileobj=_tempfile)

    def _write_member(self, tgz, name, data):
        """Append member ``name``: from ``data``, or from disk if it is None."""
        if data is None:
            tgz.add(name, arcname=name, recursive=False)
        else:
            self._add_bytes(tgz, name, data)

    def get_file(self, name):
        """Return the content of member ``name`` as bytes.

        The member is looked up by its full archive name first and by its
        base name second, which matches how ``list_files()`` reports names.

        :param name: member name as reported by ``list_files()``
        :return: bytes content, or None when the member is absent or is not
            a readable file (e.g. a directory entry)
        """
        if not self.has_file(name):
            return None
        with tarfile.open(self.basefile) as tgz:
            try:
                _member = tgz.getmember(name)
            except KeyError:
                # stored under a sub-path: fall back to the base name,
                # preferring the most recent occurrence like getmember()
                _member = None
                for _candidate in reversed(tgz.getmembers()):
                    if _candidate.name.rsplit('/', 1)[-1] == name:
                        _member = _candidate
                        break
            if _member is None:
                return None
            _tgzitem = tgz.extractfile(_member)
            # extractfile() yields None for members without file content
            return _tgzitem.read() if _tgzitem is not None else None

    def add_file(self, name, buf=None, replace=False):
        """Add member ``name`` to the archive, or replace it.

        The new archive is first assembled in a temporary file and only then
        copied over the existing one, so a failure while rebuilding leaves
        the current archive untouched. Existing members are copied as they
        are and are never unpacked to disk.

        :param name: archive member name; when ``buf`` is empty it is also
            the path of the file on disk to take the content from
        :param buf: content to store, ``str`` (stored as UTF-8) or bytes
        :param replace: overwrite a member that is already present
        :return: True when the archive was rewritten; False when there is
            nothing to add or the member exists and ``replace`` is not set
        """
        with tarfile.open(self.basefile) as r:
            _exists = name in r.getnames()

        _from_disk = not buf
        if _from_disk and not os.path.isfile(name):
            # Nothing to do: no buffer or file to add
            return False
        if not replace and (_exists or name in self.list_files()):
            # file already there and replace flag not set
            return False

        if _from_disk:
            _data = None
            _size = os.path.getsize(name)
        else:
            if isinstance(buf, (bytes, bytearray)):
                _data = bytes(buf)
            else:
                _data = buf.encode('utf-8')
            _size = len(_data)

        _a = "replace" if replace else "add"
        logger_cli.debug("... about to {} '{}' ({:.2f}MB) -> '{}'".format(
            _a,
            name,
            float(_size)/1024/1024,
            self.basefile
        ))

        logger_cli.debug("... rebuilding archive")
        with tempfile.TemporaryFile() as _spool:
            with tarfile.open(self.basefile) as _old:
                with tarfile.open(fileobj=_spool, mode="w:gz") as _new:
                    _placed = False
                    for _member in _old.getmembers():
                        if _member.name != name:
                            # only regular files carry a data stream
                            _content = None
                            if _member.isreg():
                                _content = _old.extractfile(_member)
                            _new.addfile(_member, _content)
                        elif not _placed:
                            # new content takes the position of the old one
                            self._write_member(_new, name, _data)
                            _placed = True
                    if not _placed:
                        self._write_member(_new, name, _data)
            # the rebuild succeeded: copy the result over the archive
            _spool.seek(0)
            with open(self.basefile, "wb") as _out:
                for _chunk in iter(lambda: _spool.read(1024 * 1024), b""):
                    _out.write(_chunk)
        return True

    def list_files(self):
        """Return base names of all archive members except the label member.

        Any directory part of a member name is dropped; directory entries
        stored in the archive are reported by their base name as well.
        """
        with tarfile.open(self.basefile) as tgz:
            _names = tgz.getnames()
        # filter filenames only, skip path
        _names = [_n.rsplit('/', 1)[-1] for _n in _names]
        # remove label file from output
        if self._labelname in _names:
            _names.remove(self._labelname)
        return _names

    def has_file(self, name):
        """Return True when ``name`` is among the names of ``list_files()``."""
        if name in self.list_files():
            logger_cli.debug("... '{}' has '{}'".format(self.basefile, name))
            return True
        else:
            logger_cli.debug("... '{}' lacks '{}'".format(self.basefile, name))
            return False