blob: e85ded0317e2336f026ec9188513ec122c9d9ab9 [file] [log] [blame]
koder aka kdanilov3d2bc4f2016-11-12 18:31:18 +02001import os
2import time
3import json
4import socket
5import logging
koder aka kdanilov962ee5f2016-12-19 02:40:08 +02006import tempfile
koder aka kdanilov3d2bc4f2016-11-12 18:31:18 +02007import subprocess
koder aka kdanilov962ee5f2016-12-19 02:40:08 +02008from typing import Union, cast, Any, Optional, Tuple, Dict
koder aka kdanilov73084622016-11-16 21:51:08 +02009
koder aka kdanilov3b4da8b2016-10-17 00:17:53 +030010
koder aka kdanilov3d2bc4f2016-11-12 18:31:18 +020011import agent
koder aka kdanilov73084622016-11-16 21:51:08 +020012import paramiko
koder aka kdanilov3d2bc4f2016-11-12 18:31:18 +020013
koder aka kdanilov73084622016-11-16 21:51:08 +020014
15from .node_interfaces import IRPCNode, NodeInfo, ISSHHost
16from .ssh import connect as ssh_connect
koder aka kdanilov3b4da8b2016-10-17 00:17:53 +030017
18
# Module-wide logger; every message from this module is emitted under the "wally" logger name.
logger = logging.getLogger(name="wally")
koder aka kdanilov3b4da8b2016-10-17 00:17:53 +030020
koder aka kdanilov22d134e2016-11-08 11:33:19 +020021
class SSHHost(ISSHHost):
    """Host reached through an already established paramiko SSH connection.

    ``run`` opens a fresh session channel per command; ``put_to_file``
    writes data through SFTP.
    """

    def __init__(self, conn: paramiko.SSHClient, info: NodeInfo) -> None:
        self.conn = conn
        self.info = info

    def __str__(self) -> str:
        return self.info.node_id()

    def put_to_file(self, path: Optional[str], content: bytes) -> str:
        """Write *content* to *path* on the remote host.

        :param path: destination path; when None a temporary file is created
                     on the remote side with ``mktemp``.
        :returns: the remote path that was written.
        """
        if path is None:
            path = self.run("mktemp", nolog=True).strip()

        logger.debug("PUT %s bytes to %s", len(content), path)

        with self.conn.open_sftp() as sftp:
            with sftp.open(path, "wb") as fd:
                fd.write(content)

        return path

    def disconnect(self):
        """Close the underlying SSH client."""
        self.conn.close()

    def run(self, cmd: str, timeout: int = 60, nolog: bool = False) -> str:
        """Execute *cmd* on the remote host and return its output.

        stderr is merged into stdout.

        :param timeout: seconds to wait for the command before failing.
        :param nolog: skip the debug log line for this command.
        :raises OSError: when the command times out or exits with non-zero code.
        """
        if not nolog:
            logger.debug("SSH:{0} Exec {1!r}".format(self, cmd))

        transport = self.conn.get_transport()
        session = transport.open_session()

        # Collect raw bytes and decode once at the end: decoding every 1 KiB
        # chunk separately fails when a multi-byte UTF-8 char spans two chunks.
        output_b = bytearray()
        try:
            session.set_combine_stderr(True)
            # monotonic clock so wall-clock adjustments can't skew the deadline
            stime = time.monotonic()

            session.exec_command(cmd)
            # recv gives up after 1s, so the overall deadline below is rechecked regularly
            session.settimeout(1)
            session.shutdown_write()

            while True:
                try:
                    chunk = session.recv(1024)
                    if not chunk:
                        break
                    output_b += chunk
                except socket.timeout:
                    pass

                if time.monotonic() - stime > timeout:
                    partial = output_b.decode("utf-8", errors="replace")
                    raise OSError(partial + "\nExecution timeout")

            code = session.recv_exit_status()
        finally:
            # Always release the channel, including on timeout and errors.
            session.close()

        output = output_b.decode("utf-8")

        if code != 0:
            templ = "SSH:{0} Cmd {1!r} failed with code {2}. Output: {3}"
            raise OSError(templ.format(self, cmd, code, output))

        return output
84 return output
85
86
87class LocalHost(ISSHHost):
88 def __str__(self):
89 return "<Local>"
90
91 def get_ip(self) -> str:
92 return 'localhost'
93
koder aka kdanilov962ee5f2016-12-19 02:40:08 +020094 def put_to_file(self, path: Optional[str], content: bytes) -> str:
95 if path is None:
96 fd, path = tempfile.mkstemp(text=False)
97 os.close(fd)
98 else:
99 dir_name = os.path.dirname(path)
100 os.makedirs(dir_name, exist_ok=True)
koder aka kdanilov3d2bc4f2016-11-12 18:31:18 +0200101
koder aka kdanilov962ee5f2016-12-19 02:40:08 +0200102 with open(path, "wb") as fd2:
103 fd2.write(content)
104
105 return path
koder aka kdanilov3d2bc4f2016-11-12 18:31:18 +0200106
107 def run(self, cmd: str, timeout: int = 60, nolog: bool = False) -> str:
108 proc = subprocess.Popen(cmd, shell=True,
109 stdin=subprocess.PIPE,
110 stdout=subprocess.PIPE,
111 stderr=subprocess.STDOUT)
112
koder aka kdanilove7e1a4d2016-12-17 20:29:52 +0200113 stdout_data_b, _ = proc.communicate()
114 stdout_data = stdout_data_b.decode("utf8")
115
koder aka kdanilov3d2bc4f2016-11-12 18:31:18 +0200116 if proc.returncode != 0:
117 templ = "SSH:{0} Cmd {1!r} failed with code {2}. Output: {3}"
118 raise OSError(templ.format(self, cmd, proc.returncode, stdout_data))
119
120 return stdout_data
121
koder aka kdanilov73084622016-11-16 21:51:08 +0200122 def disconnect(self):
123 pass
koder aka kdanilov3d2bc4f2016-11-12 18:31:18 +0200124
koder aka kdanilov73084622016-11-16 21:51:08 +0200125
koder aka kdanilove7e1a4d2016-12-17 20:29:52 +0200126def get_rpc_server_code() -> Tuple[bytes, Dict[str, bytes]]:
127 # setup rpc data
128 if agent.__file__.endswith(".pyc"):
129 path = agent.__file__[:-1]
130 else:
131 path = agent.__file__
132
133 master_code = open(path, "rb").read()
134
135 plugins = {} # type: Dict[str, bytes]
136 cli_path = os.path.join(os.path.dirname(path), "cli_plugin.py")
137 plugins["cli"] = open(cli_path, "rb").read()
138
139 fs_path = os.path.join(os.path.dirname(path), "fs_plugin.py")
140 plugins["fs"] = open(fs_path, "rb").read()
141
142 return master_code, plugins
143
144
koder aka kdanilov73084622016-11-16 21:51:08 +0200145def connect(info: Union[str, NodeInfo], conn_timeout: int = 60) -> ISSHHost:
146 if info == 'local':
koder aka kdanilov3d2bc4f2016-11-12 18:31:18 +0200147 return LocalHost()
148 else:
koder aka kdanilov73084622016-11-16 21:51:08 +0200149 info_c = cast(NodeInfo, info)
150 return SSHHost(ssh_connect(info_c.ssh_creds, conn_timeout), info_c)
koder aka kdanilov3d2bc4f2016-11-12 18:31:18 +0200151
152
153class RPCNode(IRPCNode):
154 """Node object"""
155
koder aka kdanilove7e1a4d2016-12-17 20:29:52 +0200156 def __init__(self, conn: agent.SimpleRPCClient, info: NodeInfo) -> None:
koder aka kdanilov3d2bc4f2016-11-12 18:31:18 +0200157 self.info = info
158 self.conn = conn
159
koder aka kdanilov3d2bc4f2016-11-12 18:31:18 +0200160 def __str__(self) -> str:
koder aka kdanilovbbbe1dc2016-12-20 01:19:56 +0200161 return "Node({!r})".format(self.info.node_id())
koder aka kdanilov3b4da8b2016-10-17 00:17:53 +0300162
koder aka kdanilov22d134e2016-11-08 11:33:19 +0200163 def __repr__(self) -> str:
koder aka kdanilov3b4da8b2016-10-17 00:17:53 +0300164 return str(self)
165
koder aka kdanilov7f59d562016-12-26 01:34:23 +0200166 def get_file_content(self, path: str, expanduser: bool = False) -> bytes:
koder aka kdanilov23e6bdf2016-12-24 02:18:54 +0200167 logger.debug("GET %s from %s", path, self.info)
koder aka kdanilov7f59d562016-12-26 01:34:23 +0200168 if expanduser:
169 path = self.conn.fs.expanduser(path)
170 res = self.conn.fs.get_file(path)
koder aka kdanilov23e6bdf2016-12-24 02:18:54 +0200171 logger.debug("Download %s bytes from remote file %s from %s", len(res), path, self.info)
koder aka kdanilov962ee5f2016-12-19 02:40:08 +0200172 return res
koder aka kdanilov3b4da8b2016-10-17 00:17:53 +0300173
koder aka kdanilov23e6bdf2016-12-24 02:18:54 +0200174 def run(self, cmd: str, timeout: int = 60, nolog: bool = False, check_timeout: float = 0.01) -> str:
koder aka kdanilov962ee5f2016-12-19 02:40:08 +0200175 if not nolog:
176 logger.debug("Node %s - run %s", self.info.node_id(), cmd)
177
178 cmd_b = cmd.encode("utf8")
179 proc_id = self.conn.cli.spawn(cmd_b, timeout=timeout, merge_out=True)
koder aka kdanilov962ee5f2016-12-19 02:40:08 +0200180 out = ""
koder aka kdanilov3af3c332016-12-19 17:12:34 +0200181
182 while True:
koder aka kdanilov962ee5f2016-12-19 02:40:08 +0200183 code, outb, _ = self.conn.cli.get_updates(proc_id)
184 out += outb.decode("utf8")
koder aka kdanilov3af3c332016-12-19 17:12:34 +0200185 if code is not None:
186 break
koder aka kdanilov23e6bdf2016-12-24 02:18:54 +0200187 time.sleep(check_timeout)
koder aka kdanilov962ee5f2016-12-19 02:40:08 +0200188
189 if code != 0:
190 templ = "Node {} - cmd {!r} failed with code {}. Output: {!r}."
191 raise OSError(templ.format(self.info.node_id(), cmd, code, out))
192
193 return out
koder aka kdanilov73084622016-11-16 21:51:08 +0200194
koder aka kdanilov7f59d562016-12-26 01:34:23 +0200195 def copy_file(self, local_path: str, remote_path: str = None, expanduser: bool = False) -> str:
196 if expanduser:
197 remote_path = self.conn.fs.expanduser(remote_path)
198
koder aka kdanilovbbbe1dc2016-12-20 01:19:56 +0200199 data = open(local_path, 'rb').read()
koder aka kdanilov23e6bdf2016-12-24 02:18:54 +0200200 return self.put_to_file(remote_path, data)
koder aka kdanilov73084622016-11-16 21:51:08 +0200201
koder aka kdanilov7f59d562016-12-26 01:34:23 +0200202 def put_to_file(self, path: Optional[str], content: bytes, expanduser: bool = False) -> str:
203 if expanduser:
204 path = self.conn.fs.expanduser(path)
koder aka kdanilov23e6bdf2016-12-24 02:18:54 +0200205 return self.conn.fs.store_file(path, content)
koder aka kdanilov73084622016-11-16 21:51:08 +0200206
koder aka kdanilov7f59d562016-12-26 01:34:23 +0200207 def stat_file(self, path: str, expanduser: bool = False) -> Dict[str, int]:
208 if expanduser:
209 path = self.conn.fs.expanduser(path)
koder aka kdanilov23e6bdf2016-12-24 02:18:54 +0200210 return self.conn.fs.file_stat(path)
koder aka kdanilov73084622016-11-16 21:51:08 +0200211
koder aka kdanilov962ee5f2016-12-19 02:40:08 +0200212 def __exit__(self, x, y, z) -> bool:
213 self.disconnect(stop=True)
214 return False
215
216 def upload_plugin(self, name: str, code: bytes, version: str = None) -> None:
217 self.conn.server.load_module(name, version, code)
218
219 def disconnect(self, stop: bool = False) -> None:
220 if stop:
221 logger.debug("Stopping RPC server on %s", self.info.node_id())
222 self.conn.server.stop()
223
224 logger.debug("Disconnecting from %s", self.info.node_id())
koder aka kdanilove7e1a4d2016-12-17 20:29:52 +0200225 self.conn.disconnect()
226 self.conn = None
koder aka kdanilov3b4da8b2016-10-17 00:17:53 +0300227
koder aka kdanilov3b4da8b2016-10-17 00:17:53 +0300228
koder aka kdanilov23e6bdf2016-12-24 02:18:54 +0200229def get_node_python_27(node: ISSHHost) -> Optional[str]:
230 python_cmd = None # type: Optional[str]
231 try:
232 python_cmd = node.run('which python2.7').strip()
233 except Exception as exc:
234 pass
235
236 if python_cmd is None:
237 try:
238 if '2.7' in node.run('python --version'):
239 python_cmd = node.run('which python').strip()
240 except Exception as exc:
241 pass
242
243 return python_cmd
244
245
koder aka kdanilov962ee5f2016-12-19 02:40:08 +0200246def setup_rpc(node: ISSHHost,
247 rpc_server_code: bytes,
248 plugins: Dict[str, bytes] = None,
249 port: int = 0,
250 log_level: str = None) -> IRPCNode:
251
252 logger.debug("Setting up RPC connection to {}".format(node.info))
koder aka kdanilov23e6bdf2016-12-24 02:18:54 +0200253 python_cmd = get_node_python_27(node)
254 if python_cmd:
255 logger.debug("python2.7 on node {} path is {}".format(node.info, python_cmd))
256 else:
257 logger.error(("Can't find python2.7 on node {}. " +
258 "Install python2.7 and rerun test").format(node.info))
259 raise ValueError("Python not found")
260
koder aka kdanilove7e1a4d2016-12-17 20:29:52 +0200261 code_file = node.put_to_file(None, rpc_server_code)
koder aka kdanilov73084622016-11-16 21:51:08 +0200262 ip = node.info.ssh_creds.addr.host
263
koder aka kdanilov962ee5f2016-12-19 02:40:08 +0200264 log_file = None # type: Optional[str]
265 if log_level:
266 log_file = node.run("mktemp", nolog=True).strip()
koder aka kdanilov23e6bdf2016-12-24 02:18:54 +0200267 cmd = "{} {} --log-level={} server --listen-addr={}:{} --daemon --show-settings"
268 cmd = cmd.format(python_cmd, code_file, log_level, ip, port) + " --stdout-file={}".format(log_file)
koder aka kdanilov962ee5f2016-12-19 02:40:08 +0200269 logger.info("Agent logs for node {} stored on node in file {}. Log level is {}".format(
270 node.info.node_id(), log_file, log_level))
271 else:
koder aka kdanilov23e6bdf2016-12-24 02:18:54 +0200272 cmd = "{} {} --log-level=CRITICAL server --listen-addr={}:{} --daemon --show-settings"
273 cmd = cmd.format(python_cmd, code_file, ip, port)
koder aka kdanilov962ee5f2016-12-19 02:40:08 +0200274
koder aka kdanilove7e1a4d2016-12-17 20:29:52 +0200275 params_js = node.run(cmd).strip()
koder aka kdanilov3d2bc4f2016-11-12 18:31:18 +0200276 params = json.loads(params_js)
koder aka kdanilov962ee5f2016-12-19 02:40:08 +0200277
koder aka kdanilove7e1a4d2016-12-17 20:29:52 +0200278 node.info.params.update(params)
279
koder aka kdanilov73084622016-11-16 21:51:08 +0200280 port = int(params['addr'].split(":")[1])
koder aka kdanilov3d2bc4f2016-11-12 18:31:18 +0200281 rpc_conn = agent.connect((ip, port))
koder aka kdanilove7e1a4d2016-12-17 20:29:52 +0200282
koder aka kdanilov962ee5f2016-12-19 02:40:08 +0200283 rpc_node = RPCNode(rpc_conn, node.info)
284 rpc_node.rpc_log_file = log_file
285
koder aka kdanilove7e1a4d2016-12-17 20:29:52 +0200286 if plugins is not None:
287 try:
288 for name, code in plugins.items():
koder aka kdanilov962ee5f2016-12-19 02:40:08 +0200289 rpc_node.upload_plugin(name, code)
koder aka kdanilove7e1a4d2016-12-17 20:29:52 +0200290 except Exception:
koder aka kdanilov962ee5f2016-12-19 02:40:08 +0200291 rpc_node.disconnect(True)
koder aka kdanilove7e1a4d2016-12-17 20:29:52 +0200292 raise
293
koder aka kdanilov962ee5f2016-12-19 02:40:08 +0200294 return rpc_node
koder aka kdanilov22d134e2016-11-08 11:33:19 +0200295
koder aka kdanilov3d2bc4f2016-11-12 18:31:18 +0200296
297 # class RemoteNode(node_interfaces.IRPCNode):
298# def __init__(self, node_info: node_interfaces.NodeInfo, rpc_conn: agent.RPCClient):
299# self.info = node_info
300# self.rpc = rpc_conn
301#
302 # def get_interface(self, ip: str) -> str:
303 # """Get node external interface for given IP"""
304 # data = self.run("ip a", nolog=True)
305 # curr_iface = None
306 #
307 # for line in data.split("\n"):
308 # match1 = re.match(r"\d+:\s+(?P<name>.*?):\s\<", line)
309 # if match1 is not None:
310 # curr_iface = match1.group('name')
311 #
312 # match2 = re.match(r"\s+inet\s+(?P<ip>[0-9.]+)/", line)
313 # if match2 is not None:
314 # if match2.group('ip') == ip:
315 # assert curr_iface is not None
316 # return curr_iface
317 #
318 # raise KeyError("Can't found interface for ip {0}".format(ip))
319 #
320 # def get_user(self) -> str:
321 # """"get ssh connection username"""
322 # if self.ssh_conn_url == 'local':
323 # return getpass.getuser()
324 # return self.ssh_cred.user
325 #
326 #
327 # def run(self, cmd: str, stdin_data: str = None, timeout: int = 60, nolog: bool = False) -> Tuple[int, str]:
328 # """Run command on node. Will use rpc connection, if available"""
329 #
330 # if self.rpc_conn is None:
331 # return run_over_ssh(self.ssh_conn, cmd,
332 # stdin_data=stdin_data, timeout=timeout,
333 # nolog=nolog, node=self)
334 # assert not stdin_data
335 # proc_id = self.rpc_conn.cli.spawn(cmd)
336 # exit_code = None
337 # output = ""
338 #
339 # while exit_code is None:
340 # exit_code, stdout_data, stderr_data = self.rpc_conn.cli.get_updates(proc_id)
341 # output += stdout_data + stderr_data
342 #
343 # return exit_code, output
344
345