blob: 6624bdc3b48ab2b1ce6343eb8ba7a6e8f753e8e0 [file] [log] [blame]
koder aka kdanilov3d2bc4f2016-11-12 18:31:18 +02001import os
2import time
3import json
4import socket
5import logging
koder aka kdanilov962ee5f2016-12-19 02:40:08 +02006import tempfile
koder aka kdanilov3d2bc4f2016-11-12 18:31:18 +02007import subprocess
koder aka kdanilov962ee5f2016-12-19 02:40:08 +02008from typing import Union, cast, Any, Optional, Tuple, Dict
koder aka kdanilov73084622016-11-16 21:51:08 +02009
koder aka kdanilov3b4da8b2016-10-17 00:17:53 +030010
koder aka kdanilov3d2bc4f2016-11-12 18:31:18 +020011import agent
koder aka kdanilov73084622016-11-16 21:51:08 +020012import paramiko
koder aka kdanilov3d2bc4f2016-11-12 18:31:18 +020013
koder aka kdanilov73084622016-11-16 21:51:08 +020014
15from .node_interfaces import IRPCNode, NodeInfo, ISSHHost
16from .ssh import connect as ssh_connect
koder aka kdanilov3b4da8b2016-10-17 00:17:53 +030017
18
koder aka kdanilov3d2bc4f2016-11-12 18:31:18 +020019logger = logging.getLogger("wally")
koder aka kdanilov3b4da8b2016-10-17 00:17:53 +030020
koder aka kdanilov22d134e2016-11-08 11:33:19 +020021
koder aka kdanilov3d2bc4f2016-11-12 18:31:18 +020022class SSHHost(ISSHHost):
koder aka kdanilov73084622016-11-16 21:51:08 +020023 def __init__(self, conn: paramiko.SSHClient, info: NodeInfo) -> None:
24 self.conn = conn
25 self.info = info
koder aka kdanilov3b4da8b2016-10-17 00:17:53 +030026
koder aka kdanilov22d134e2016-11-08 11:33:19 +020027 def __str__(self) -> str:
koder aka kdanilov73084622016-11-16 21:51:08 +020028 return self.info.node_id()
koder aka kdanilov3d2bc4f2016-11-12 18:31:18 +020029
koder aka kdanilove7e1a4d2016-12-17 20:29:52 +020030 def put_to_file(self, path: Optional[str], content: bytes) -> str:
31 if path is None:
koder aka kdanilov962ee5f2016-12-19 02:40:08 +020032 path = self.run("mktemp", nolog=True).strip()
33
34 logger.debug("PUT %s bytes to %s", len(content), path)
koder aka kdanilove7e1a4d2016-12-17 20:29:52 +020035
koder aka kdanilov3d2bc4f2016-11-12 18:31:18 +020036 with self.conn.open_sftp() as sftp:
37 with sftp.open(path, "wb") as fd:
38 fd.write(content)
39
koder aka kdanilove7e1a4d2016-12-17 20:29:52 +020040 return path
41
koder aka kdanilov73084622016-11-16 21:51:08 +020042 def disconnect(self):
43 self.conn.close()
44
koder aka kdanilov3d2bc4f2016-11-12 18:31:18 +020045 def run(self, cmd: str, timeout: int = 60, nolog: bool = False) -> str:
koder aka kdanilov962ee5f2016-12-19 02:40:08 +020046 if not nolog:
47 logger.debug("SSH:{0} Exec {1!r}".format(self, cmd))
48
koder aka kdanilov3d2bc4f2016-11-12 18:31:18 +020049 transport = self.conn.get_transport()
50 session = transport.open_session()
51
52 try:
53 session.set_combine_stderr(True)
koder aka kdanilov3d2bc4f2016-11-12 18:31:18 +020054 stime = time.time()
55
koder aka kdanilov3d2bc4f2016-11-12 18:31:18 +020056 session.exec_command(cmd)
57 session.settimeout(1)
58 session.shutdown_write()
59 output = ""
60
61 while True:
62 try:
koder aka kdanilove7e1a4d2016-12-17 20:29:52 +020063 ndata = session.recv(1024).decode("utf-8")
64 if not ndata:
koder aka kdanilov3d2bc4f2016-11-12 18:31:18 +020065 break
koder aka kdanilove7e1a4d2016-12-17 20:29:52 +020066 output += ndata
koder aka kdanilov3d2bc4f2016-11-12 18:31:18 +020067 except socket.timeout:
68 pass
69
70 if time.time() - stime > timeout:
71 raise OSError(output + "\nExecution timeout")
72
73 code = session.recv_exit_status()
74 finally:
75 found = False
76
77 if found:
78 session.close()
79
80 if code != 0:
81 templ = "SSH:{0} Cmd {1!r} failed with code {2}. Output: {3}"
82 raise OSError(templ.format(self, cmd, code, output))
83
84 return output
85
86
87class LocalHost(ISSHHost):
88 def __str__(self):
89 return "<Local>"
90
91 def get_ip(self) -> str:
92 return 'localhost'
93
koder aka kdanilov962ee5f2016-12-19 02:40:08 +020094 def put_to_file(self, path: Optional[str], content: bytes) -> str:
95 if path is None:
96 fd, path = tempfile.mkstemp(text=False)
97 os.close(fd)
98 else:
99 dir_name = os.path.dirname(path)
100 os.makedirs(dir_name, exist_ok=True)
koder aka kdanilov3d2bc4f2016-11-12 18:31:18 +0200101
koder aka kdanilov962ee5f2016-12-19 02:40:08 +0200102 with open(path, "wb") as fd2:
103 fd2.write(content)
104
105 return path
koder aka kdanilov3d2bc4f2016-11-12 18:31:18 +0200106
107 def run(self, cmd: str, timeout: int = 60, nolog: bool = False) -> str:
108 proc = subprocess.Popen(cmd, shell=True,
109 stdin=subprocess.PIPE,
110 stdout=subprocess.PIPE,
111 stderr=subprocess.STDOUT)
112
koder aka kdanilove7e1a4d2016-12-17 20:29:52 +0200113 stdout_data_b, _ = proc.communicate()
114 stdout_data = stdout_data_b.decode("utf8")
115
koder aka kdanilov3d2bc4f2016-11-12 18:31:18 +0200116 if proc.returncode != 0:
117 templ = "SSH:{0} Cmd {1!r} failed with code {2}. Output: {3}"
118 raise OSError(templ.format(self, cmd, proc.returncode, stdout_data))
119
120 return stdout_data
121
koder aka kdanilov73084622016-11-16 21:51:08 +0200122 def disconnect(self):
123 pass
koder aka kdanilov3d2bc4f2016-11-12 18:31:18 +0200124
koder aka kdanilov73084622016-11-16 21:51:08 +0200125
koder aka kdanilove7e1a4d2016-12-17 20:29:52 +0200126def get_rpc_server_code() -> Tuple[bytes, Dict[str, bytes]]:
127 # setup rpc data
128 if agent.__file__.endswith(".pyc"):
129 path = agent.__file__[:-1]
130 else:
131 path = agent.__file__
132
133 master_code = open(path, "rb").read()
134
135 plugins = {} # type: Dict[str, bytes]
136 cli_path = os.path.join(os.path.dirname(path), "cli_plugin.py")
137 plugins["cli"] = open(cli_path, "rb").read()
138
139 fs_path = os.path.join(os.path.dirname(path), "fs_plugin.py")
140 plugins["fs"] = open(fs_path, "rb").read()
141
142 return master_code, plugins
143
144
koder aka kdanilov73084622016-11-16 21:51:08 +0200145def connect(info: Union[str, NodeInfo], conn_timeout: int = 60) -> ISSHHost:
146 if info == 'local':
koder aka kdanilov3d2bc4f2016-11-12 18:31:18 +0200147 return LocalHost()
148 else:
koder aka kdanilov73084622016-11-16 21:51:08 +0200149 info_c = cast(NodeInfo, info)
150 return SSHHost(ssh_connect(info_c.ssh_creds, conn_timeout), info_c)
koder aka kdanilov3d2bc4f2016-11-12 18:31:18 +0200151
152
153class RPCNode(IRPCNode):
154 """Node object"""
155
koder aka kdanilove7e1a4d2016-12-17 20:29:52 +0200156 def __init__(self, conn: agent.SimpleRPCClient, info: NodeInfo) -> None:
koder aka kdanilov3d2bc4f2016-11-12 18:31:18 +0200157 self.info = info
158 self.conn = conn
159
koder aka kdanilov3d2bc4f2016-11-12 18:31:18 +0200160 def __str__(self) -> str:
koder aka kdanilovbbbe1dc2016-12-20 01:19:56 +0200161 return "Node({!r})".format(self.info.node_id())
koder aka kdanilov3b4da8b2016-10-17 00:17:53 +0300162
koder aka kdanilov22d134e2016-11-08 11:33:19 +0200163 def __repr__(self) -> str:
koder aka kdanilov3b4da8b2016-10-17 00:17:53 +0300164 return str(self)
165
koder aka kdanilov73084622016-11-16 21:51:08 +0200166 def get_file_content(self, path: str) -> bytes:
koder aka kdanilov23e6bdf2016-12-24 02:18:54 +0200167 logger.debug("GET %s from %s", path, self.info)
168 res = self.conn.fs.get_file(self.conn.fs.expanduser(path))
169 logger.debug("Download %s bytes from remote file %s from %s", len(res), path, self.info)
koder aka kdanilov962ee5f2016-12-19 02:40:08 +0200170 return res
koder aka kdanilov3b4da8b2016-10-17 00:17:53 +0300171
koder aka kdanilov23e6bdf2016-12-24 02:18:54 +0200172 def run(self, cmd: str, timeout: int = 60, nolog: bool = False, check_timeout: float = 0.01) -> str:
koder aka kdanilov962ee5f2016-12-19 02:40:08 +0200173 if not nolog:
174 logger.debug("Node %s - run %s", self.info.node_id(), cmd)
175
176 cmd_b = cmd.encode("utf8")
177 proc_id = self.conn.cli.spawn(cmd_b, timeout=timeout, merge_out=True)
koder aka kdanilov962ee5f2016-12-19 02:40:08 +0200178 out = ""
koder aka kdanilov3af3c332016-12-19 17:12:34 +0200179
180 while True:
koder aka kdanilov962ee5f2016-12-19 02:40:08 +0200181 code, outb, _ = self.conn.cli.get_updates(proc_id)
182 out += outb.decode("utf8")
koder aka kdanilov3af3c332016-12-19 17:12:34 +0200183 if code is not None:
184 break
koder aka kdanilov23e6bdf2016-12-24 02:18:54 +0200185 time.sleep(check_timeout)
koder aka kdanilov962ee5f2016-12-19 02:40:08 +0200186
187 if code != 0:
188 templ = "Node {} - cmd {!r} failed with code {}. Output: {!r}."
189 raise OSError(templ.format(self.info.node_id(), cmd, code, out))
190
191 return out
koder aka kdanilov73084622016-11-16 21:51:08 +0200192
koder aka kdanilov23e6bdf2016-12-24 02:18:54 +0200193 def copy_file(self, local_path: str, remote_path: str = None) -> str:
koder aka kdanilovbbbe1dc2016-12-20 01:19:56 +0200194 data = open(local_path, 'rb').read()
koder aka kdanilov23e6bdf2016-12-24 02:18:54 +0200195 return self.put_to_file(remote_path, data)
koder aka kdanilov73084622016-11-16 21:51:08 +0200196
koder aka kdanilov23e6bdf2016-12-24 02:18:54 +0200197 def put_to_file(self, path: Optional[str], content: bytes) -> str:
198 return self.conn.fs.store_file(path, content)
koder aka kdanilov73084622016-11-16 21:51:08 +0200199
koder aka kdanilov23e6bdf2016-12-24 02:18:54 +0200200 def stat_file(self, path: str) -> Dict[str, int]:
201 return self.conn.fs.file_stat(path)
koder aka kdanilov73084622016-11-16 21:51:08 +0200202
koder aka kdanilov962ee5f2016-12-19 02:40:08 +0200203 def __exit__(self, x, y, z) -> bool:
204 self.disconnect(stop=True)
205 return False
206
207 def upload_plugin(self, name: str, code: bytes, version: str = None) -> None:
208 self.conn.server.load_module(name, version, code)
209
210 def disconnect(self, stop: bool = False) -> None:
211 if stop:
212 logger.debug("Stopping RPC server on %s", self.info.node_id())
213 self.conn.server.stop()
214
215 logger.debug("Disconnecting from %s", self.info.node_id())
koder aka kdanilove7e1a4d2016-12-17 20:29:52 +0200216 self.conn.disconnect()
217 self.conn = None
koder aka kdanilov3b4da8b2016-10-17 00:17:53 +0300218
koder aka kdanilov3b4da8b2016-10-17 00:17:53 +0300219
koder aka kdanilov23e6bdf2016-12-24 02:18:54 +0200220def get_node_python_27(node: ISSHHost) -> Optional[str]:
221 python_cmd = None # type: Optional[str]
222 try:
223 python_cmd = node.run('which python2.7').strip()
224 except Exception as exc:
225 pass
226
227 if python_cmd is None:
228 try:
229 if '2.7' in node.run('python --version'):
230 python_cmd = node.run('which python').strip()
231 except Exception as exc:
232 pass
233
234 return python_cmd
235
236
koder aka kdanilov962ee5f2016-12-19 02:40:08 +0200237def setup_rpc(node: ISSHHost,
238 rpc_server_code: bytes,
239 plugins: Dict[str, bytes] = None,
240 port: int = 0,
241 log_level: str = None) -> IRPCNode:
242
243 logger.debug("Setting up RPC connection to {}".format(node.info))
koder aka kdanilov23e6bdf2016-12-24 02:18:54 +0200244 python_cmd = get_node_python_27(node)
245 if python_cmd:
246 logger.debug("python2.7 on node {} path is {}".format(node.info, python_cmd))
247 else:
248 logger.error(("Can't find python2.7 on node {}. " +
249 "Install python2.7 and rerun test").format(node.info))
250 raise ValueError("Python not found")
251
koder aka kdanilove7e1a4d2016-12-17 20:29:52 +0200252 code_file = node.put_to_file(None, rpc_server_code)
koder aka kdanilov73084622016-11-16 21:51:08 +0200253 ip = node.info.ssh_creds.addr.host
254
koder aka kdanilov962ee5f2016-12-19 02:40:08 +0200255 log_file = None # type: Optional[str]
256 if log_level:
257 log_file = node.run("mktemp", nolog=True).strip()
koder aka kdanilov23e6bdf2016-12-24 02:18:54 +0200258 cmd = "{} {} --log-level={} server --listen-addr={}:{} --daemon --show-settings"
259 cmd = cmd.format(python_cmd, code_file, log_level, ip, port) + " --stdout-file={}".format(log_file)
koder aka kdanilov962ee5f2016-12-19 02:40:08 +0200260 logger.info("Agent logs for node {} stored on node in file {}. Log level is {}".format(
261 node.info.node_id(), log_file, log_level))
262 else:
koder aka kdanilov23e6bdf2016-12-24 02:18:54 +0200263 cmd = "{} {} --log-level=CRITICAL server --listen-addr={}:{} --daemon --show-settings"
264 cmd = cmd.format(python_cmd, code_file, ip, port)
koder aka kdanilov962ee5f2016-12-19 02:40:08 +0200265
koder aka kdanilove7e1a4d2016-12-17 20:29:52 +0200266 params_js = node.run(cmd).strip()
koder aka kdanilov3d2bc4f2016-11-12 18:31:18 +0200267 params = json.loads(params_js)
koder aka kdanilov962ee5f2016-12-19 02:40:08 +0200268
koder aka kdanilove7e1a4d2016-12-17 20:29:52 +0200269 node.info.params.update(params)
270
koder aka kdanilov73084622016-11-16 21:51:08 +0200271 port = int(params['addr'].split(":")[1])
koder aka kdanilov3d2bc4f2016-11-12 18:31:18 +0200272 rpc_conn = agent.connect((ip, port))
koder aka kdanilove7e1a4d2016-12-17 20:29:52 +0200273
koder aka kdanilov962ee5f2016-12-19 02:40:08 +0200274 rpc_node = RPCNode(rpc_conn, node.info)
275 rpc_node.rpc_log_file = log_file
276
koder aka kdanilove7e1a4d2016-12-17 20:29:52 +0200277 if plugins is not None:
278 try:
279 for name, code in plugins.items():
koder aka kdanilov962ee5f2016-12-19 02:40:08 +0200280 rpc_node.upload_plugin(name, code)
koder aka kdanilove7e1a4d2016-12-17 20:29:52 +0200281 except Exception:
koder aka kdanilov962ee5f2016-12-19 02:40:08 +0200282 rpc_node.disconnect(True)
koder aka kdanilove7e1a4d2016-12-17 20:29:52 +0200283 raise
284
koder aka kdanilov962ee5f2016-12-19 02:40:08 +0200285 return rpc_node
koder aka kdanilov22d134e2016-11-08 11:33:19 +0200286
koder aka kdanilov3d2bc4f2016-11-12 18:31:18 +0200287
288 # class RemoteNode(node_interfaces.IRPCNode):
289# def __init__(self, node_info: node_interfaces.NodeInfo, rpc_conn: agent.RPCClient):
290# self.info = node_info
291# self.rpc = rpc_conn
292#
293 # def get_interface(self, ip: str) -> str:
294 # """Get node external interface for given IP"""
295 # data = self.run("ip a", nolog=True)
296 # curr_iface = None
297 #
298 # for line in data.split("\n"):
299 # match1 = re.match(r"\d+:\s+(?P<name>.*?):\s\<", line)
300 # if match1 is not None:
301 # curr_iface = match1.group('name')
302 #
303 # match2 = re.match(r"\s+inet\s+(?P<ip>[0-9.]+)/", line)
304 # if match2 is not None:
305 # if match2.group('ip') == ip:
306 # assert curr_iface is not None
307 # return curr_iface
308 #
309 # raise KeyError("Can't found interface for ip {0}".format(ip))
310 #
311 # def get_user(self) -> str:
312 # """"get ssh connection username"""
313 # if self.ssh_conn_url == 'local':
314 # return getpass.getuser()
315 # return self.ssh_cred.user
316 #
317 #
318 # def run(self, cmd: str, stdin_data: str = None, timeout: int = 60, nolog: bool = False) -> Tuple[int, str]:
319 # """Run command on node. Will use rpc connection, if available"""
320 #
321 # if self.rpc_conn is None:
322 # return run_over_ssh(self.ssh_conn, cmd,
323 # stdin_data=stdin_data, timeout=timeout,
324 # nolog=nolog, node=self)
325 # assert not stdin_data
326 # proc_id = self.rpc_conn.cli.spawn(cmd)
327 # exit_code = None
328 # output = ""
329 #
330 # while exit_code is None:
331 # exit_code, stdout_data, stderr_data = self.rpc_conn.cli.get_updates(proc_id)
332 # output += stdout_data + stderr_data
333 #
334 # return exit_code, output
335
336