Basic rsync operations allocated to separated rsync_ops module

Functional tests implemented
PEP8 fixes

Related-Bug: #1570260
Partial-Bug: #1575759
Change-Id: I863658cebeae47e830591a1973ee1832a51e55e7
diff --git a/trsync/objects/rsync_ops.py b/trsync/objects/rsync_ops.py
new file mode 100644
index 0000000..5d4b0e0
--- /dev/null
+++ b/trsync/objects/rsync_ops.py
@@ -0,0 +1,233 @@
+# -*- coding: utf-8 -*-
+
+# Copyright (c) 2015-2016, Mirantis, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import logging
+import os
+import re
+
+from trsync.utils import utils as utils
+
+from trsync.objects.rsync_url import RsyncUrl as RsyncUrl
+from trsync.utils.shell import Shell
+from trsync.utils.tempfiles import TempFiles
+
+
+logging.basicConfig()
+log = logging.getLogger(__name__)
+log.setLevel('DEBUG')
+
+
+class RsyncOps(object):
+    def __init__(self, rsync_url, rsync_extra_params=''):
+        # TODO(mrasskazov): retry parameters for rsync
+        self._log = utils.logger.getChild('RsyncOps.' + rsync_url)
+        self._tmp = TempFiles()
+        self._shell = Shell(self._log)
+        self._rsync_extra_params = ' '.join(['-v --no-owner --no-group',
+                                            rsync_extra_params])
+        self.url = RsyncUrl(rsync_url)
+
+    def _pull(self, source='', dest='', opts='', extra=None,
+              no_dry_run=False, raise_error=False):
+        cmd = 'rsync {opts} {allextra} {source.url}'
+        source = RsyncUrl(self.url.urljoin(source))
+        if dest:
+            dest = RsyncUrl(dest)
+            cmd += ' {dest.url}'
+        allextra = self._rsync_extra_params
+        if extra is not None:
+            allextra = ' '.join((allextra, extra))
+        cmd = cmd.format(**(locals()))
+        if no_dry_run:
+            cmd.replace('--dry-run', '')
+        self._log.debug(cmd)
+        return self._shell.shell(cmd, raise_error=raise_error)[1]
+
+    def push(self, source='', dest='', opts='', extra=None):
+        # TODO(mrasskazov): retry for rsync
+        # TODO(mrasskazov): locking:
+        # https://review.openstack.org/#/c/147120/4/utils/simple_http_daemon.py
+        # create lock-files on remotes during operations
+        # symlink dir-timestamp.lock -> dir-timestamp
+        # for reading and writing
+        # special option for ignore lock-files (for manual fixing)
+        # all high-level functions (like ls) specify type of lock(read or
+        # write), and push creates special lock file on remote.
+        # also push uses retry for waiting wnen resource will be
+        # unlocked
+        # TODO(mrasskazov): check for url compatibility
+        # (local->remote, remote->local, local->local)
+        cmd = 'rsync {opts} {allextra} {source.url} {dest.url}'
+        source = RsyncUrl(source)
+        dest = RsyncUrl(self.url.urljoin(dest))
+        allextra = self._rsync_extra_params
+        if extra is not None:
+            allextra = ' '.join((allextra, extra))
+        cmd = cmd.format(**(locals()))
+        self._log.debug(cmd)
+        return self._shell.shell(cmd)[1]
+
+    def _ls(self, path=None, pattern=r'.*', opts=''):
+        extra = '--no-v'
+        try:
+            out = self._pull(source=path, opts=opts, extra=extra,
+                             no_dry_run=True, raise_error=False)
+        except RuntimeError:
+            out = ''
+        pattern = re.compile(pattern)
+        out = [_ for _ in out.splitlines()
+               if (_.split()[-1] != '.') and
+               (pattern.match(_.split()[-1]) is not None)]
+        return out
+
+    def ls(self, path=None, pattern=r'.*'):
+        out = self._ls(path, pattern=pattern)
+        out = [_.split()[-1] for _ in out]
+        return out
+
+    def ls_dirs(self, path=None, pattern=r'.*'):
+        out = self._ls(path, pattern=pattern)
+        out = [_.split()[-1] for _ in out if _.startswith('d')]
+        return out
+
+    def ls_symlinks(self, path=None, pattern=r'.*'):
+        out = self._ls(path, pattern=pattern, opts='-l')
+        out = [_.split()[-3:] for _ in out if _.startswith('l')]
+        out = [[_[0], _[-1]] for _ in out]
+        return out
+
+    def _symlink_abs_target(self, symlink, recursive=True):
+        target = symlink
+        try:
+            path, name = os.path.split(target)
+            target = self.ls_symlinks(symlink)[-1][-1]
+            abs_target = os.path.normpath(os.path.join(path, target))
+            if not recursive:
+                return abs_target
+            return self._symlink_abs_target(abs_target)
+        except Exception:
+            return target
+
+    def symlink_target(self, symlink, recursive=True, absolute=False):
+        if absolute:
+            return self._symlink_abs_target(symlink, recursive=recursive)
+        else:
+            return os.path.relpath(
+                self._symlink_abs_target(symlink, recursive=recursive),
+                os.path.dirname(symlink)
+            )
+
+    def rm_file(self, filename):
+        '''Removes file on rsync_url.'''
+        report_name = filename
+        dirname, filename = os.path.split(filename)
+        dirname = self.url.a_dir(dirname)
+        source = self.url.a_dir(self._tmp.empty_dir)
+        opts = "-r --delete --include={} '--exclude=*'".format(filename)
+        self._log.info('Removing file "{}"'.format(report_name))
+        return self.push(source=source, dest=dirname, opts=opts)
+
+    def rm_all(self, names=[]):
+        '''Remove all files and dirs (recursively)
+
+        on list as single rsync operation
+        '''
+
+        if type(names) not in (list, tuple):
+            if type(names) is str:
+                names = [names]
+            else:
+                raise RuntimeError('rsync_remote.rm_all has wrong parameter '
+                                   '"names" == "{}"'.format(names))
+
+        source = self.url.a_dir(self._tmp.empty_dir)
+
+        # group files by directories
+        dest_dirs = dict()
+        for name in names:
+            dirname, filename = os.path.split(name)
+            if dirname not in dest_dirs.keys():
+                dest_dirs[dirname] = list()
+            dest_dirs[dirname].append(filename)
+
+        for dest_dir, filenames in dest_dirs.items():
+            # prepare filter file for every dest_dir
+            content = ''
+            for filename in filenames:
+                content += '+ {}\n'.format(filename)
+            content += '- *'
+            filter_file = self._tmp.get_file(content=content)
+            # removing specified files on dest_dir
+            self._log.debug('Removing objects on "{}" directory: {}'
+                            ''.format(dest_dir, str(filenames)))
+            opts = "--recursive --delete --filter='merge,p {}'"\
+                   "".format(filter_file)
+            self.push(source=source, dest=dest_dir, opts=opts)
+
+    def clean_dir(self, dirname):
+        '''Removes directories (recursive) on rsync_url'''
+        dirname = self.url.a_dir(dirname)
+        source = self.url.a_dir(self._tmp.empty_dir)
+        opts = "-a --delete"
+        self._log.info('Cleaning directory "{}"'.format(dirname))
+        return self.push(source=source, dest=dirname, opts=opts)
+
+    def rm_dir(self, dirname):
+        '''Removes directories (recursive) on rsync_url'''
+        self._log.info('Removing directory "{}"'.format(dirname))
+        return self.rm_all(self.url.a_file(dirname))
+
+    def mk_dir(self, dirname):
+        '''Creates directories (recirsive, like mkdir -p) on rsync_url'''
+        source = self.url.a_dir(self._tmp.get_temp_dir(dirname))
+        opts = "-r"
+        self._log.info('Creating directory "{}"'.format(dirname))
+        return self.push(source=source, opts=opts)
+
+    def symlink(self, symlink, target,
+                create_target_file=True, store_history=True):
+        '''Creates symlink targeted to target'''
+
+        temp_dir = self._tmp.get_temp_dir()
+        remote_path, symlink = os.path.split(self.url.a_file(symlink))
+        # check that target is exists on remote
+        if not self.ls(os.path.join(remote_path, target)):
+            raise RuntimeError('Target {} does not exists'.format(target))
+        path = os.path.join(temp_dir, remote_path)
+        if not os.path.isdir(path):
+            os.makedirs(path)
+        os.symlink(target, os.path.join(path, symlink))
+
+        if create_target_file is True:
+            infofile = '{}.target.txt'\
+                ''.format(os.path.join(remote_path, symlink))
+            if store_history is True:
+                try:
+                    self._pull(source=infofile, dest=self.url.a_dir(path),
+                               no_dry_run=True, raise_error=False)
+                    with open(os.path.join(temp_dir, infofile), 'r') as inf:
+                        content = '{}\n{}'.format(target, inf.read())
+                except IOError:
+                    content = target
+                with open(os.path.join(temp_dir, infofile), 'w') as outf:
+                    outf.write(content)
+            self._log.debug('Creating informaion file "{}"'.format(infofile))
+
+        opts = "-rl"
+        self._log.info('Creating symlink "{}" -> "{}"'.format(symlink, target))
+        return self.push(source=self.url.a_dir(path),
+                         dest=self.url.a_dir(remote_path),
+                         opts=opts)
diff --git a/trsync/tests/test_rsync_ops.py b/trsync/tests/test_rsync_ops.py
new file mode 100644
index 0000000..34bc189
--- /dev/null
+++ b/trsync/tests/test_rsync_ops.py
@@ -0,0 +1,316 @@
+# -*- coding: utf-8 -*-
+
+# Copyright (c) 2015-2016, Mirantis, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import logging
+import os
+
+from trsync.objects.rsync_ops import RsyncOps
+from trsync.tests import rsync_base
+from trsync.utils.tempfiles import TempFiles
+
+
+logging.basicConfig()
+log = logging.getLogger(__name__)
+log.setLevel(logging.INFO)
+
+
+class TestRsyncOps(rsync_base.TestRsyncBase):
+
+    """Test case class for rsync_ops module"""
+
+    def test_pull_file(self):
+        for remote in self.rsyncd[self.testname]:
+            # create some data on rsync remote
+            self.getDataFile(os.path.join(remote.path, 'file1.txt'))
+            # pull it to the temp directory
+            temp_dir = TempFiles()
+            ops = RsyncOps(remote.url)
+            ops._pull('file1.txt', temp_dir.last_temp_dir)
+            # compare the directories
+            self.assertDirsEqual(remote.path, temp_dir.last_temp_dir)
+
+    def test_pull_dir(self):
+        for remote in self.rsyncd[self.testname]:
+            # create some data on rsync remote
+            self.getDataFile(os.path.join(remote.path,
+                             'dir1/dir2/dir3/test_data.txt'))
+            # pull it to the temp directory
+            temp_dir = TempFiles()
+            ops = RsyncOps(remote.url)
+            ops._pull('dir1', temp_dir.last_temp_dir, opts='-r')
+            # compare the directories
+            self.assertDirsEqual(remote.path, temp_dir.last_temp_dir)
+
+    def test_push_file(self):
+        for remote in self.rsyncd[self.testname]:
+            # create some data on temp dir
+            temp_dir = TempFiles()
+            filepath = self.getDataFile(
+                os.path.join(temp_dir.last_temp_dir, 'file1.txt')
+            )
+            # push it to the rsync remote
+            ops = RsyncOps(remote.url)
+            ops.push(filepath)
+            # compare the directories
+            self.assertDirsEqual(remote.path, temp_dir.last_temp_dir)
+
+    def test_push_dir(self):
+        for remote in self.rsyncd[self.testname]:
+            # create some data on temp dir
+            temp_dir = TempFiles()
+            self.getDataFile(os.path.join(temp_dir.last_temp_dir,
+                             'dir1/dir2/dir3/test_data.txt'))
+            # push it to the rsync remote
+            ops = RsyncOps(remote.url)
+            ops.push(os.path.join(temp_dir.last_temp_dir, 'dir1'), opts='-r')
+            # compare the directories
+            self.assertDirsEqual(remote.path, temp_dir.last_temp_dir)
+
+    def test_ls(self):
+        for remote in self.rsyncd[self.testname]:
+            ops = RsyncOps(remote.url)
+            # compare empty lists
+            self.assertListEqual(ops.ls(), [])
+            # create some data on rsync remote
+            self.getDataFile(os.path.join(remote.path, 'file1.txt'))
+            os.makedirs(os.path.join(remote.path, 'dir1'))
+            os.symlink('dir1', os.path.join(remote.path, 'symlink1'))
+            # compare the lists
+            self.assertSetEqual(set(ops.ls()),
+                                set(['file1.txt', 'dir1', 'symlink1']))
+
+    def test_ls_dirs(self):
+        for remote in self.rsyncd[self.testname]:
+            ops = RsyncOps(remote.url)
+            # compare empty lists
+            self.assertSetEqual(set(ops.ls_dirs()),
+                                set([]))
+            # create some data on rsync remote
+            self.getDataFile(os.path.join(remote.path, 'file1.txt'))
+            os.makedirs(os.path.join(remote.path, 'dir1'))
+            os.symlink('dir1', os.path.join(remote.path, 'symlink1'))
+            # compare the lists
+            self.assertSetEqual(set(ops.ls_dirs()),
+                                set(['dir1']))
+
+    def test_ls_symlinks(self):
+        for remote in self.rsyncd[self.testname]:
+            ops = RsyncOps(remote.url)
+            # compare empty lists
+            self.assertListEqual(ops.ls_symlinks(), [])
+            # create some data on rsync remote
+            self.getDataFile(os.path.join(remote.path, 'file1.txt'))
+            os.makedirs(os.path.join(remote.path, 'dir1'))
+            os.symlink('dir1', os.path.join(remote.path, 'symlink1'))
+            # compare the lists
+            self.assertListEqual(ops.ls_symlinks(),
+                                 [['symlink1', 'dir1']])
+
+    def test__symlink_abs_target(self):
+        for remote in self.rsyncd[self.testname]:
+            ops = RsyncOps(remote.url)
+            # create some data on rsync remote
+            os.makedirs(os.path.join(remote.path, 'snapshots/dir1'))
+
+            os.symlink('dir1',
+                       os.path.join(remote.path, 'snapshots/symlink1'))
+            self.assertEqual(ops._symlink_abs_target('snapshots/symlink1'),
+                             'snapshots/dir1')
+
+            os.symlink('symlink1',
+                       os.path.join(remote.path, 'snapshots/symlink2'))
+            self.assertEqual(ops._symlink_abs_target('snapshots/symlink2',
+                                                     recursive=False),
+                             'snapshots/symlink1')
+            self.assertEqual(ops._symlink_abs_target('snapshots/symlink2'),
+                             'snapshots/dir1')
+
+            os.makedirs(os.path.join(remote.path, 'dir2'))
+            os.symlink('../dir2',
+                       os.path.join(remote.path, 'snapshots/symlink3'))
+            self.assertEqual(ops._symlink_abs_target('snapshots/symlink3'),
+                             'dir2')
+
+            os.makedirs(os.path.join(remote.path, 'snapshots2/dir3'))
+            os.symlink('../snapshots2/dir3',
+                       os.path.join(remote.path, 'snapshots/symlink4'))
+            self.assertEqual(ops._symlink_abs_target('snapshots/symlink4'),
+                             'snapshots2/dir3')
+
+            os.symlink('../snapshots2_dir3',
+                       os.path.join(remote.path, 'snapshots/symlink5'))
+            os.symlink('snapshots2/dir3',
+                       os.path.join(remote.path, 'snapshots2_dir3'))
+            self.assertEqual(ops._symlink_abs_target('snapshots/symlink5',
+                                                     recursive=False),
+                             'snapshots2_dir3')
+            self.assertEqual(ops._symlink_abs_target('snapshots/symlink5'),
+                             'snapshots2/dir3')
+
+    def test_symlink_target(self):
+        for remote in self.rsyncd[self.testname]:
+            ops = RsyncOps(remote.url)
+            # create some data on rsync remote
+            os.makedirs(os.path.join(remote.path, 'snapshots/dir1'))
+
+            os.symlink('dir1',
+                       os.path.join(remote.path, 'snapshots/symlink1'))
+            self.assertEqual(ops.symlink_target('snapshots/symlink1'),
+                             'dir1')
+
+            os.symlink('symlink1',
+                       os.path.join(remote.path, 'snapshots/symlink2'))
+            self.assertEqual(ops.symlink_target('snapshots/symlink2',
+                                                recursive=False),
+                             'symlink1')
+            self.assertEqual(ops.symlink_target('snapshots/symlink2'), 'dir1')
+
+            os.makedirs(os.path.join(remote.path, 'dir2'))
+            os.symlink('../dir2',
+                       os.path.join(remote.path, 'snapshots/symlink3'))
+            self.assertEqual(ops.symlink_target('snapshots/symlink3'),
+                             '../dir2')
+
+            os.makedirs(os.path.join(remote.path, 'snapshots2/dir3'))
+            os.symlink('../snapshots2/dir3',
+                       os.path.join(remote.path, 'snapshots/symlink4'))
+            self.assertEqual(ops.symlink_target('snapshots/symlink4'),
+                             '../snapshots2/dir3')
+
+            os.symlink('../snapshots2_dir3',
+                       os.path.join(remote.path, 'snapshots/symlink5'))
+            os.symlink('snapshots2/dir3',
+                       os.path.join(remote.path, 'snapshots2_dir3'))
+            self.assertEqual(ops.symlink_target('snapshots/symlink5',
+                                                recursive=False),
+                             '../snapshots2_dir3')
+            self.assertEqual(ops.symlink_target('snapshots/symlink5'),
+                             '../snapshots2/dir3')
+
+    def test_rm_file(self):
+        for remote in self.rsyncd[self.testname]:
+            ops = RsyncOps(remote.url)
+            # create some data on rsync remote
+            self.getDataFile(os.path.join(remote.path, 'file1.txt'))
+            os.makedirs(os.path.join(remote.path, 'dir1'))
+            os.symlink('dir1', os.path.join(remote.path, 'symlink1'))
+            # compare the lists
+            self.assertSetEqual(set(ops.ls()),
+                                set(['file1.txt', 'dir1', 'symlink1']))
+            ops.rm_file('file1.txt')
+            self.assertSetEqual(set(ops.ls()),
+                                set(['dir1', 'symlink1']))
+            ops.rm_file('dir1')
+            self.assertSetEqual(set(ops.ls()),
+                                set(['symlink1']))
+            ops.rm_file('symlink1')
+            self.assertSetEqual(set(ops.ls()),
+                                set([]))
+
+    def test_rm_all(self):
+        for remote in self.rsyncd[self.testname]:
+            ops = RsyncOps(remote.url)
+            # create some data on rsync remote
+            self.getDataFile(os.path.join(remote.path, 'file1.txt'))
+            os.makedirs(os.path.join(remote.path, 'dir1/dir2/dir3'))
+            self.getDataFile(os.path.join(remote.path, 'dir2/file1.txt'))
+            os.symlink('dir1', os.path.join(remote.path, 'symlink1'))
+            # compare the lists
+            self.assertSetEqual(set(ops.ls()),
+                                set(['file1.txt', 'dir1', 'dir2', 'symlink1']))
+            ops.rm_all(['file1.txt', 'dir1', 'symlink1'])
+            self.assertSetEqual(set(ops.ls()), set(['dir2']))
+
+    def test_clean_dir(self):
+        for remote in self.rsyncd[self.testname]:
+            ops = RsyncOps(remote.url)
+            # create some data on rsync remote
+            os.makedirs(os.path.join(remote.path, 'dir1/dir2/dir3'))
+            self.getDataFile(os.path.join(remote.path, 'dir2/file1.txt'))
+            os.symlink('dir1', os.path.join(remote.path, 'dir2/symlink1'))
+            ops.clean_dir('dir1')
+            # compare the directories
+            self.assertSetEqual(set(ops.ls('dir1/')), set([]))
+            ops.clean_dir('dir2')
+            # compare the directories
+            self.assertSetEqual(set(ops.ls('dir2/')), set([]))
+
+    def test_rm_dir(self):
+        for remote in self.rsyncd[self.testname]:
+            ops = RsyncOps(remote.url)
+            # create some data on rsync remote
+            self.getDataFile(os.path.join(remote.path, 'file1.txt'))
+            os.makedirs(os.path.join(remote.path, 'dir1/dir2/dir3'))
+            self.getDataFile(os.path.join(remote.path, 'dir2/file1.txt'))
+            os.symlink('dir1', os.path.join(remote.path, 'symlink1'))
+            # compare the lists
+            self.assertSetEqual(set(ops.ls()),
+                                set(['file1.txt', 'dir1', 'dir2', 'symlink1']))
+            ops.rm_dir('dir1')
+            self.assertSetEqual(set(ops.ls()),
+                                set(['file1.txt', 'dir2', 'symlink1']))
+
+    def test_mk_dir(self):
+        for remote in self.rsyncd[self.testname]:
+            ops = RsyncOps(remote.url)
+            # create some data on temp dir
+            temp_dir = TempFiles()
+            os.makedirs(os.path.join(temp_dir.last_temp_dir, 'dir1'))
+            # push it to the rsync remote
+            ops.mk_dir('dir1')
+            # compare the directories
+            self.assertDirsEqual(remote.path, temp_dir.last_temp_dir)
+            # recursive dir
+            ops.mk_dir('snapshots/dir1')
+            self.assertDirsEqual(os.path.join(remote.path, 'snapshots/'),
+                                 temp_dir.last_temp_dir)
+
+    def test_symlink(self):
+        for remote in self.rsyncd[self.testname]:
+            ops = RsyncOps(remote.url)
+            # create symlink with existent target
+            os.makedirs(os.path.join(remote.path, 'snapshots/dir1'))
+            ops.symlink('snapshots/symlink1', 'dir1')
+            self.assertSetEqual(
+                set(ops.ls('snapshots/')),
+                set(['dir1', 'symlink1', 'symlink1.target.txt'])
+            )
+            self.assertEqual(ops.symlink_target('snapshots/symlink1'), 'dir1')
+            with open(os.path.join(remote.path,
+                                   'snapshots/symlink1') + '.target.txt') \
+                    as target_file:
+                self.assertEqual(['dir1'],
+                                 target_file.read().splitlines())
+
+            # create symlink with absent target
+            self.assertRaises(RuntimeError,
+                              ops.symlink, 'snapshots/symlink2', 'dir2')
+
+            # update existent symlink
+            os.makedirs(os.path.join(remote.path, 'snapshots/dir2'))
+            ops.symlink('snapshots/symlink1', 'dir2')
+            self.assertSetEqual(
+                set(ops.ls('snapshots/')),
+                set(['dir1', 'dir2', 'symlink1', 'symlink1.target.txt']))
+            self.assertEqual(ops.symlink_target('snapshots/symlink1'), 'dir2')
+            with open(os.path.join(remote.path, 'snapshots/symlink1') + '.target.txt') \
+                    as target_file:
+                self.assertEqual(['dir2', 'dir1'],
+                                 target_file.read().splitlines())
+
+            # update symlink with absent target
+            self.assertRaises(RuntimeError,
+                              ops.symlink, 'snapshots/symlink1', 'dir3')