Skeleton and sensors work
diff --git a/wally/node.py b/wally/node.py
index 54a6291..4d2dda8 100644
--- a/wally/node.py
+++ b/wally/node.py
@@ -3,8 +3,9 @@
import json
import socket
import logging
+import tempfile
import subprocess
-from typing import Union, cast, Any, Optional, Tuple, Dict, List
+from typing import Union, cast, Any, Optional, Tuple, Dict
import agent
@@ -28,7 +29,9 @@
def put_to_file(self, path: Optional[str], content: bytes) -> str:
if path is None:
- path = self.run("mktemp").strip()
+ path = self.run("mktemp", nolog=True).strip()
+
+ logger.debug("PUT %s bytes to %s", len(content), path)
with self.conn.open_sftp() as sftp:
with sftp.open(path, "wb") as fd:
@@ -40,17 +43,16 @@
self.conn.close()
def run(self, cmd: str, timeout: int = 60, nolog: bool = False) -> str:
+ if not nolog:
+ logger.debug("SSH:{0} Exec {1!r}".format(self, cmd))
+
transport = self.conn.get_transport()
session = transport.open_session()
try:
session.set_combine_stderr(True)
-
stime = time.time()
- if not nolog:
- logger.debug("SSH:{0} Exec {1!r}".format(self, cmd))
-
session.exec_command(cmd)
session.settimeout(1)
session.shutdown_write()
@@ -89,12 +91,18 @@
def get_ip(self) -> str:
return 'localhost'
- def put_to_file(self, path: str, content: bytes) -> None:
- dir_name = os.path.dirname(path)
- os.makedirs(dir_name, exist_ok=True)
+ def put_to_file(self, path: Optional[str], content: bytes) -> str:
+ if path is None:
+ fd, path = tempfile.mkstemp(text=False)
+ os.close(fd)
+ else:
+ dir_name = os.path.dirname(path)
+ os.makedirs(dir_name, exist_ok=True)
- with open(path, "wb") as fd:
- fd.write(content)
+ with open(path, "wb") as fd2:
+ fd2.write(content)
+
+ return path
def run(self, cmd: str, timeout: int = 60, nolog: bool = False) -> str:
proc = subprocess.Popen(cmd, shell=True,
@@ -156,10 +164,29 @@
return str(self)
def get_file_content(self, path: str) -> bytes:
- raise NotImplementedError()
+ logger.debug("GET %s", path)
+ res = self.conn.fs.get_file(path, expanduser=True)
+ logger.debug("Receive %s bytes from %s", len(res), path)
+ return res
def run(self, cmd: str, timeout: int = 60, nolog: bool = False) -> str:
- raise NotImplementedError()
+ if not nolog:
+ logger.debug("Node %s - run %s", self.info.node_id(), cmd)
+
+ cmd_b = cmd.encode("utf8")
+ proc_id = self.conn.cli.spawn(cmd_b, timeout=timeout, merge_out=True)
+ code = None
+ out = ""
+ while code is None:
+ code, outb, _ = self.conn.cli.get_updates(proc_id)
+ out += outb.decode("utf8")
+ time.sleep(0.01)
+
+ if code != 0:
+ templ = "Node {} - cmd {!r} failed with code {}. Output: {!r}."
+ raise OSError(templ.format(self.info.node_id(), cmd, code, out))
+
+ return out
def copy_file(self, local_path: str, remote_path: str = None) -> str:
raise NotImplementedError()
@@ -173,38 +200,64 @@
def stat_file(self, path: str) -> Any:
raise NotImplementedError()
- def disconnect(self) -> str:
+ def __exit__(self, x, y, z) -> bool:
+ self.disconnect(stop=True)
+ return False
+
+ def upload_plugin(self, name: str, code: bytes, version: str = None) -> None:
+ self.conn.server.load_module(name, version, code)
+
+ def disconnect(self, stop: bool = False) -> None:
+ if stop:
+ logger.debug("Stopping RPC server on %s", self.info.node_id())
+ self.conn.server.stop()
+
+ logger.debug("Disconnecting from %s", self.info.node_id())
self.conn.disconnect()
self.conn = None
-def setup_rpc(node: ISSHHost, rpc_server_code: bytes, plugins: Dict[str, bytes] = None, port: int = 0) -> IRPCNode:
- log_file = node.run("mktemp").strip()
+def setup_rpc(node: ISSHHost,
+ rpc_server_code: bytes,
+ plugins: Dict[str, bytes] = None,
+ port: int = 0,
+ log_level: str = None) -> IRPCNode:
+
+ logger.debug("Setting up RPC connection to {}".format(node.info))
code_file = node.put_to_file(None, rpc_server_code)
ip = node.info.ssh_creds.addr.host
- cmd = "python {code_file} server --listen-addr={listen_ip}:{port} --daemon " + \
- "--show-settings --stdout-file={out_file}"
- cmd = cmd.format(code_file=code_file, listen_ip=ip, out_file=log_file, port=port)
+ log_file = None # type: Optional[str]
+ if log_level:
+ log_file = node.run("mktemp", nolog=True).strip()
+ cmd = "python {} --log-level={} server --listen-addr={}:{} --daemon --show-settings"
+ cmd = cmd.format(code_file, log_level, ip, port) + " --stdout-file={}".format(log_file)
+ logger.info("Agent logs for node {} stored on node in file {}. Log level is {}".format(
+ node.info.node_id(), log_file, log_level))
+ else:
+ cmd = "python {} --log-level=CRITICAL server --listen-addr={}:{} --daemon --show-settings"
+ cmd = cmd.format(code_file, ip, port)
+
params_js = node.run(cmd).strip()
params = json.loads(params_js)
- params['log_file'] = log_file
+
node.info.params.update(params)
port = int(params['addr'].split(":")[1])
rpc_conn = agent.connect((ip, port))
+ rpc_node = RPCNode(rpc_conn, node.info)
+ rpc_node.rpc_log_file = log_file
+
if plugins is not None:
try:
for name, code in plugins.items():
- rpc_conn.server.load_module(name, None, code)
+ rpc_node.upload_plugin(name, code)
except Exception:
- rpc_conn.server.stop()
- rpc_conn.disconnect()
+ rpc_node.disconnect(True)
raise
- return RPCNode(rpc_conn, node.info)
-
+ return rpc_node
# class RemoteNode(node_interfaces.IRPCNode):