fixing code
diff --git a/wally/ceph.py b/wally/ceph.py
index e23343e..1e79126 100644
--- a/wally/ceph.py
+++ b/wally/ceph.py
@@ -85,7 +85,7 @@
if key is None:
key = "/etc/ceph/{}.client.admin.keyring".format(cluster)
- with setup_rpc(connect(info), ctx.rpc_code) as node:
+ with setup_rpc(connect(info), ctx.rpc_code, ctx.default_rpc_plugins) as node:
# new_nodes.extend(ceph.discover_ceph_nodes(ceph_root_conn, cluster=cluster, conf=conf, key=key))
ssh_key = node.get_file_content("~/.ssh/id_rsa")
diff --git a/wally/fuel.py b/wally/fuel.py
index 040dcf4..1680d29 100644
--- a/wally/fuel.py
+++ b/wally/fuel.py
@@ -77,7 +77,7 @@
if discover_nodes:
try:
- fuel_rpc = setup_rpc(connect(fuel_node_info), ctx.rpc_code)
+ fuel_rpc = setup_rpc(connect(fuel_node_info), ctx.rpc_code, ctx.default_rpc_plugins)
except AuthenticationException:
raise StopTestError("Wrong fuel credentials")
except Exception:
diff --git a/wally/main.py b/wally/main.py
index 14da140..9a453fb 100644
--- a/wally/main.py
+++ b/wally/main.py
@@ -1,5 +1,4 @@
import os
-import sys
import time
import signal
import logging
@@ -28,8 +27,9 @@
except ImportError:
faulthandler = None
+import agent
-from . import utils, run_test, pretty_yaml
+from . import utils, node
from .storage import make_storage, Storage
from .config import Config
from .logger import setup_loggers
@@ -125,13 +125,13 @@
test_parser.add_argument("-d", '--dont-discover-nodes', action='store_true',
help="Don't connect/discover fuel nodes")
test_parser.add_argument('--no-report', action='store_true', help="Skip report stages")
- test_parser.add_argument('-r', '--resume', default=None, help="Resume previously stopped test, stored in DIR",
- metavar="DIR")
test_parser.add_argument('--result-dir', default=None, help="Save results to DIR", metavart="DIR")
test_parser.add_argument("comment", help="Test information")
test_parser.add_argument("config_file", help="Yaml config file", nargs='?', default=None)
# ---------------------------------------------------------------------
+ test_parser = subparsers.add_parser('resume', help='resume tests')
+ test_parser.add_argument("storage_dir", help="Path to test directory")
return parser.parse_args(argv[1:])
@@ -152,7 +152,6 @@
faulthandler.register(signal.SIGUSR1, all_threads=True)
opts = parse_args(argv)
-
stages = [] # type: List[Stage]
# stop mypy from telling that config & storage might be undeclared
@@ -160,28 +159,23 @@
storage = None # type: Storage
if opts.subparser_name == 'test':
- if opts.resume:
- storage = make_storage(opts.resume, existing=True)
- config = storage.load(Config, 'config')
- else:
- file_name = os.path.abspath(opts.config_file)
- with open(file_name) as fd:
- config = Config(yaml_load(fd.read())) # type: ignore
+ file_name = os.path.abspath(opts.config_file)
+ with open(file_name) as fd:
+ config = Config(yaml_load(fd.read())) # type: ignore
- config.storage_url, config.run_uuid = utils.get_uniq_path_uuid(config.results_dir)
- config.comment = opts.comment
- config.keep_vm = opts.keep_vm
- config.no_tests = opts.no_tests
- config.dont_discover_nodes = opts.dont_discover_nodes
- config.build_id = opts.build_id
- config.build_description = opts.build_description
- config.build_type = opts.build_type
- config.settings_dir = get_config_path(config, opts.settings_dir)
+ config.storage_url, config.run_uuid = utils.get_uniq_path_uuid(config.results_dir)
+ config.comment = opts.comment
+ config.keep_vm = opts.keep_vm
+ config.no_tests = opts.no_tests
+ config.dont_discover_nodes = opts.dont_discover_nodes
+ config.build_id = opts.build_id
+ config.build_description = opts.build_description
+ config.build_type = opts.build_type
+ config.settings_dir = get_config_path(config, opts.settings_dir)
- storage = make_storage(config.storage_url)
+ storage = make_storage(config.storage_url)
- storage['config'] = config # type: ignore
-
+ storage['config'] = config # type: ignore
stages.append(DiscoverCephStage) # type: ignore
stages.append(DiscoverOSStage) # type: ignore
@@ -195,6 +189,14 @@
if not opts.dont_collect:
stages.append(CollectInfoStage) # type: ignore
+ storage['cli'] = argv
+
+ elif opts.subparser_name == 'resume':
+ storage = make_storage(opts.storage_dir, existing=True)
+ config = storage.load(Config, 'config')
+ # TODO: fix this
+ raise NotImplementedError("Resume in not fully implemented")
+
elif opts.subparser_name == 'ls':
tab = texttable.Texttable(max_width=200)
tab.set_deco(tab.HEADER | tab.VLINES | tab.BORDER)
@@ -230,6 +232,7 @@
logger.info("All info would be stored into %r", config.storage_url)
ctx = TestRun(config, storage)
+ ctx.rpc_code, ctx.default_rpc_plugins = node.get_rpc_server_code()
stages.sort(key=lambda x: x.priority)
@@ -237,8 +240,12 @@
failed = False
cleanup_stages = []
for stage in stages:
+ if stage.config_block is not None:
+ if stage.config_block not in ctx.config:
+ continue
+
+ cleanup_stages.append(stage)
try:
- cleanup_stages.append(stage)
with log_stage(stage):
stage.run(ctx)
except:
diff --git a/wally/node.py b/wally/node.py
index fae7879..54a6291 100644
--- a/wally/node.py
+++ b/wally/node.py
@@ -4,7 +4,7 @@
import socket
import logging
import subprocess
-from typing import Union, cast, Any
+from typing import Union, cast, Any, Optional, Tuple, Dict, List
import agent
@@ -26,11 +26,16 @@
def __str__(self) -> str:
return self.info.node_id()
- def put_to_file(self, path: str, content: bytes) -> None:
+ def put_to_file(self, path: Optional[str], content: bytes) -> str:
+ if path is None:
+ path = self.run("mktemp").strip()
+
with self.conn.open_sftp() as sftp:
with sftp.open(path, "wb") as fd:
fd.write(content)
+ return path
+
def disconnect(self):
self.conn.close()
@@ -53,10 +58,10 @@
while True:
try:
- ndata = session.recv(1024)
- output += ndata
- if "" == ndata:
+ ndata = session.recv(1024).decode("utf-8")
+ if not ndata:
break
+ output += ndata
except socket.timeout:
pass
@@ -97,7 +102,9 @@
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT)
- stdout_data, _ = proc.communicate()
+ stdout_data_b, _ = proc.communicate()
+ stdout_data = stdout_data_b.decode("utf8")
+
if proc.returncode != 0:
templ = "SSH:{0} Cmd {1!r} failed with code {2}. Output: {3}"
raise OSError(templ.format(self, cmd, proc.returncode, stdout_data))
@@ -108,6 +115,25 @@
pass
+def get_rpc_server_code() -> Tuple[bytes, Dict[str, bytes]]:
+ # setup rpc data
+ if agent.__file__.endswith(".pyc"):
+ path = agent.__file__[:-1]
+ else:
+ path = agent.__file__
+
+ master_code = open(path, "rb").read()
+
+ plugins = {} # type: Dict[str, bytes]
+ cli_path = os.path.join(os.path.dirname(path), "cli_plugin.py")
+ plugins["cli"] = open(cli_path, "rb").read()
+
+ fs_path = os.path.join(os.path.dirname(path), "fs_plugin.py")
+ plugins["fs"] = open(fs_path, "rb").read()
+
+ return master_code, plugins
+
+
def connect(info: Union[str, NodeInfo], conn_timeout: int = 60) -> ISSHHost:
if info == 'local':
return LocalHost()
@@ -119,12 +145,12 @@
class RPCNode(IRPCNode):
"""Node object"""
- def __init__(self, conn: agent.Client, info: NodeInfo) -> None:
+ def __init__(self, conn: agent.SimpleRPCClient, info: NodeInfo) -> None:
self.info = info
self.conn = conn
def __str__(self) -> str:
- return "<Node: url={!s} roles={!r} hops=/>".format(self.info.ssh_creds, ",".join(self.info.roles))
+ return "Node(url={!r}, roles={!r})".format(self.info.ssh_creds, ",".join(self.info.roles))
def __repr__(self) -> str:
return str(self)
@@ -138,7 +164,7 @@
def copy_file(self, local_path: str, remote_path: str = None) -> str:
raise NotImplementedError()
- def put_to_file(self, path: str, content: bytes) -> None:
+ def put_to_file(self, path: Optional[str], content: bytes) -> str:
raise NotImplementedError()
def get_interface(self, ip: str) -> str:
@@ -148,27 +174,35 @@
raise NotImplementedError()
def disconnect(self) -> str:
- raise NotImplementedError()
+ self.conn.disconnect()
+ self.conn = None
-def setup_rpc(node: ISSHHost, rpc_server_code: bytes, port: int = 0) -> IRPCNode:
- code_file = node.run("mktemp").strip()
+def setup_rpc(node: ISSHHost, rpc_server_code: bytes, plugins: Dict[str, bytes] = None, port: int = 0) -> IRPCNode:
log_file = node.run("mktemp").strip()
- node.put_to_file(code_file, rpc_server_code)
- cmd = "python {code_file} server --listen-addr={listen_ip}:{port} --daemon " + \
- "--show-settings --stdout-file={out_file}"
-
+ code_file = node.put_to_file(None, rpc_server_code)
ip = node.info.ssh_creds.addr.host
- params_js = node.run(cmd.format(code_file=code_file,
- listen_addr=ip,
- out_file=log_file,
- port=port)).strip()
+ cmd = "python {code_file} server --listen-addr={listen_ip}:{port} --daemon " + \
+ "--show-settings --stdout-file={out_file}"
+ cmd = cmd.format(code_file=code_file, listen_ip=ip, out_file=log_file, port=port)
+ params_js = node.run(cmd).strip()
params = json.loads(params_js)
params['log_file'] = log_file
+ node.info.params.update(params)
+
port = int(params['addr'].split(":")[1])
rpc_conn = agent.connect((ip, port))
- node.info.params.update(params)
+
+ if plugins is not None:
+ try:
+ for name, code in plugins.items():
+ rpc_conn.server.load_module(name, None, code)
+ except Exception:
+ rpc_conn.server.stop()
+ rpc_conn.disconnect()
+ raise
+
return RPCNode(rpc_conn, node.info)
diff --git a/wally/node_interfaces.py b/wally/node_interfaces.py
index ac83267..bc9ba28 100644
--- a/wally/node_interfaces.py
+++ b/wally/node_interfaces.py
@@ -1,5 +1,5 @@
import abc
-from typing import Any, Set, Optional, List, Dict, Callable, NamedTuple
+from typing import Any, Set, Optional, Dict, NamedTuple, Optional
from .ssh_utils import ConnCreds
from .common_types import IPAddr
@@ -40,7 +40,7 @@
pass
@abc.abstractmethod
- def put_to_file(self, path: str, content: bytes) -> None:
+ def put_to_file(self, path: Optional[str], content: bytes) -> str:
pass
def __enter__(self) -> 'ISSHHost':
@@ -69,7 +69,7 @@
pass
@abc.abstractmethod
- def put_to_file(self, path:str, content: bytes) -> None:
+ def put_to_file(self, path: Optional[str], content: bytes) -> str:
pass
@abc.abstractmethod
diff --git a/wally/run_test.py b/wally/run_test.py
index 1a645b6..8b54f8b 100755
--- a/wally/run_test.py
+++ b/wally/run_test.py
@@ -38,8 +38,7 @@
def connect_ext(node_info: NodeInfo) -> Tuple[bool, Union[IRPCNode, NodeInfo]]:
try:
ssh_node = connect(node_info, conn_timeout=ctx.config.connect_timeout)
- # TODO(koder): need to pass all required rpc bytes to this call
- return True, setup_rpc(ssh_node, b"")
+ return True, setup_rpc(ssh_node, ctx.rpc_code, ctx.default_rpc_plugins)
except Exception as exc:
logger.error("During connect to {}: {!s}".format(node, exc))
return False, node_info
diff --git a/wally/ssh_utils.py b/wally/ssh_utils.py
index 43ba44a..24fc178 100644
--- a/wally/ssh_utils.py
+++ b/wally/ssh_utils.py
@@ -46,13 +46,16 @@
key_file: str = None, key: bytes = None) -> None:
self.user = user
self.passwd = passwd
- self.addr = IPAddr(host, port)
+ self.addr = IPAddr(host, int(port))
self.key_file = key_file
self.key = key
def __str__(self) -> str:
return "{}@{}:{}".format(self.user, self.addr.host, self.addr.port)
+ def __repr__(self) -> str:
+ return str(self)
+
def parse_ssh_uri(uri: str) -> ConnCreds:
"""Parse ssh connection URL from one of following form
@@ -63,13 +66,12 @@
if uri.startswith("ssh://"):
uri = uri[len("ssh://"):]
- res = ConnCreds("", getpass.getuser())
-
for rr in URIsNamespace.uri_reg_exprs:
rrm = re.match(rr, uri)
if rrm is not None:
- res.__dict__.update(rrm.groupdict())
- return res
+ params = {"user": getpass.getuser()}
+ params.update(rrm.groupdict())
+ return ConnCreds(**params)
raise ValueError("Can't parse {0!r} as ssh uri value".format(uri))
diff --git a/wally/storage_structure.txt b/wally/storage_structure.txt
index 5715046..dfbb49f 100644
--- a/wally/storage_structure.txt
+++ b/wally/storage_structure.txt
@@ -1,5 +1,6 @@
config: Config - full configuration
all_nodes: List[NodeInfo] - all nodes
+cli: List[str] - cli options
fuel:
version: List[int] - FUEL master node version
diff --git a/wally/test_run_class.py b/wally/test_run_class.py
index 30c46e7..a731b5a 100644
--- a/wally/test_run_class.py
+++ b/wally/test_run_class.py
@@ -29,6 +29,7 @@
self.os_connection = None # type: Optional[OSConnection]
self.fuel_conn = None # type: Optional[Connection]
self.rpc_code = None # type: bytes
+ self.default_rpc_plugins = None # type: Dict[str, bytes]
self.storage = storage
self.config = config