fixeg code
diff --git a/wally/ceph.py b/wally/ceph.py
index e23343e..1e79126 100644
--- a/wally/ceph.py
+++ b/wally/ceph.py
@@ -85,7 +85,7 @@
             if key is None:
                 key = "/etc/ceph/{}.client.admin.keyring".format(cluster)
 
-            with setup_rpc(connect(info), ctx.rpc_code) as node:
+            with setup_rpc(connect(info), ctx.rpc_code, ctx.default_rpc_plugins) as node:
 
                 # new_nodes.extend(ceph.discover_ceph_nodes(ceph_root_conn, cluster=cluster, conf=conf, key=key))
                 ssh_key = node.get_file_content("~/.ssh/id_rsa")
diff --git a/wally/fuel.py b/wally/fuel.py
index 040dcf4..1680d29 100644
--- a/wally/fuel.py
+++ b/wally/fuel.py
@@ -77,7 +77,7 @@
             if discover_nodes:
 
                 try:
-                    fuel_rpc = setup_rpc(connect(fuel_node_info), ctx.rpc_code)
+                    fuel_rpc = setup_rpc(connect(fuel_node_info), ctx.rpc_code, ctx.default_rpc_plugins)
                 except AuthenticationException:
                     raise StopTestError("Wrong fuel credentials")
                 except Exception:
diff --git a/wally/main.py b/wally/main.py
index 14da140..9a453fb 100644
--- a/wally/main.py
+++ b/wally/main.py
@@ -1,5 +1,4 @@
 import os
-import sys
 import time
 import signal
 import logging
@@ -28,8 +27,9 @@
 except ImportError:
     faulthandler = None
 
+import agent
 
-from . import utils, run_test, pretty_yaml
+from . import utils, node
 from .storage import make_storage, Storage
 from .config import Config
 from .logger import setup_loggers
@@ -125,13 +125,13 @@
     test_parser.add_argument("-d", '--dont-discover-nodes', action='store_true',
                              help="Don't connect/discover fuel nodes")
     test_parser.add_argument('--no-report', action='store_true', help="Skip report stages")
-    test_parser.add_argument('-r', '--resume', default=None, help="Resume previously stopped test, stored in DIR",
-                             metavar="DIR")
     test_parser.add_argument('--result-dir', default=None, help="Save results to DIR", metavart="DIR")
     test_parser.add_argument("comment", help="Test information")
     test_parser.add_argument("config_file", help="Yaml config file", nargs='?', default=None)
 
     # ---------------------------------------------------------------------
+    test_parser = subparsers.add_parser('resume', help='resume tests')
+    test_parser.add_argument("storage_dir", help="Path to test directory")
 
     return parser.parse_args(argv[1:])
 
@@ -152,7 +152,6 @@
         faulthandler.register(signal.SIGUSR1, all_threads=True)
 
     opts = parse_args(argv)
-
     stages = []  # type: List[Stage]
 
     # stop mypy from telling that config & storage might be undeclared
@@ -160,28 +159,23 @@
     storage = None  # type: Storage
 
     if opts.subparser_name == 'test':
-        if opts.resume:
-            storage = make_storage(opts.resume, existing=True)
-            config = storage.load(Config, 'config')
-        else:
-            file_name = os.path.abspath(opts.config_file)
-            with open(file_name) as fd:
-                config = Config(yaml_load(fd.read()))  # type: ignore
+        file_name = os.path.abspath(opts.config_file)
+        with open(file_name) as fd:
+            config = Config(yaml_load(fd.read()))  # type: ignore
 
-            config.storage_url, config.run_uuid = utils.get_uniq_path_uuid(config.results_dir)
-            config.comment = opts.comment
-            config.keep_vm = opts.keep_vm
-            config.no_tests = opts.no_tests
-            config.dont_discover_nodes = opts.dont_discover_nodes
-            config.build_id = opts.build_id
-            config.build_description = opts.build_description
-            config.build_type = opts.build_type
-            config.settings_dir = get_config_path(config, opts.settings_dir)
+        config.storage_url, config.run_uuid = utils.get_uniq_path_uuid(config.results_dir)
+        config.comment = opts.comment
+        config.keep_vm = opts.keep_vm
+        config.no_tests = opts.no_tests
+        config.dont_discover_nodes = opts.dont_discover_nodes
+        config.build_id = opts.build_id
+        config.build_description = opts.build_description
+        config.build_type = opts.build_type
+        config.settings_dir = get_config_path(config, opts.settings_dir)
 
-            storage = make_storage(config.storage_url)
+        storage = make_storage(config.storage_url)
 
-            storage['config'] = config  # type: ignore
-
+        storage['config'] = config  # type: ignore
 
         stages.append(DiscoverCephStage)  # type: ignore
         stages.append(DiscoverOSStage)  # type: ignore
@@ -195,6 +189,14 @@
         if not opts.dont_collect:
             stages.append(CollectInfoStage)   # type: ignore
 
+        storage['cli'] = argv
+
+    elif opts.subparser_name == 'resume':
+        storage = make_storage(opts.storage_dir, existing=True)
+        config = storage.load(Config, 'config')
+        # TODO: fix this
+        raise NotImplementedError("Resume in not fully implemented")
+
     elif opts.subparser_name == 'ls':
         tab = texttable.Texttable(max_width=200)
         tab.set_deco(tab.HEADER | tab.VLINES | tab.BORDER)
@@ -230,6 +232,7 @@
     logger.info("All info would be stored into %r", config.storage_url)
 
     ctx = TestRun(config, storage)
+    ctx.rpc_code, ctx.default_rpc_plugins = node.get_rpc_server_code()
 
     stages.sort(key=lambda x: x.priority)
 
@@ -237,8 +240,12 @@
     failed = False
     cleanup_stages = []
     for stage in stages:
+        if stage.config_block is not None:
+            if stage.config_block not in ctx.config:
+                continue
+
+        cleanup_stages.append(stage)
         try:
-            cleanup_stages.append(stage)
             with log_stage(stage):
                 stage.run(ctx)
         except:
diff --git a/wally/node.py b/wally/node.py
index fae7879..54a6291 100644
--- a/wally/node.py
+++ b/wally/node.py
@@ -4,7 +4,7 @@
 import socket
 import logging
 import subprocess
-from typing import Union, cast, Any
+from typing import Union, cast, Any, Optional, Tuple, Dict, List
 
 
 import agent
@@ -26,11 +26,16 @@
     def __str__(self) -> str:
         return self.info.node_id()
 
-    def put_to_file(self, path: str, content: bytes) -> None:
+    def put_to_file(self, path: Optional[str], content: bytes) -> str:
+        if path is None:
+            path = self.run("mktemp").strip()
+
         with self.conn.open_sftp() as sftp:
             with sftp.open(path, "wb") as fd:
                 fd.write(content)
 
+        return path
+
     def disconnect(self):
         self.conn.close()
 
@@ -53,10 +58,10 @@
 
             while True:
                 try:
-                    ndata = session.recv(1024)
-                    output += ndata
-                    if "" == ndata:
+                    ndata = session.recv(1024).decode("utf-8")
+                    if not ndata:
                         break
+                    output += ndata
                 except socket.timeout:
                     pass
 
@@ -97,7 +102,9 @@
                                 stdout=subprocess.PIPE,
                                 stderr=subprocess.STDOUT)
 
-        stdout_data, _ = proc.communicate()
+        stdout_data_b, _ = proc.communicate()
+        stdout_data = stdout_data_b.decode("utf8")
+
         if proc.returncode != 0:
             templ = "SSH:{0} Cmd {1!r} failed with code {2}. Output: {3}"
             raise OSError(templ.format(self, cmd, proc.returncode, stdout_data))
@@ -108,6 +115,25 @@
         pass
 
 
+def get_rpc_server_code() -> Tuple[bytes, Dict[str, bytes]]:
+    # setup rpc data
+    if agent.__file__.endswith(".pyc"):
+        path = agent.__file__[:-1]
+    else:
+        path = agent.__file__
+
+    master_code = open(path, "rb").read()
+
+    plugins = {}  # type: Dict[str, bytes]
+    cli_path = os.path.join(os.path.dirname(path), "cli_plugin.py")
+    plugins["cli"] = open(cli_path, "rb").read()
+
+    fs_path = os.path.join(os.path.dirname(path), "fs_plugin.py")
+    plugins["fs"] = open(fs_path, "rb").read()
+
+    return master_code, plugins
+
+
 def connect(info: Union[str, NodeInfo], conn_timeout: int = 60) -> ISSHHost:
     if info == 'local':
         return LocalHost()
@@ -119,12 +145,12 @@
 class RPCNode(IRPCNode):
     """Node object"""
 
-    def __init__(self, conn: agent.Client, info: NodeInfo) -> None:
+    def __init__(self, conn: agent.SimpleRPCClient, info: NodeInfo) -> None:
         self.info = info
         self.conn = conn
 
     def __str__(self) -> str:
-        return "<Node: url={!s} roles={!r} hops=/>".format(self.info.ssh_creds, ",".join(self.info.roles))
+        return "Node(url={!r}, roles={!r})".format(self.info.ssh_creds, ",".join(self.info.roles))
 
     def __repr__(self) -> str:
         return str(self)
@@ -138,7 +164,7 @@
     def copy_file(self, local_path: str, remote_path: str = None) -> str:
         raise NotImplementedError()
 
-    def put_to_file(self, path: str, content: bytes) -> None:
+    def put_to_file(self, path: Optional[str], content: bytes) -> str:
         raise NotImplementedError()
 
     def get_interface(self, ip: str) -> str:
@@ -148,27 +174,35 @@
         raise NotImplementedError()
 
     def disconnect(self) -> str:
-        raise NotImplementedError()
+        self.conn.disconnect()
+        self.conn = None
 
 
-def setup_rpc(node: ISSHHost, rpc_server_code: bytes, port: int = 0) -> IRPCNode:
-    code_file = node.run("mktemp").strip()
+def setup_rpc(node: ISSHHost, rpc_server_code: bytes, plugins: Dict[str, bytes] = None, port: int = 0) -> IRPCNode:
     log_file = node.run("mktemp").strip()
-    node.put_to_file(code_file, rpc_server_code)
-    cmd = "python {code_file} server --listen-addr={listen_ip}:{port} --daemon " + \
-          "--show-settings --stdout-file={out_file}"
-
+    code_file = node.put_to_file(None, rpc_server_code)
     ip = node.info.ssh_creds.addr.host
 
-    params_js = node.run(cmd.format(code_file=code_file,
-                                    listen_addr=ip,
-                                    out_file=log_file,
-                                    port=port)).strip()
+    cmd = "python {code_file} server --listen-addr={listen_ip}:{port} --daemon " + \
+          "--show-settings --stdout-file={out_file}"
+    cmd = cmd.format(code_file=code_file, listen_ip=ip, out_file=log_file, port=port)
+    params_js = node.run(cmd).strip()
     params = json.loads(params_js)
     params['log_file'] = log_file
+    node.info.params.update(params)
+
     port = int(params['addr'].split(":")[1])
     rpc_conn = agent.connect((ip, port))
-    node.info.params.update(params)
+
+    if plugins is not None:
+        try:
+            for name, code in plugins.items():
+                rpc_conn.server.load_module(name, None, code)
+        except Exception:
+            rpc_conn.server.stop()
+            rpc_conn.disconnect()
+            raise
+
     return RPCNode(rpc_conn, node.info)
 
 
diff --git a/wally/node_interfaces.py b/wally/node_interfaces.py
index ac83267..bc9ba28 100644
--- a/wally/node_interfaces.py
+++ b/wally/node_interfaces.py
@@ -1,5 +1,5 @@
 import abc
-from typing import Any, Set, Optional, List, Dict, Callable, NamedTuple
+from typing import Any, Set, Optional, Dict, NamedTuple, Optional
 from .ssh_utils import ConnCreds
 from .common_types import IPAddr
 
@@ -40,7 +40,7 @@
         pass
 
     @abc.abstractmethod
-    def put_to_file(self, path: str, content: bytes) -> None:
+    def put_to_file(self, path: Optional[str], content: bytes) -> str:
         pass
 
     def __enter__(self) -> 'ISSHHost':
@@ -69,7 +69,7 @@
         pass
 
     @abc.abstractmethod
-    def put_to_file(self, path:str, content: bytes) -> None:
+    def put_to_file(self, path: Optional[str], content: bytes) -> str:
         pass
 
     @abc.abstractmethod
diff --git a/wally/run_test.py b/wally/run_test.py
index 1a645b6..8b54f8b 100755
--- a/wally/run_test.py
+++ b/wally/run_test.py
@@ -38,8 +38,7 @@
             def connect_ext(node_info: NodeInfo) -> Tuple[bool, Union[IRPCNode, NodeInfo]]:
                 try:
                     ssh_node = connect(node_info, conn_timeout=ctx.config.connect_timeout)
-                    # TODO(koder): need to pass all required rpc bytes to this call
-                    return True, setup_rpc(ssh_node, b"")
+                    return True, setup_rpc(ssh_node, ctx.rpc_code, ctx.default_rpc_plugins)
                 except Exception as exc:
                     logger.error("During connect to {}: {!s}".format(node, exc))
                     return False, node_info
diff --git a/wally/ssh_utils.py b/wally/ssh_utils.py
index 43ba44a..24fc178 100644
--- a/wally/ssh_utils.py
+++ b/wally/ssh_utils.py
@@ -46,13 +46,16 @@
                  key_file: str = None, key: bytes = None) -> None:
         self.user = user
         self.passwd = passwd
-        self.addr = IPAddr(host, port)
+        self.addr = IPAddr(host, int(port))
         self.key_file = key_file
         self.key = key
 
     def __str__(self) -> str:
         return "{}@{}:{}".format(self.user, self.addr.host, self.addr.port)
 
+    def __repr__(self) -> str:
+        return str(self)
+
 
 def parse_ssh_uri(uri: str) -> ConnCreds:
     """Parse ssh connection URL from one of following form
@@ -63,13 +66,12 @@
     if uri.startswith("ssh://"):
         uri = uri[len("ssh://"):]
 
-    res = ConnCreds("", getpass.getuser())
-
     for rr in URIsNamespace.uri_reg_exprs:
         rrm = re.match(rr, uri)
         if rrm is not None:
-            res.__dict__.update(rrm.groupdict())
-            return res
+            params = {"user": getpass.getuser()}
+            params.update(rrm.groupdict())
+            return ConnCreds(**params)
 
     raise ValueError("Can't parse {0!r} as ssh uri value".format(uri))
 
diff --git a/wally/storage_structure.txt b/wally/storage_structure.txt
index 5715046..dfbb49f 100644
--- a/wally/storage_structure.txt
+++ b/wally/storage_structure.txt
@@ -1,5 +1,6 @@
 config: Config - full configuration
 all_nodes: List[NodeInfo] - all nodes
+cli: List[str] - cli options
 
 fuel:
     version: List[int] - FUEL master node version
diff --git a/wally/test_run_class.py b/wally/test_run_class.py
index 30c46e7..a731b5a 100644
--- a/wally/test_run_class.py
+++ b/wally/test_run_class.py
@@ -29,6 +29,7 @@
         self.os_connection = None  # type: Optional[OSConnection]
         self.fuel_conn = None  # type: Optional[Connection]
         self.rpc_code = None  # type: bytes
+        self.default_rpc_plugins = None  # type: Dict[str, bytes]
 
         self.storage = storage
         self.config = config