blob: 2a57c65013af4e6d245e141bbc6401a321688e2f [file] [log] [blame]
koder aka kdanilov3d2bc4f2016-11-12 18:31:18 +02001import os
2import time
3import json
4import socket
5import logging
koder aka kdanilov962ee5f2016-12-19 02:40:08 +02006import tempfile
koder aka kdanilov3d2bc4f2016-11-12 18:31:18 +02007import subprocess
koder aka kdanilov962ee5f2016-12-19 02:40:08 +02008from typing import Union, cast, Any, Optional, Tuple, Dict
koder aka kdanilov73084622016-11-16 21:51:08 +02009
koder aka kdanilov3b4da8b2016-10-17 00:17:53 +030010
koder aka kdanilov3d2bc4f2016-11-12 18:31:18 +020011import agent
koder aka kdanilov73084622016-11-16 21:51:08 +020012import paramiko
koder aka kdanilov3d2bc4f2016-11-12 18:31:18 +020013
koder aka kdanilov73084622016-11-16 21:51:08 +020014
15from .node_interfaces import IRPCNode, NodeInfo, ISSHHost
16from .ssh import connect as ssh_connect
koder aka kdanilov3b4da8b2016-10-17 00:17:53 +030017
18
# All log records emitted by this module go to the "wally" logger.
logger = logging.getLogger(name="wally")
koder aka kdanilov3b4da8b2016-10-17 00:17:53 +030020
koder aka kdanilov22d134e2016-11-08 11:33:19 +020021
class SSHHost(ISSHHost):
    """ISSHHost implementation on top of an already-established paramiko SSH connection."""

    def __init__(self, conn: paramiko.SSHClient, info: NodeInfo) -> None:
        self.conn = conn
        self.info = info

    def __str__(self) -> str:
        return self.info.node_id()

    def put_to_file(self, path: Optional[str], content: bytes) -> str:
        """Write *content* to *path* on the remote host over SFTP.

        If *path* is None, a temporary file is created with the remote ``mktemp``
        command and used as the destination.

        Returns the remote path the data was written to.
        """
        if path is None:
            path = self.run("mktemp", nolog=True).strip()

        logger.debug("PUT %s bytes to %s", len(content), path)

        with self.conn.open_sftp() as sftp:
            with sftp.open(path, "wb") as fd:
                fd.write(content)

        return path

    def disconnect(self):
        """Close the underlying SSH connection."""
        self.conn.close()

    def run(self, cmd: str, timeout: int = 60, nolog: bool = False) -> str:
        """Execute *cmd* on the remote host and return its combined stdout/stderr.

        :param cmd: command line to execute remotely.
        :param timeout: overall limit in seconds; re-checked at least once per second.
        :param nolog: if True, do not log the command being executed.
        :raises OSError: on timeout, or when the command exits with a non-zero code.
        """
        if not nolog:
            logger.debug("SSH:%s Exec %r", self, cmd)

        transport = self.conn.get_transport()
        session = transport.open_session()

        try:
            session.set_combine_stderr(True)
            stime = time.time()

            session.exec_command(cmd)
            # short per-recv timeout so the overall deadline below is re-checked regularly
            session.settimeout(1)
            session.shutdown_write()

            # Keep raw bytes and decode once at the end: decoding every 1024-byte chunk
            # separately fails when a multi-byte UTF-8 character straddles two chunks.
            chunks = []
            while True:
                try:
                    data = session.recv(1024)
                    if not data:
                        break
                    chunks.append(data)
                except socket.timeout:
                    pass

                if time.time() - stime > timeout:
                    # partial output may end mid-character; don't let that mask the timeout
                    partial = b"".join(chunks).decode("utf-8", errors="replace")
                    raise OSError(partial + "\nExecution timeout")

            code = session.recv_exit_status()
        finally:
            # always release the channel, including on timeout and transport errors
            session.close()

        output = b"".join(chunks).decode("utf-8")

        if code != 0:
            templ = "SSH:{0} Cmd {1!r} failed with code {2}. Output: {3}"
            raise OSError(templ.format(self, cmd, code, output))

        return output
85
86
class LocalHost(ISSHHost):
    """ISSHHost implementation that runs commands and writes files on the local machine."""

    def __str__(self):
        return "<Local>"

    def get_ip(self) -> str:
        return 'localhost'

    def put_to_file(self, path: Optional[str], content: bytes) -> str:
        """Write *content* to *path*, creating parent directories as needed.

        If *path* is None, a new temporary file is created with ``tempfile.mkstemp``.
        Returns the path the data was written to.
        """
        if path is None:
            fd, path = tempfile.mkstemp(text=False)
            os.close(fd)
        else:
            dir_name = os.path.dirname(path)
            # a bare file name has no directory part, and os.makedirs("") would raise
            if dir_name:
                os.makedirs(dir_name, exist_ok=True)

        logger.debug("PUT %s bytes to %s", len(content), path)

        with open(path, "wb") as fd2:
            fd2.write(content)

        return path

    def run(self, cmd: str, timeout: int = 60, nolog: bool = False) -> str:
        """Run *cmd* through the local shell and return its combined stdout/stderr.

        :param cmd: shell command line.
        :param timeout: seconds to wait for completion; on expiry the process is killed.
        :param nolog: if True, do not log the command being executed.
        :raises OSError: on timeout, or when the command exits with a non-zero code.
        """
        if not nolog:
            logger.debug("SSH:%s Exec %r", self, cmd)

        proc = subprocess.Popen(cmd, shell=True,
                                stdin=subprocess.PIPE,
                                stdout=subprocess.PIPE,
                                stderr=subprocess.STDOUT)

        try:
            stdout_data_b, _ = proc.communicate(timeout=timeout)
        except subprocess.TimeoutExpired:
            # recipe from the subprocess docs: kill the child, then reap it and drain the pipe
            proc.kill()
            partial_b, _ = proc.communicate()
            raise OSError(partial_b.decode("utf8", errors="replace") + "\nExecution timeout")

        stdout_data = stdout_data_b.decode("utf8")

        if proc.returncode != 0:
            templ = "SSH:{0} Cmd {1!r} failed with code {2}. Output: {3}"
            raise OSError(templ.format(self, cmd, proc.returncode, stdout_data))

        return stdout_data

    def disconnect(self):
        """Nothing to release for the local host."""
        pass
koder aka kdanilov3d2bc4f2016-11-12 18:31:18 +0200124
koder aka kdanilov73084622016-11-16 21:51:08 +0200125
def get_rpc_server_code() -> Tuple[bytes, Dict[str, bytes]]:
    """Load the RPC agent source and its plugin sources as raw bytes.

    Returns ``(master_code, plugins)``:
    - *master_code* is the source of the ``agent`` module;
    - *plugins* maps plugin name ("cli", "fs") to the source of the matching
      ``*_plugin.py`` file located in the same directory as the agent.
    """
    def read_bytes(fname: str) -> bytes:
        # context manager so the file handle is closed promptly
        with open(fname, "rb") as fd:
            return fd.read()

    # setup rpc data: if __file__ points at a compiled .pyc, read the .py next to it
    path = agent.__file__
    if path.endswith(".pyc"):
        path = path[:-1]

    master_code = read_bytes(path)
    agent_dir = os.path.dirname(path)

    plugins = {}  # type: Dict[str, bytes]
    for name, fname in (("cli", "cli_plugin.py"), ("fs", "fs_plugin.py")):
        plugins[name] = read_bytes(os.path.join(agent_dir, fname))

    return master_code, plugins
143
144
def connect(info: Union[str, NodeInfo], conn_timeout: int = 60) -> ISSHHost:
    """Open a host handle described by *info*.

    The literal string ``'local'`` yields a LocalHost. Any other value is treated
    as a NodeInfo, and an SSH connection is opened with its ``ssh_creds`` and
    *conn_timeout*.
    """
    if info == 'local':
        return LocalHost()

    node_info = cast(NodeInfo, info)
    ssh_conn = ssh_connect(node_info.ssh_creds, conn_timeout)
    return SSHHost(ssh_conn, node_info)
koder aka kdanilov3d2bc4f2016-11-12 18:31:18 +0200151
152
class RPCNode(IRPCNode):
    """Node object backed by an RPC connection to the agent running on the node."""

    def __init__(self, conn: agent.SimpleRPCClient, info: NodeInfo) -> None:
        self.info = info
        self.conn = conn
        # remote path of the agent's log file; setup_rpc assigns it when a log level is given
        self.rpc_log_file = None  # type: Optional[str]

    def __str__(self) -> str:
        return "Node({!r})".format(self.info.node_id())

    def __repr__(self) -> str:
        return str(self)

    def get_file_content(self, path: str) -> bytes:
        """Fetch the contents of remote file *path* (passes expanduser=True to the agent)."""
        logger.debug("GET %s", path)
        res = self.conn.fs.get_file(path, expanduser=True)
        logger.debug("Receive %s bytes from %s", len(res), path)
        return res

    def run(self, cmd: str, timeout: int = 60, nolog: bool = False) -> str:
        """Run *cmd* on the node through the agent's cli plugin.

        Polls the agent for output until an exit code is reported.

        :raises OSError: if the command exits with a non-zero code.
        """
        if not nolog:
            logger.debug("Node %s - run %s", self.info.node_id(), cmd)

        cmd_b = cmd.encode("utf8")
        proc_id = self.conn.cli.spawn(cmd_b, timeout=timeout, merge_out=True)

        # Keep raw bytes and decode once at the end, so a multi-byte character split
        # across two updates is not corrupted (per-chunk decoding would raise).
        chunks = []
        while True:
            code, outb, _ = self.conn.cli.get_updates(proc_id)
            chunks.append(outb)
            if code is not None:
                break
            time.sleep(0.01)

        out = b"".join(chunks).decode("utf8")

        if code != 0:
            templ = "Node {} - cmd {!r} failed with code {}. Output: {!r}."
            raise OSError(templ.format(self.info.node_id(), cmd, code, out))

        return out

    def copy_file(self, local_path: str, remote_path: Optional[str] = None,
                  expand_user: bool = False) -> str:
        """Upload local file *local_path* to *remote_path* on the node; return the remote path."""
        with open(local_path, 'rb') as fd:
            data = fd.read()
        return self.put_to_file(remote_path, data, expand_user)

    def put_to_file(self, path: Optional[str], content: bytes, expand_user: bool = False) -> str:
        """Store *content* on the node via the agent's fs plugin; return the path used."""
        return self.conn.fs.store_file(path, content, expand_user)

    def stat_file(self, path: str, expand_user: bool = False) -> Dict[str, int]:
        """Return the agent's stat information for remote file *path*."""
        return self.conn.fs.file_stat(path, expand_user)

    def __exit__(self, x, y, z) -> bool:
        # stop the remote server and disconnect; returning False never suppresses exceptions
        self.disconnect(stop=True)
        return False

    def upload_plugin(self, name: str, code: bytes, version: Optional[str] = None) -> None:
        """Load plugin module *name* from source *code* into the remote agent."""
        self.conn.server.load_module(name, version, code)

    def disconnect(self, stop: bool = False) -> None:
        """Drop the RPC connection, optionally stopping the remote server first.

        Calling it again after the connection has been dropped is a no-op.
        """
        if self.conn is None:
            # already disconnected, e.g. explicit disconnect() followed by __exit__
            return

        if stop:
            logger.debug("Stopping RPC server on %s", self.info.node_id())
            self.conn.server.stop()

        logger.debug("Disconnecting from %s", self.info.node_id())
        self.conn.disconnect()
        self.conn = None
koder aka kdanilov3b4da8b2016-10-17 00:17:53 +0300218
koder aka kdanilov3b4da8b2016-10-17 00:17:53 +0300219
def setup_rpc(node: ISSHHost,
              rpc_server_code: bytes,
              plugins: Optional[Dict[str, bytes]] = None,
              port: int = 0,
              log_level: Optional[str] = None) -> IRPCNode:
    """Deploy and start the RPC agent on *node*, connect to it, and return an RPCNode.

    :param node: host handle used to upload the agent and start it.
    :param rpc_server_code: agent source code, uploaded to a temporary file on the node.
    :param plugins: optional mapping of plugin name -> source to load into the agent.
    :param port: listen port passed to the agent (0 is passed through as-is).
    :param log_level: if set, the agent logs at this level into a temporary file on
        the node, whose path is stored in ``rpc_log_file``; otherwise it runs at CRITICAL.
    :raises OSError: if starting the agent on the node fails.
    """
    logger.debug("Setting up RPC connection to %s", node.info)
    code_file = node.put_to_file(None, rpc_server_code)
    ip = node.info.ssh_creds.addr.host

    log_file = None  # type: Optional[str]
    if log_level:
        log_file = node.run("mktemp", nolog=True).strip()
        cmd = "python {} --log-level={} server --listen-addr={}:{} --daemon --show-settings"
        cmd = cmd.format(code_file, log_level, ip, port) + " --stdout-file={}".format(log_file)
        logger.info("Agent logs for node {} stored on node in file {}. Log level is {}".format(
            node.info.node_id(), log_file, log_level))
    else:
        cmd = "python {} --log-level=CRITICAL server --listen-addr={}:{} --daemon --show-settings"
        cmd = cmd.format(code_file, ip, port)

    # the agent prints its settings as JSON (--show-settings)
    params_js = node.run(cmd).strip()
    params = json.loads(params_js)

    node.info.params.update(params)

    # params['addr'] is parsed as "host:port"; split on the LAST colon so hosts that
    # contain colons (IPv6) still yield the right port
    port = int(params['addr'].rsplit(":", 1)[1])
    rpc_conn = agent.connect((ip, port))

    rpc_node = RPCNode(rpc_conn, node.info)
    rpc_node.rpc_log_file = log_file

    if plugins is not None:
        try:
            for name, code in plugins.items():
                rpc_node.upload_plugin(name, code)
        except Exception:
            # don't leave a half-configured agent running on the node
            rpc_node.disconnect(True)
            raise

    return rpc_node
koder aka kdanilov22d134e2016-11-08 11:33:19 +0200261
koder aka kdanilov3d2bc4f2016-11-12 18:31:18 +0200262
263 # class RemoteNode(node_interfaces.IRPCNode):
264# def __init__(self, node_info: node_interfaces.NodeInfo, rpc_conn: agent.RPCClient):
265# self.info = node_info
266# self.rpc = rpc_conn
267#
268 # def get_interface(self, ip: str) -> str:
269 # """Get node external interface for given IP"""
270 # data = self.run("ip a", nolog=True)
271 # curr_iface = None
272 #
273 # for line in data.split("\n"):
274 # match1 = re.match(r"\d+:\s+(?P<name>.*?):\s\<", line)
275 # if match1 is not None:
276 # curr_iface = match1.group('name')
277 #
278 # match2 = re.match(r"\s+inet\s+(?P<ip>[0-9.]+)/", line)
279 # if match2 is not None:
280 # if match2.group('ip') == ip:
281 # assert curr_iface is not None
282 # return curr_iface
283 #
284 # raise KeyError("Can't found interface for ip {0}".format(ip))
285 #
286 # def get_user(self) -> str:
287 # """"get ssh connection username"""
288 # if self.ssh_conn_url == 'local':
289 # return getpass.getuser()
290 # return self.ssh_cred.user
291 #
292 #
293 # def run(self, cmd: str, stdin_data: str = None, timeout: int = 60, nolog: bool = False) -> Tuple[int, str]:
294 # """Run command on node. Will use rpc connection, if available"""
295 #
296 # if self.rpc_conn is None:
297 # return run_over_ssh(self.ssh_conn, cmd,
298 # stdin_data=stdin_data, timeout=timeout,
299 # nolog=nolog, node=self)
300 # assert not stdin_data
301 # proc_id = self.rpc_conn.cli.spawn(cmd)
302 # exit_code = None
303 # output = ""
304 #
305 # while exit_code is None:
306 # exit_code, stdout_data, stderr_data = self.rpc_conn.cli.get_updates(proc_id)
307 # output += stdout_data + stderr_data
308 #
309 # return exit_code, output
310
311