blob: 38342c6f36d3cceee6d52c54918c9c03fd203839 [file] [log] [blame]
koder aka kdanilov3d2bc4f2016-11-12 18:31:18 +02001import os
koder aka kdanilov108ac362017-01-19 20:17:16 +02002import zlib
koder aka kdanilov3d2bc4f2016-11-12 18:31:18 +02003import time
4import json
5import socket
6import logging
koder aka kdanilov962ee5f2016-12-19 02:40:08 +02007import tempfile
koder aka kdanilov3d2bc4f2016-11-12 18:31:18 +02008import subprocess
koder aka kdanilov962ee5f2016-12-19 02:40:08 +02009from typing import Union, cast, Any, Optional, Tuple, Dict
koder aka kdanilov73084622016-11-16 21:51:08 +020010
koder aka kdanilov3b4da8b2016-10-17 00:17:53 +030011
koder aka kdanilov3d2bc4f2016-11-12 18:31:18 +020012import agent
koder aka kdanilov73084622016-11-16 21:51:08 +020013import paramiko
koder aka kdanilov3d2bc4f2016-11-12 18:31:18 +020014
koder aka kdanilov73084622016-11-16 21:51:08 +020015
16from .node_interfaces import IRPCNode, NodeInfo, ISSHHost
17from .ssh import connect as ssh_connect
koder aka kdanilov3b4da8b2016-10-17 00:17:53 +030018
19
koder aka kdanilov3d2bc4f2016-11-12 18:31:18 +020020logger = logging.getLogger("wally")
koder aka kdanilov3b4da8b2016-10-17 00:17:53 +030021
koder aka kdanilov22d134e2016-11-08 11:33:19 +020022
class SSHHost(ISSHHost):
    """Host reached through an already established paramiko SSH connection."""

    def __init__(self, conn: paramiko.SSHClient, info: NodeInfo) -> None:
        self.conn = conn
        self.info = info

    def __str__(self) -> str:
        return self.node_id

    @property
    def node_id(self) -> str:
        """Identifier of this host, taken from the attached node info."""
        return self.info.node_id

    def put_to_file(self, path: Optional[str], content: bytes) -> str:
        """Write `content` to `path` on the host over SFTP and return the path.

        When `path` is None a file name is obtained by running `mktemp`
        on the host.
        """
        if path is None:
            path = self.run("mktemp", nolog=True).strip()

        logger.debug("PUT %s bytes to %s", len(content), path)

        with self.conn.open_sftp() as sftp:
            with sftp.open(path, "wb") as fd:
                fd.write(content)

        return path

    def disconnect(self):
        """Close the underlying SSH connection."""
        self.conn.close()

    def run(self, cmd: str, timeout: int = 60, nolog: bool = False) -> str:
        """Execute `cmd` on the host and return its combined stdout/stderr.

        Raises OSError when the command runs longer than `timeout` seconds
        or exits with a non-zero code. The command is logged at debug level
        unless `nolog` is set.
        """
        if not nolog:
            logger.debug("SSH:{0} Exec {1!r}".format(self, cmd))

        transport = self.conn.get_transport()
        session = transport.open_session()

        # Raw bytes are collected and decoded once at the end: decoding every
        # chunk separately fails when a multi-byte UTF-8 character is split
        # across two recv() calls.
        received = bytearray()

        try:
            session.set_combine_stderr(True)
            stime = time.time()

            session.exec_command(cmd)
            # Short socket timeout so the overall deadline below is checked
            # regularly even when the command produces no output.
            session.settimeout(1)
            session.shutdown_write()

            while True:
                try:
                    chunk = session.recv(1024)
                    if not chunk:
                        break
                    received.extend(chunk)
                except socket.timeout:
                    pass

                if time.time() - stime > timeout:
                    # Partial output may end in the middle of a character, so
                    # decode leniently to keep the timeout error itself visible.
                    partial = received.decode("utf-8", errors="replace")
                    raise OSError(partial + "\nExecution timeout")

            code = session.recv_exit_status()
        finally:
            # Always release the channel, on success as well as on failure.
            session.close()

        output = received.decode("utf-8")

        if code != 0:
            templ = "SSH:{0} Cmd {1!r} failed with code {2}. Output: {3}"
            raise OSError(templ.format(self, cmd, code, output))

        return output
90
91
class LocalHost(ISSHHost):
    """ISSHHost implementation that operates on the local machine."""

    def __str__(self):
        return "<Local>"

    def get_ip(self) -> str:
        return 'localhost'

    def put_to_file(self, path: Optional[str], content: bytes) -> str:
        """Write `content` to a local file and return its path.

        When `path` is None a temporary file is created with
        `tempfile.mkstemp`; otherwise missing parent directories are created.
        """
        if path is None:
            fd, path = tempfile.mkstemp(text=False)
            os.close(fd)
        else:
            dir_name = os.path.dirname(path)
            # A bare file name has an empty dirname, and os.makedirs("")
            # raises FileNotFoundError, so only create a non-empty directory.
            if dir_name:
                os.makedirs(dir_name, exist_ok=True)

        with open(path, "wb") as fd2:
            fd2.write(content)

        return path

    def run(self, cmd: str, timeout: int = 60, nolog: bool = False) -> str:
        """Run `cmd` through the local shell and return stdout merged with stderr.

        Raises OSError when the command exits with a non-zero code.

        NOTE(review): `timeout` is accepted to match the other host classes
        but is not enforced here - the call blocks until the command exits.
        """
        if not nolog:
            logger.debug("SSH:{0} Exec {1!r}".format(self, cmd))

        proc = subprocess.Popen(cmd, shell=True,
                                stdin=subprocess.PIPE,
                                stdout=subprocess.PIPE,
                                stderr=subprocess.STDOUT)

        stdout_data_b, _ = proc.communicate()
        stdout_data = stdout_data_b.decode("utf8")

        if proc.returncode != 0:
            templ = "SSH:{0} Cmd {1!r} failed with code {2}. Output: {3}"
            raise OSError(templ.format(self, cmd, proc.returncode, stdout_data))

        return stdout_data

    def disconnect(self):
        """Nothing to release for the local host."""
        pass
koder aka kdanilov3d2bc4f2016-11-12 18:31:18 +0200129
koder aka kdanilov73084622016-11-16 21:51:08 +0200130
def get_rpc_server_code() -> Tuple[bytes, Dict[str, bytes]]:
    """Read the RPC agent source and its plugin sources from disk.

    Returns a tuple of (agent module source, {plugin name: plugin source}).
    The plugin files `cli_plugin.py` and `fs_plugin.py` are looked up in the
    directory that holds the `agent` module.
    """
    # When the module was loaded from a compiled ".pyc" file, drop the
    # trailing "c" to get the matching ".py" source path.
    if agent.__file__.endswith(".pyc"):
        path = agent.__file__[:-1]
    else:
        path = agent.__file__

    # Context managers close each file deterministically instead of leaving
    # the handles for the garbage collector.
    with open(path, "rb") as src:
        master_code = src.read()

    agent_dir = os.path.dirname(path)
    plugins = {}  # type: Dict[str, bytes]
    for name, file_name in (("cli", "cli_plugin.py"), ("fs", "fs_plugin.py")):
        with open(os.path.join(agent_dir, file_name), "rb") as src:
            plugins[name] = src.read()

    return master_code, plugins
148
149
def connect(info: Union[str, NodeInfo], conn_timeout: int = 60) -> ISSHHost:
    """Return a host object for `info`.

    The literal string 'local' yields a LocalHost; anything else is treated
    as a NodeInfo and connected over SSH using its `ssh_creds`.
    """
    if info == 'local':
        return LocalHost()

    node_info = cast(NodeInfo, info)
    ssh_conn = ssh_connect(node_info.ssh_creds, conn_timeout)
    return SSHHost(ssh_conn, node_info)
koder aka kdanilov3d2bc4f2016-11-12 18:31:18 +0200156
157
class RPCNode(IRPCNode):
    """Node object"""

    def __init__(self, conn: agent.SimpleRPCClient, info: NodeInfo) -> None:
        self.info = info
        self.conn = conn

    def __str__(self) -> str:
        # node_id is read as an attribute, the same way SSHHost.node_id reads
        # it; calling it would fail if it is a str-valued property.
        return "Node({!r})".format(self.info.node_id)

    def __repr__(self) -> str:
        return str(self)

    def get_file_content(self, path: str, expanduser: bool = False, compress: bool = True) -> bytes:
        """Fetch the content of remote file `path`.

        With `expanduser` the path is first expanded on the remote side.
        With `compress` the remote side is asked for compressed data, which
        is decompressed with zlib before being returned.
        """
        logger.debug("GET %s from %s", path, self.info)
        if expanduser:
            path = self.conn.fs.expanduser(path)
        res = self.conn.fs.get_file(path, compress)
        logger.debug("Download %s bytes from remote file %s from %s", len(res), path, self.info)
        if compress:
            res = zlib.decompress(res)
        return res

    def run(self, cmd: str, timeout: int = 60, nolog: bool = False, check_timeout: float = 0.01) -> str:
        """Run `cmd` through the RPC agent and return its merged output.

        The agent is polled every `check_timeout` seconds until it reports an
        exit code. Raises OSError when that code is non-zero.
        """
        if not nolog:
            logger.debug("Node %s - run %s", self.node_id, cmd)

        cmd_b = cmd.encode("utf8")
        proc_id = self.conn.cli.spawn(cmd_b, timeout=timeout, merge_out=True)

        # Collect raw bytes and decode once: a multi-byte UTF-8 character may
        # be split between two consecutive updates.
        raw_out = bytearray()

        while True:
            code, outb, _ = self.conn.cli.get_updates(proc_id)
            raw_out.extend(outb)
            if code is not None:
                break
            time.sleep(check_timeout)

        out = raw_out.decode("utf8")

        if code != 0:
            templ = "Node {} - cmd {!r} failed with code {}. Output: {!r}."
            raise OSError(templ.format(self.node_id, cmd, code, out))

        return out

    def copy_file(self, local_path: str, remote_path: Optional[str] = None,
                  expanduser: bool = False,
                  compress: bool = False) -> str:
        """Upload local file `local_path` to `remote_path` on the node.

        Returns whatever `put_to_file` returns for the stored file.
        """
        if expanduser:
            remote_path = self.conn.fs.expanduser(remote_path)

        # Close the local file as soon as it is read.
        with open(local_path, 'rb') as src:
            data = src.read()  # type: bytes

        return self.put_to_file(remote_path, data, compress=compress)

    def put_to_file(self, path: Optional[str], content: bytes, expanduser: bool = False, compress: bool = False) -> str:
        """Store `content` in remote file `path`, optionally zlib-compressed in transit."""
        if expanduser:
            path = self.conn.fs.expanduser(path)
        if compress:
            content = zlib.compress(content)
        return self.conn.fs.store_file(path, content, compress)

    def stat_file(self, path: str, expanduser: bool = False) -> Dict[str, int]:
        """Return the remote agent's stat information for `path`."""
        if expanduser:
            path = self.conn.fs.expanduser(path)
        return self.conn.fs.file_stat(path)

    def __exit__(self, x, y, z) -> bool:
        # Leaving the context also stops the remote RPC server; exceptions
        # are not suppressed.
        self.disconnect(stop=True)
        return False

    def upload_plugin(self, name: str, code: bytes, version: Optional[str] = None) -> None:
        """Load plugin module `name` with source `code` into the remote agent."""
        self.conn.server.load_module(name, version, code)

    def disconnect(self, stop: bool = False) -> None:
        """Close the RPC connection; with `stop` the remote server is stopped first."""
        if stop:
            logger.debug("Stopping RPC server on %s", self.info.node_id)
            self.conn.server.stop()

        logger.debug("Disconnecting from %s", self.info.node_id)
        self.conn.disconnect()
        self.conn = None
koder aka kdanilov3b4da8b2016-10-17 00:17:53 +0300239
koder aka kdanilov3b4da8b2016-10-17 00:17:53 +0300240
def get_node_python_27(node: ISSHHost) -> Optional[str]:
    """Return the path of a python 2.7 interpreter on `node`, or None.

    `which python2.7` is tried first. If that fails, the default `python`
    is used when its `--version` output mentions 2.7. Any failure while
    probing is ignored.
    """
    # Preferred: an explicitly versioned interpreter.
    try:
        return node.run('which python2.7').strip()
    except Exception:
        pass

    # Fallback: the default interpreter, provided it reports version 2.7.
    try:
        version_out = node.run('python --version')
        if '2.7' in version_out:
            return node.run('which python').strip()
    except Exception:
        pass

    return None
256
257
def setup_rpc(node: ISSHHost,
              rpc_server_code: bytes,
              plugins: Dict[str, bytes] = None,
              port: int = 0,
              log_level: str = None) -> IRPCNode:
    """Start the RPC agent on `node` and return an RPCNode connected to it.

    The agent source is uploaded to a temporary file and started as a daemon
    with the python 2.7 interpreter found on the node. The settings it prints
    are parsed as JSON and merged into `node.info.params`, and the port from
    their 'addr' entry is used for the connection. When `log_level` is given,
    the agent's stdout goes to a temporary file on the node, whose path is
    stored in `rpc_log_file` of the result. Each entry of `plugins` is
    uploaded; if an upload fails the server is stopped and the error re-raised.

    Raises ValueError when no python 2.7 interpreter is found on the node.
    """
    logger.debug("Setting up RPC connection to {}".format(node.info))

    python_cmd = get_node_python_27(node)
    if not python_cmd:
        logger.error(("Can't find python2.7 on node {}. " +
                      "Install python2.7 and rerun test").format(node.info))
        raise ValueError("Python not found")

    logger.debug("python2.7 on node {} path is {}".format(node.info, python_cmd))

    code_file = node.put_to_file(None, rpc_server_code)
    ip = node.info.ssh_creds.addr.host

    log_file = None  # type: Optional[str]
    if not log_level:
        templ = "{} {} --log-level=CRITICAL server --listen-addr={}:{} --daemon --show-settings"
        cmd = templ.format(python_cmd, code_file, ip, port)
    else:
        log_file = node.run("mktemp", nolog=True).strip()
        templ = "{} {} --log-level={} server --listen-addr={}:{} --daemon --show-settings"
        cmd = templ.format(python_cmd, code_file, log_level, ip, port)
        cmd += " --stdout-file={}".format(log_file)
        logger.info("Agent logs for node {} stored on node in file {}. Log level is {}".format(
            node.node_id, log_file, log_level))

    # The agent prints its settings as JSON because of --show-settings.
    params = json.loads(node.run(cmd).strip())
    node.info.params.update(params)

    # Take the port from the reported address rather than the `port` argument.
    port = int(params['addr'].split(":")[1])
    rpc_node = RPCNode(agent.connect((ip, port)), node.info)
    rpc_node.rpc_log_file = log_file

    if plugins is None:
        return rpc_node

    try:
        for plugin_name, plugin_code in plugins.items():
            rpc_node.upload_plugin(plugin_name, plugin_code)
    except Exception:
        # Do not leave a half-configured server running on the node.
        rpc_node.disconnect(True)
        raise

    return rpc_node
koder aka kdanilov22d134e2016-11-08 11:33:19 +0200307
koder aka kdanilov3d2bc4f2016-11-12 18:31:18 +0200308
309 # class RemoteNode(node_interfaces.IRPCNode):
310# def __init__(self, node_info: node_interfaces.NodeInfo, rpc_conn: agent.RPCClient):
311# self.info = node_info
312# self.rpc = rpc_conn
313#
314 # def get_interface(self, ip: str) -> str:
315 # """Get node external interface for given IP"""
316 # data = self.run("ip a", nolog=True)
317 # curr_iface = None
318 #
319 # for line in data.split("\n"):
320 # match1 = re.match(r"\d+:\s+(?P<name>.*?):\s\<", line)
321 # if match1 is not None:
322 # curr_iface = match1.group('name')
323 #
324 # match2 = re.match(r"\s+inet\s+(?P<ip>[0-9.]+)/", line)
325 # if match2 is not None:
326 # if match2.group('ip') == ip:
327 # assert curr_iface is not None
328 # return curr_iface
329 #
330 # raise KeyError("Can't found interface for ip {0}".format(ip))
331 #
332 # def get_user(self) -> str:
333 # """"get ssh connection username"""
334 # if self.ssh_conn_url == 'local':
335 # return getpass.getuser()
336 # return self.ssh_cred.user
337 #
338 #
339 # def run(self, cmd: str, stdin_data: str = None, timeout: int = 60, nolog: bool = False) -> Tuple[int, str]:
340 # """Run command on node. Will use rpc connection, if available"""
341 #
342 # if self.rpc_conn is None:
343 # return run_over_ssh(self.ssh_conn, cmd,
344 # stdin_data=stdin_data, timeout=timeout,
345 # nolog=nolog, node=self)
346 # assert not stdin_data
347 # proc_id = self.rpc_conn.cli.spawn(cmd)
348 # exit_code = None
349 # output = ""
350 #
351 # while exit_code is None:
352 # exit_code, stdout_data, stderr_data = self.rpc_conn.cli.get_updates(proc_id)
353 # output += stdout_data + stderr_data
354 #
355 # return exit_code, output
356
357