# -*- coding: utf-8 -*-

import datetime

import utils

from rsync_remote import RsyncRemote
from utils import singleton


@singleton
class TimeStamp(object):
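    """Singleton holding the UTC timestamp used for snapshot names.

    Because of the ``@singleton`` decorator, every TRsync instance created
    during one run shares the same stamp, so all snapshots pushed in that
    run get a consistent name.
    """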
    def __init__(self):
        self.now = datetime.datetime.utcnow()
        self.staging_snapshot_stamp_format = r'%Y-%m-%d-%H%M%S'
        self.staging_snapshot_stamp_regexp = \
            r'[0-9]{4}-[0-9]{2}-[0-9]{2}-[0-9]{6}'
        self.staging_snapshot_stamp = \
            self.now.strftime(self.staging_snapshot_stamp_format)

    def __str__(self):
        return self.staging_snapshot_stamp


class TRsync(RsyncRemote):
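    """RsyncRemote that pushes content as timestamped snapshots.

    Each push goes into ``<snapshot_dir>/<repo_name>-<timestamp>`` and is
    published through two symlinks: ``<repo_name>`` and
    ``<repo_name>-<latest_successful_postfix>``. Failed pushes are rolled
    back, and unlinked snapshots older than ``save_latest_days`` are removed.
    """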
    # retry and other functions that work with the mirror
    # add all the needed directory functions here, like mkdir, ls, rm, etc.
    # possibly check that the rsync url exists
    def __init__(self,
                 rsync_url,
                 snapshot_dir='snapshots',
                 latest_successful_postfix='latest',
                 save_latest_days=14,
                 init_directory_structure=True,
                 ):
        super(TRsync, self).__init__(rsync_url)
        self.logger = utils.logger.getChild('TRsync.' + rsync_url)
        self.timestamp = TimeStamp()
        self.logger.info('Using timestamp {}'.format(self.timestamp))
        self.snapshot_dir = self.url.a_dir(snapshot_dir)
        self.latest_successful_postfix = latest_successful_postfix
        self.save_latest_days = save_latest_days

        if init_directory_structure is True:
            self.init_directory_structure()

    def init_directory_structure(self):
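        """Make sure the snapshot directory exists on the remote server.

        Only runs for non-path (remote) urls; nothing is done for plain
        local paths.
        """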
        # TODO: self.rsyncRemote.mkdir
        if self.url.url_type != 'path':
            server_root = RsyncRemote(self.url.root)
            return server_root.mkdir(
                self.url.a_dir(self.url.path, self.snapshot_dir)
            )

    def push(self, source, repo_name, extra=None):
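        """Push ``source`` as a new timestamped snapshot of ``repo_name``.

        The rsync run and the symlink updates are treated as one transaction:
        if any sync step fails, previously existing symlinks are restored,
        the newly created snapshot directory is removed, and the exception is
        re-raised. A failure to clean up old snapshots only logs a warning.
        """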
        latest_path = self.url.a_file(
            self.snapshot_dir,
            '{}-{}'.format(self.url.a_file(repo_name),
                           self.latest_successful_postfix)
        )
        snapshot_name = self.url.a_file(
            '{}-{}'.format(self.url.a_file(repo_name), self.timestamp)
        )
        repo_path = self.url.a_file(self.snapshot_dir, snapshot_name)

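        # rsync's --link-dest hard-links files that are unchanged relative to
        # the latest successful snapshot, so each snapshot appears complete
        # while only changed files consume additional space.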
        link_dest = '--link-dest={}'.format(
            self.url.a_file(self.url.path, latest_path)
        )
        # append to any extra rsync options provided by the caller
        extra = link_dest if not extra else '{} {}'.format(extra, link_dest)
        # TODO: implement retry in the base class
        # TODO: locking - symlink dir-timestamp.lock -> dir-timestamp
        # TODO: write a yaml file with the symlink info
        transaction = list()
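        # Each completed step appends a marker to `transaction`; on failure
        # the except block uses these markers to decide what must be rolled
        # back.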
        try:
            # start transaction
            result = super(TRsync, self).push(source, repo_path, extra)
            transaction.append('repo_dir_created')
            self.logger.info('{}'.format(result))

            try:
                old_repo_name_symlink_target = \
                    [_[1] for _ in self.ls_symlinks(repo_name)][0]
                self.logger.info('Previous {} -> {}'
                                 ''.format(repo_name,
                                           old_repo_name_symlink_target))
                status = 'updated'
            except Exception:
                status = 'created'
            self.symlink(repo_name, repo_path)
            transaction.append('symlink_repo_name_{}'.format(status))

            try:
                old_latest_path_symlink_target = \
                    [_[1] for _ in self.ls_symlinks(latest_path)][0]
                self.logger.info('Previous {} -> {}'
                                 ''.format(latest_path,
                                           old_latest_path_symlink_target))
                status = 'updated'
            except Exception:
                status = 'created'
            self.symlink(latest_path, snapshot_name)
            transaction.append('symlink_latest_path_{}'.format(status))

            self._remove_old_snapshots(repo_name)
            transaction.append('old_snapshots_deleted')

        except RuntimeError as e:
            self.logger.error(e)
            # a failure to delete old snapshots is ignored when assessing
            # the transaction; it only produces a warning
            if 'old_snapshots_deleted' not in transaction:
                self.logger.warning("Old snapshots were not deleted. "
                                    "Ignoring; maybe next time.")
                transaction.append('old_snapshots_deleted')

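            # A fully successful push records four markers; fewer means one
            # of the sync steps failed and its effects must be undone.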
            if len(transaction) < 4:
                # roll back the transaction if any of the sync steps failed

                if 'symlink_latest_path_updated' in transaction:
                    self.logger.info('Restoring symlink {} -> {}'
                                     ''.format(latest_path,
                                               old_latest_path_symlink_target))
                    self.symlink(latest_path, old_latest_path_symlink_target)
                elif 'symlink_latest_path_created' in transaction:
                    self.logger.info('Deleting symlink {}'.format(latest_path))
                    self.rmfile(latest_path)

                if 'symlink_repo_name_updated' in transaction:
                    self.logger.info('Restoring symlink {} -> {}'
                                     ''.format(repo_name,
                                               old_repo_name_symlink_target))
                    self.symlink(repo_name, old_repo_name_symlink_target)
                elif 'symlink_repo_name_created' in transaction:
                    self.logger.info('Deleting symlink {}'.format(repo_name))
                    self.rmfile(repo_name)

                if 'repo_dir_created' in transaction:
                    self.logger.info('Removing snapshot {}'.format(repo_path))
                    self.rmdir(repo_path)
                raise

        return result

    def _remove_old_snapshots(self, repo_name, save_latest_days=None):
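        """Delete old snapshots of ``repo_name``.

        ``save_latest_days`` defaults to the instance setting: ``None`` or
        ``False`` there means delete all unlinked snapshots, ``0`` disables
        the cleanup, and any other value keeps snapshots newer than that many
        days. Snapshots still referenced by a symlink are never deleted.
        """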
        if save_latest_days is None:
            save_latest_days = self.save_latest_days
        if save_latest_days is None or save_latest_days is False:
            # delete all snapshots
            self.logger.info('Deleting all of the old snapshots '
                             '(save_latest_days == {})'
                             ''.format(save_latest_days))
            save_latest_days = -1
        elif save_latest_days == 0:
            # skipping deletion
            self.logger.info('Skip deletion of old snapshots '
                             '(save_latest_days == {})'
                             ''.format(save_latest_days))
            return
        else:
            # delete snapshots older than save_latest_days
            self.logger.info('Deleting all of the unlinked snapshots older '
                             'than {0} days (save_latest_days == {0})'
                             ''.format(save_latest_days))
        warn_date = \
            self.timestamp.now - datetime.timedelta(days=save_latest_days)
        warn_date = datetime.datetime.combine(warn_date, datetime.time(0))
        snapshots = self.ls_dirs(
            self.url.a_dir(self.snapshot_dir),
            pattern=r'^{}-{}$'.format(
                repo_name,
                self.timestamp.staging_snapshot_stamp_regexp
            )
        )
        links = self.ls_symlinks(self.url.a_dir())
        links += self.ls_symlinks(self.url.a_dir(self.snapshot_dir))
        for s in snapshots:
            s_date = datetime.datetime.strptime(
                s,
                '{}-{}'.format(repo_name,
                               self.timestamp.staging_snapshot_stamp_format)
            )
            s_date = datetime.datetime.combine(s_date, datetime.time(0))
            s_path = self.url.a_dir(self.snapshot_dir, s)
            if s_date < warn_date:
                s_links = [_[0] for _ in links
                           if _[1] == s
                           or _[1].endswith('/{}'.format(s))
                           ]
                if not s_links:
                    self.rmdir(s_path)
                else:
                    self.logger.info('Skip deletion of "{}" because there are '
                                     'symlinks found: {}'.format(s, s_links))
            else:
                self.logger.info('Skip deletion of "{}" because it is newer '
                                 'than {} days'.format(s, save_latest_days))