blob: 32ec58a500b47b36e50760280c7c4c998c3eec39 [file] [log] [blame]
koder aka kdanilov3d2bc4f2016-11-12 18:31:18 +02001import os
koder aka kdanilov108ac362017-01-19 20:17:16 +02002import zlib
koder aka kdanilov3d2bc4f2016-11-12 18:31:18 +02003import time
4import json
5import socket
6import logging
koder aka kdanilov962ee5f2016-12-19 02:40:08 +02007import tempfile
koder aka kdanilov3d2bc4f2016-11-12 18:31:18 +02008import subprocess
kdanylov aka koder150b2192017-04-01 16:53:01 +03009from typing import Union, cast, Optional, Tuple, Dict
koder aka kdanilov73084622016-11-16 21:51:08 +020010
koder aka kdanilov3b4da8b2016-10-17 00:17:53 +030011
kdanylov aka koder150b2192017-04-01 16:53:01 +030012from agent import agent
koder aka kdanilov73084622016-11-16 21:51:08 +020013import paramiko
koder aka kdanilov3d2bc4f2016-11-12 18:31:18 +020014
koder aka kdanilov73084622016-11-16 21:51:08 +020015
16from .node_interfaces import IRPCNode, NodeInfo, ISSHHost
17from .ssh import connect as ssh_connect
koder aka kdanilov3b4da8b2016-10-17 00:17:53 +030018
19
koder aka kdanilov3d2bc4f2016-11-12 18:31:18 +020020logger = logging.getLogger("wally")
koder aka kdanilov3b4da8b2016-10-17 00:17:53 +030021
koder aka kdanilov22d134e2016-11-08 11:33:19 +020022
koder aka kdanilov3d2bc4f2016-11-12 18:31:18 +020023class SSHHost(ISSHHost):
koder aka kdanilov73084622016-11-16 21:51:08 +020024 def __init__(self, conn: paramiko.SSHClient, info: NodeInfo) -> None:
25 self.conn = conn
26 self.info = info
koder aka kdanilov3b4da8b2016-10-17 00:17:53 +030027
koder aka kdanilov22d134e2016-11-08 11:33:19 +020028 def __str__(self) -> str:
koder aka kdanilov108ac362017-01-19 20:17:16 +020029 return self.node_id
30
31 @property
32 def node_id(self) -> str:
33 return self.info.node_id
koder aka kdanilov3d2bc4f2016-11-12 18:31:18 +020034
koder aka kdanilove7e1a4d2016-12-17 20:29:52 +020035 def put_to_file(self, path: Optional[str], content: bytes) -> str:
36 if path is None:
koder aka kdanilov962ee5f2016-12-19 02:40:08 +020037 path = self.run("mktemp", nolog=True).strip()
38
39 logger.debug("PUT %s bytes to %s", len(content), path)
koder aka kdanilove7e1a4d2016-12-17 20:29:52 +020040
koder aka kdanilov3d2bc4f2016-11-12 18:31:18 +020041 with self.conn.open_sftp() as sftp:
42 with sftp.open(path, "wb") as fd:
43 fd.write(content)
44
koder aka kdanilove7e1a4d2016-12-17 20:29:52 +020045 return path
46
koder aka kdanilov73084622016-11-16 21:51:08 +020047 def disconnect(self):
48 self.conn.close()
49
koder aka kdanilov3d2bc4f2016-11-12 18:31:18 +020050 def run(self, cmd: str, timeout: int = 60, nolog: bool = False) -> str:
koder aka kdanilov962ee5f2016-12-19 02:40:08 +020051 if not nolog:
52 logger.debug("SSH:{0} Exec {1!r}".format(self, cmd))
53
koder aka kdanilov3d2bc4f2016-11-12 18:31:18 +020054 transport = self.conn.get_transport()
55 session = transport.open_session()
56
57 try:
58 session.set_combine_stderr(True)
koder aka kdanilov3d2bc4f2016-11-12 18:31:18 +020059 stime = time.time()
60
koder aka kdanilov3d2bc4f2016-11-12 18:31:18 +020061 session.exec_command(cmd)
62 session.settimeout(1)
63 session.shutdown_write()
64 output = ""
65
66 while True:
67 try:
koder aka kdanilove7e1a4d2016-12-17 20:29:52 +020068 ndata = session.recv(1024).decode("utf-8")
69 if not ndata:
koder aka kdanilov3d2bc4f2016-11-12 18:31:18 +020070 break
koder aka kdanilove7e1a4d2016-12-17 20:29:52 +020071 output += ndata
koder aka kdanilov3d2bc4f2016-11-12 18:31:18 +020072 except socket.timeout:
73 pass
74
75 if time.time() - stime > timeout:
76 raise OSError(output + "\nExecution timeout")
77
78 code = session.recv_exit_status()
79 finally:
80 found = False
81
82 if found:
83 session.close()
84
85 if code != 0:
86 templ = "SSH:{0} Cmd {1!r} failed with code {2}. Output: {3}"
87 raise OSError(templ.format(self, cmd, code, output))
88
89 return output
90
91
92class LocalHost(ISSHHost):
93 def __str__(self):
94 return "<Local>"
95
96 def get_ip(self) -> str:
97 return 'localhost'
98
koder aka kdanilov962ee5f2016-12-19 02:40:08 +020099 def put_to_file(self, path: Optional[str], content: bytes) -> str:
100 if path is None:
101 fd, path = tempfile.mkstemp(text=False)
102 os.close(fd)
103 else:
104 dir_name = os.path.dirname(path)
105 os.makedirs(dir_name, exist_ok=True)
koder aka kdanilov3d2bc4f2016-11-12 18:31:18 +0200106
koder aka kdanilov962ee5f2016-12-19 02:40:08 +0200107 with open(path, "wb") as fd2:
108 fd2.write(content)
109
110 return path
koder aka kdanilov3d2bc4f2016-11-12 18:31:18 +0200111
112 def run(self, cmd: str, timeout: int = 60, nolog: bool = False) -> str:
113 proc = subprocess.Popen(cmd, shell=True,
114 stdin=subprocess.PIPE,
115 stdout=subprocess.PIPE,
116 stderr=subprocess.STDOUT)
117
koder aka kdanilove7e1a4d2016-12-17 20:29:52 +0200118 stdout_data_b, _ = proc.communicate()
119 stdout_data = stdout_data_b.decode("utf8")
120
koder aka kdanilov3d2bc4f2016-11-12 18:31:18 +0200121 if proc.returncode != 0:
122 templ = "SSH:{0} Cmd {1!r} failed with code {2}. Output: {3}"
123 raise OSError(templ.format(self, cmd, proc.returncode, stdout_data))
124
125 return stdout_data
126
koder aka kdanilov73084622016-11-16 21:51:08 +0200127 def disconnect(self):
128 pass
koder aka kdanilov3d2bc4f2016-11-12 18:31:18 +0200129
koder aka kdanilov73084622016-11-16 21:51:08 +0200130
koder aka kdanilove7e1a4d2016-12-17 20:29:52 +0200131def get_rpc_server_code() -> Tuple[bytes, Dict[str, bytes]]:
132 # setup rpc data
133 if agent.__file__.endswith(".pyc"):
134 path = agent.__file__[:-1]
135 else:
136 path = agent.__file__
137
138 master_code = open(path, "rb").read()
139
140 plugins = {} # type: Dict[str, bytes]
141 cli_path = os.path.join(os.path.dirname(path), "cli_plugin.py")
142 plugins["cli"] = open(cli_path, "rb").read()
143
144 fs_path = os.path.join(os.path.dirname(path), "fs_plugin.py")
145 plugins["fs"] = open(fs_path, "rb").read()
146
147 return master_code, plugins
148
149
koder aka kdanilov73084622016-11-16 21:51:08 +0200150def connect(info: Union[str, NodeInfo], conn_timeout: int = 60) -> ISSHHost:
151 if info == 'local':
koder aka kdanilov3d2bc4f2016-11-12 18:31:18 +0200152 return LocalHost()
153 else:
koder aka kdanilov73084622016-11-16 21:51:08 +0200154 info_c = cast(NodeInfo, info)
155 return SSHHost(ssh_connect(info_c.ssh_creds, conn_timeout), info_c)
koder aka kdanilov3d2bc4f2016-11-12 18:31:18 +0200156
157
158class RPCNode(IRPCNode):
159 """Node object"""
160
koder aka kdanilove7e1a4d2016-12-17 20:29:52 +0200161 def __init__(self, conn: agent.SimpleRPCClient, info: NodeInfo) -> None:
koder aka kdanilov3d2bc4f2016-11-12 18:31:18 +0200162 self.info = info
163 self.conn = conn
164
koder aka kdanilov3d2bc4f2016-11-12 18:31:18 +0200165 def __str__(self) -> str:
koder aka kdanilova732a602017-02-01 20:29:56 +0200166 return "Node({!r})".format(self.info)
koder aka kdanilov3b4da8b2016-10-17 00:17:53 +0300167
koder aka kdanilov22d134e2016-11-08 11:33:19 +0200168 def __repr__(self) -> str:
koder aka kdanilov3b4da8b2016-10-17 00:17:53 +0300169 return str(self)
170
koder aka kdanilova732a602017-02-01 20:29:56 +0200171 @property
172 def node_id(self) -> str:
173 return self.info.node_id
174
koder aka kdanilov108ac362017-01-19 20:17:16 +0200175 def get_file_content(self, path: str, expanduser: bool = False, compress: bool = True) -> bytes:
koder aka kdanilov23e6bdf2016-12-24 02:18:54 +0200176 logger.debug("GET %s from %s", path, self.info)
koder aka kdanilov7f59d562016-12-26 01:34:23 +0200177 if expanduser:
178 path = self.conn.fs.expanduser(path)
koder aka kdanilov108ac362017-01-19 20:17:16 +0200179 res = self.conn.fs.get_file(path, compress)
koder aka kdanilov23e6bdf2016-12-24 02:18:54 +0200180 logger.debug("Download %s bytes from remote file %s from %s", len(res), path, self.info)
koder aka kdanilov108ac362017-01-19 20:17:16 +0200181 if compress:
182 res = zlib.decompress(res)
koder aka kdanilov962ee5f2016-12-19 02:40:08 +0200183 return res
koder aka kdanilov3b4da8b2016-10-17 00:17:53 +0300184
koder aka kdanilov23e6bdf2016-12-24 02:18:54 +0200185 def run(self, cmd: str, timeout: int = 60, nolog: bool = False, check_timeout: float = 0.01) -> str:
koder aka kdanilov962ee5f2016-12-19 02:40:08 +0200186 if not nolog:
koder aka kdanilov108ac362017-01-19 20:17:16 +0200187 logger.debug("Node %s - run %s", self.node_id, cmd)
koder aka kdanilov962ee5f2016-12-19 02:40:08 +0200188
189 cmd_b = cmd.encode("utf8")
190 proc_id = self.conn.cli.spawn(cmd_b, timeout=timeout, merge_out=True)
koder aka kdanilov962ee5f2016-12-19 02:40:08 +0200191 out = ""
koder aka kdanilov3af3c332016-12-19 17:12:34 +0200192
193 while True:
koder aka kdanilov962ee5f2016-12-19 02:40:08 +0200194 code, outb, _ = self.conn.cli.get_updates(proc_id)
195 out += outb.decode("utf8")
koder aka kdanilov3af3c332016-12-19 17:12:34 +0200196 if code is not None:
197 break
koder aka kdanilov23e6bdf2016-12-24 02:18:54 +0200198 time.sleep(check_timeout)
koder aka kdanilov962ee5f2016-12-19 02:40:08 +0200199
200 if code != 0:
201 templ = "Node {} - cmd {!r} failed with code {}. Output: {!r}."
koder aka kdanilov108ac362017-01-19 20:17:16 +0200202 raise OSError(templ.format(self.node_id, cmd, code, out))
koder aka kdanilov962ee5f2016-12-19 02:40:08 +0200203
204 return out
koder aka kdanilov73084622016-11-16 21:51:08 +0200205
koder aka kdanilov108ac362017-01-19 20:17:16 +0200206 def copy_file(self, local_path: str, remote_path: str = None,
207 expanduser: bool = False,
208 compress: bool = False) -> str:
209
koder aka kdanilov7f59d562016-12-26 01:34:23 +0200210 if expanduser:
211 remote_path = self.conn.fs.expanduser(remote_path)
212
koder aka kdanilovffaf48d2016-12-27 02:25:29 +0200213 data = open(local_path, 'rb').read() # type: bytes
koder aka kdanilov108ac362017-01-19 20:17:16 +0200214 return self.put_to_file(remote_path, data, compress=compress)
koder aka kdanilov73084622016-11-16 21:51:08 +0200215
koder aka kdanilov108ac362017-01-19 20:17:16 +0200216 def put_to_file(self, path: Optional[str], content: bytes, expanduser: bool = False, compress: bool = False) -> str:
koder aka kdanilov7f59d562016-12-26 01:34:23 +0200217 if expanduser:
218 path = self.conn.fs.expanduser(path)
koder aka kdanilov108ac362017-01-19 20:17:16 +0200219 if compress:
220 content = zlib.compress(content)
221 return self.conn.fs.store_file(path, content, compress)
koder aka kdanilov73084622016-11-16 21:51:08 +0200222
koder aka kdanilov7f59d562016-12-26 01:34:23 +0200223 def stat_file(self, path: str, expanduser: bool = False) -> Dict[str, int]:
224 if expanduser:
225 path = self.conn.fs.expanduser(path)
koder aka kdanilov23e6bdf2016-12-24 02:18:54 +0200226 return self.conn.fs.file_stat(path)
koder aka kdanilov73084622016-11-16 21:51:08 +0200227
koder aka kdanilov962ee5f2016-12-19 02:40:08 +0200228 def __exit__(self, x, y, z) -> bool:
229 self.disconnect(stop=True)
230 return False
231
232 def upload_plugin(self, name: str, code: bytes, version: str = None) -> None:
233 self.conn.server.load_module(name, version, code)
234
235 def disconnect(self, stop: bool = False) -> None:
236 if stop:
koder aka kdanilova732a602017-02-01 20:29:56 +0200237 logger.debug("Stopping RPC server on %s", self.info)
koder aka kdanilov962ee5f2016-12-19 02:40:08 +0200238 self.conn.server.stop()
239
koder aka kdanilova732a602017-02-01 20:29:56 +0200240 logger.debug("Disconnecting from %s", self.info)
koder aka kdanilove7e1a4d2016-12-17 20:29:52 +0200241 self.conn.disconnect()
242 self.conn = None
koder aka kdanilov3b4da8b2016-10-17 00:17:53 +0300243
koder aka kdanilov3b4da8b2016-10-17 00:17:53 +0300244
koder aka kdanilov23e6bdf2016-12-24 02:18:54 +0200245def get_node_python_27(node: ISSHHost) -> Optional[str]:
246 python_cmd = None # type: Optional[str]
247 try:
248 python_cmd = node.run('which python2.7').strip()
249 except Exception as exc:
250 pass
251
252 if python_cmd is None:
253 try:
254 if '2.7' in node.run('python --version'):
255 python_cmd = node.run('which python').strip()
256 except Exception as exc:
257 pass
258
259 return python_cmd
260
261
koder aka kdanilov962ee5f2016-12-19 02:40:08 +0200262def setup_rpc(node: ISSHHost,
263 rpc_server_code: bytes,
264 plugins: Dict[str, bytes] = None,
265 port: int = 0,
266 log_level: str = None) -> IRPCNode:
267
268 logger.debug("Setting up RPC connection to {}".format(node.info))
koder aka kdanilov23e6bdf2016-12-24 02:18:54 +0200269 python_cmd = get_node_python_27(node)
270 if python_cmd:
271 logger.debug("python2.7 on node {} path is {}".format(node.info, python_cmd))
272 else:
273 logger.error(("Can't find python2.7 on node {}. " +
274 "Install python2.7 and rerun test").format(node.info))
275 raise ValueError("Python not found")
276
koder aka kdanilove7e1a4d2016-12-17 20:29:52 +0200277 code_file = node.put_to_file(None, rpc_server_code)
koder aka kdanilov73084622016-11-16 21:51:08 +0200278 ip = node.info.ssh_creds.addr.host
279
koder aka kdanilov962ee5f2016-12-19 02:40:08 +0200280 log_file = None # type: Optional[str]
281 if log_level:
282 log_file = node.run("mktemp", nolog=True).strip()
koder aka kdanilov23e6bdf2016-12-24 02:18:54 +0200283 cmd = "{} {} --log-level={} server --listen-addr={}:{} --daemon --show-settings"
284 cmd = cmd.format(python_cmd, code_file, log_level, ip, port) + " --stdout-file={}".format(log_file)
kdanylov aka koder45183182017-04-30 23:55:40 +0300285 logger.info("Agent logs for node {} stored remotely in file {}, log level is {}".format(
koder aka kdanilov108ac362017-01-19 20:17:16 +0200286 node.node_id, log_file, log_level))
koder aka kdanilov962ee5f2016-12-19 02:40:08 +0200287 else:
koder aka kdanilov23e6bdf2016-12-24 02:18:54 +0200288 cmd = "{} {} --log-level=CRITICAL server --listen-addr={}:{} --daemon --show-settings"
289 cmd = cmd.format(python_cmd, code_file, ip, port)
koder aka kdanilov962ee5f2016-12-19 02:40:08 +0200290
koder aka kdanilove7e1a4d2016-12-17 20:29:52 +0200291 params_js = node.run(cmd).strip()
koder aka kdanilov3d2bc4f2016-11-12 18:31:18 +0200292 params = json.loads(params_js)
koder aka kdanilov962ee5f2016-12-19 02:40:08 +0200293
koder aka kdanilove7e1a4d2016-12-17 20:29:52 +0200294 node.info.params.update(params)
295
koder aka kdanilov73084622016-11-16 21:51:08 +0200296 port = int(params['addr'].split(":")[1])
koder aka kdanilov3d2bc4f2016-11-12 18:31:18 +0200297 rpc_conn = agent.connect((ip, port))
koder aka kdanilove7e1a4d2016-12-17 20:29:52 +0200298
koder aka kdanilov962ee5f2016-12-19 02:40:08 +0200299 rpc_node = RPCNode(rpc_conn, node.info)
300 rpc_node.rpc_log_file = log_file
301
koder aka kdanilove7e1a4d2016-12-17 20:29:52 +0200302 if plugins is not None:
303 try:
304 for name, code in plugins.items():
koder aka kdanilov962ee5f2016-12-19 02:40:08 +0200305 rpc_node.upload_plugin(name, code)
koder aka kdanilove7e1a4d2016-12-17 20:29:52 +0200306 except Exception:
koder aka kdanilov962ee5f2016-12-19 02:40:08 +0200307 rpc_node.disconnect(True)
koder aka kdanilove7e1a4d2016-12-17 20:29:52 +0200308 raise
309
koder aka kdanilov962ee5f2016-12-19 02:40:08 +0200310 return rpc_node
koder aka kdanilov22d134e2016-11-08 11:33:19 +0200311
koder aka kdanilov3d2bc4f2016-11-12 18:31:18 +0200312
313 # class RemoteNode(node_interfaces.IRPCNode):
314# def __init__(self, node_info: node_interfaces.NodeInfo, rpc_conn: agent.RPCClient):
315# self.info = node_info
316# self.rpc = rpc_conn
317#
318 # def get_interface(self, ip: str) -> str:
319 # """Get node external interface for given IP"""
320 # data = self.run("ip a", nolog=True)
321 # curr_iface = None
322 #
323 # for line in data.split("\n"):
324 # match1 = re.match(r"\d+:\s+(?P<name>.*?):\s\<", line)
325 # if match1 is not None:
326 # curr_iface = match1.group('name')
327 #
328 # match2 = re.match(r"\s+inet\s+(?P<ip>[0-9.]+)/", line)
329 # if match2 is not None:
330 # if match2.group('ip') == ip:
331 # assert curr_iface is not None
332 # return curr_iface
333 #
334 # raise KeyError("Can't found interface for ip {0}".format(ip))
335 #
336 # def get_user(self) -> str:
337 # """"get ssh connection username"""
338 # if self.ssh_conn_url == 'local':
339 # return getpass.getuser()
340 # return self.ssh_cred.user
341 #
342 #
343 # def run(self, cmd: str, stdin_data: str = None, timeout: int = 60, nolog: bool = False) -> Tuple[int, str]:
344 # """Run command on node. Will use rpc connection, if available"""
345 #
346 # if self.rpc_conn is None:
347 # return run_over_ssh(self.ssh_conn, cmd,
348 # stdin_data=stdin_data, timeout=timeout,
349 # nolog=nolog, node=self)
350 # assert not stdin_data
351 # proc_id = self.rpc_conn.cli.spawn(cmd)
352 # exit_code = None
353 # output = ""
354 #
355 # while exit_code is None:
356 # exit_code, stdout_data, stderr_data = self.rpc_conn.cli.get_updates(proc_id)
357 # output += stdout_data + stderr_data
358 #
359 # return exit_code, output
360
361