blob: 54a629148fa0cb18d533b6673e4c86ea341314ae [file] [log] [blame]
koder aka kdanilov3d2bc4f2016-11-12 18:31:18 +02001import os
2import time
3import json
4import socket
5import logging
6import subprocess
koder aka kdanilove7e1a4d2016-12-17 20:29:52 +02007from typing import Union, cast, Any, Optional, Tuple, Dict, List
koder aka kdanilov73084622016-11-16 21:51:08 +02008
koder aka kdanilov3b4da8b2016-10-17 00:17:53 +03009
koder aka kdanilov3d2bc4f2016-11-12 18:31:18 +020010import agent
koder aka kdanilov73084622016-11-16 21:51:08 +020011import paramiko
koder aka kdanilov3d2bc4f2016-11-12 18:31:18 +020012
koder aka kdanilov73084622016-11-16 21:51:08 +020013
14from .node_interfaces import IRPCNode, NodeInfo, ISSHHost
15from .ssh import connect as ssh_connect
koder aka kdanilov3b4da8b2016-10-17 00:17:53 +030016
17
koder aka kdanilov3d2bc4f2016-11-12 18:31:18 +020018logger = logging.getLogger("wally")
koder aka kdanilov3b4da8b2016-10-17 00:17:53 +030019
koder aka kdanilov22d134e2016-11-08 11:33:19 +020020
koder aka kdanilov3d2bc4f2016-11-12 18:31:18 +020021class SSHHost(ISSHHost):
koder aka kdanilov73084622016-11-16 21:51:08 +020022 def __init__(self, conn: paramiko.SSHClient, info: NodeInfo) -> None:
23 self.conn = conn
24 self.info = info
koder aka kdanilov3b4da8b2016-10-17 00:17:53 +030025
koder aka kdanilov22d134e2016-11-08 11:33:19 +020026 def __str__(self) -> str:
koder aka kdanilov73084622016-11-16 21:51:08 +020027 return self.info.node_id()
koder aka kdanilov3d2bc4f2016-11-12 18:31:18 +020028
koder aka kdanilove7e1a4d2016-12-17 20:29:52 +020029 def put_to_file(self, path: Optional[str], content: bytes) -> str:
30 if path is None:
31 path = self.run("mktemp").strip()
32
koder aka kdanilov3d2bc4f2016-11-12 18:31:18 +020033 with self.conn.open_sftp() as sftp:
34 with sftp.open(path, "wb") as fd:
35 fd.write(content)
36
koder aka kdanilove7e1a4d2016-12-17 20:29:52 +020037 return path
38
koder aka kdanilov73084622016-11-16 21:51:08 +020039 def disconnect(self):
40 self.conn.close()
41
koder aka kdanilov3d2bc4f2016-11-12 18:31:18 +020042 def run(self, cmd: str, timeout: int = 60, nolog: bool = False) -> str:
43 transport = self.conn.get_transport()
44 session = transport.open_session()
45
46 try:
47 session.set_combine_stderr(True)
48
49 stime = time.time()
50
51 if not nolog:
52 logger.debug("SSH:{0} Exec {1!r}".format(self, cmd))
53
54 session.exec_command(cmd)
55 session.settimeout(1)
56 session.shutdown_write()
57 output = ""
58
59 while True:
60 try:
koder aka kdanilove7e1a4d2016-12-17 20:29:52 +020061 ndata = session.recv(1024).decode("utf-8")
62 if not ndata:
koder aka kdanilov3d2bc4f2016-11-12 18:31:18 +020063 break
koder aka kdanilove7e1a4d2016-12-17 20:29:52 +020064 output += ndata
koder aka kdanilov3d2bc4f2016-11-12 18:31:18 +020065 except socket.timeout:
66 pass
67
68 if time.time() - stime > timeout:
69 raise OSError(output + "\nExecution timeout")
70
71 code = session.recv_exit_status()
72 finally:
73 found = False
74
75 if found:
76 session.close()
77
78 if code != 0:
79 templ = "SSH:{0} Cmd {1!r} failed with code {2}. Output: {3}"
80 raise OSError(templ.format(self, cmd, code, output))
81
82 return output
83
84
85class LocalHost(ISSHHost):
86 def __str__(self):
87 return "<Local>"
88
89 def get_ip(self) -> str:
90 return 'localhost'
91
92 def put_to_file(self, path: str, content: bytes) -> None:
93 dir_name = os.path.dirname(path)
94 os.makedirs(dir_name, exist_ok=True)
95
96 with open(path, "wb") as fd:
97 fd.write(content)
98
99 def run(self, cmd: str, timeout: int = 60, nolog: bool = False) -> str:
100 proc = subprocess.Popen(cmd, shell=True,
101 stdin=subprocess.PIPE,
102 stdout=subprocess.PIPE,
103 stderr=subprocess.STDOUT)
104
koder aka kdanilove7e1a4d2016-12-17 20:29:52 +0200105 stdout_data_b, _ = proc.communicate()
106 stdout_data = stdout_data_b.decode("utf8")
107
koder aka kdanilov3d2bc4f2016-11-12 18:31:18 +0200108 if proc.returncode != 0:
109 templ = "SSH:{0} Cmd {1!r} failed with code {2}. Output: {3}"
110 raise OSError(templ.format(self, cmd, proc.returncode, stdout_data))
111
112 return stdout_data
113
koder aka kdanilov73084622016-11-16 21:51:08 +0200114 def disconnect(self):
115 pass
koder aka kdanilov3d2bc4f2016-11-12 18:31:18 +0200116
koder aka kdanilov73084622016-11-16 21:51:08 +0200117
koder aka kdanilove7e1a4d2016-12-17 20:29:52 +0200118def get_rpc_server_code() -> Tuple[bytes, Dict[str, bytes]]:
119 # setup rpc data
120 if agent.__file__.endswith(".pyc"):
121 path = agent.__file__[:-1]
122 else:
123 path = agent.__file__
124
125 master_code = open(path, "rb").read()
126
127 plugins = {} # type: Dict[str, bytes]
128 cli_path = os.path.join(os.path.dirname(path), "cli_plugin.py")
129 plugins["cli"] = open(cli_path, "rb").read()
130
131 fs_path = os.path.join(os.path.dirname(path), "fs_plugin.py")
132 plugins["fs"] = open(fs_path, "rb").read()
133
134 return master_code, plugins
135
136
koder aka kdanilov73084622016-11-16 21:51:08 +0200137def connect(info: Union[str, NodeInfo], conn_timeout: int = 60) -> ISSHHost:
138 if info == 'local':
koder aka kdanilov3d2bc4f2016-11-12 18:31:18 +0200139 return LocalHost()
140 else:
koder aka kdanilov73084622016-11-16 21:51:08 +0200141 info_c = cast(NodeInfo, info)
142 return SSHHost(ssh_connect(info_c.ssh_creds, conn_timeout), info_c)
koder aka kdanilov3d2bc4f2016-11-12 18:31:18 +0200143
144
145class RPCNode(IRPCNode):
146 """Node object"""
147
koder aka kdanilove7e1a4d2016-12-17 20:29:52 +0200148 def __init__(self, conn: agent.SimpleRPCClient, info: NodeInfo) -> None:
koder aka kdanilov3d2bc4f2016-11-12 18:31:18 +0200149 self.info = info
150 self.conn = conn
151
koder aka kdanilov3d2bc4f2016-11-12 18:31:18 +0200152 def __str__(self) -> str:
koder aka kdanilove7e1a4d2016-12-17 20:29:52 +0200153 return "Node(url={!r}, roles={!r})".format(self.info.ssh_creds, ",".join(self.info.roles))
koder aka kdanilov3b4da8b2016-10-17 00:17:53 +0300154
koder aka kdanilov22d134e2016-11-08 11:33:19 +0200155 def __repr__(self) -> str:
koder aka kdanilov3b4da8b2016-10-17 00:17:53 +0300156 return str(self)
157
koder aka kdanilov73084622016-11-16 21:51:08 +0200158 def get_file_content(self, path: str) -> bytes:
koder aka kdanilov3b4da8b2016-10-17 00:17:53 +0300159 raise NotImplementedError()
160
koder aka kdanilov73084622016-11-16 21:51:08 +0200161 def run(self, cmd: str, timeout: int = 60, nolog: bool = False) -> str:
koder aka kdanilov39e449e2016-12-17 15:15:26 +0200162 raise NotImplementedError()
koder aka kdanilov73084622016-11-16 21:51:08 +0200163
164 def copy_file(self, local_path: str, remote_path: str = None) -> str:
koder aka kdanilov39e449e2016-12-17 15:15:26 +0200165 raise NotImplementedError()
koder aka kdanilov73084622016-11-16 21:51:08 +0200166
koder aka kdanilove7e1a4d2016-12-17 20:29:52 +0200167 def put_to_file(self, path: Optional[str], content: bytes) -> str:
koder aka kdanilov39e449e2016-12-17 15:15:26 +0200168 raise NotImplementedError()
koder aka kdanilov73084622016-11-16 21:51:08 +0200169
170 def get_interface(self, ip: str) -> str:
koder aka kdanilov39e449e2016-12-17 15:15:26 +0200171 raise NotImplementedError()
koder aka kdanilov73084622016-11-16 21:51:08 +0200172
173 def stat_file(self, path: str) -> Any:
koder aka kdanilov39e449e2016-12-17 15:15:26 +0200174 raise NotImplementedError()
koder aka kdanilov73084622016-11-16 21:51:08 +0200175
176 def disconnect(self) -> str:
koder aka kdanilove7e1a4d2016-12-17 20:29:52 +0200177 self.conn.disconnect()
178 self.conn = None
koder aka kdanilov3b4da8b2016-10-17 00:17:53 +0300179
koder aka kdanilov3b4da8b2016-10-17 00:17:53 +0300180
koder aka kdanilove7e1a4d2016-12-17 20:29:52 +0200181def setup_rpc(node: ISSHHost, rpc_server_code: bytes, plugins: Dict[str, bytes] = None, port: int = 0) -> IRPCNode:
koder aka kdanilov3d2bc4f2016-11-12 18:31:18 +0200182 log_file = node.run("mktemp").strip()
koder aka kdanilove7e1a4d2016-12-17 20:29:52 +0200183 code_file = node.put_to_file(None, rpc_server_code)
koder aka kdanilov73084622016-11-16 21:51:08 +0200184 ip = node.info.ssh_creds.addr.host
185
koder aka kdanilove7e1a4d2016-12-17 20:29:52 +0200186 cmd = "python {code_file} server --listen-addr={listen_ip}:{port} --daemon " + \
187 "--show-settings --stdout-file={out_file}"
188 cmd = cmd.format(code_file=code_file, listen_ip=ip, out_file=log_file, port=port)
189 params_js = node.run(cmd).strip()
koder aka kdanilov3d2bc4f2016-11-12 18:31:18 +0200190 params = json.loads(params_js)
191 params['log_file'] = log_file
koder aka kdanilove7e1a4d2016-12-17 20:29:52 +0200192 node.info.params.update(params)
193
koder aka kdanilov73084622016-11-16 21:51:08 +0200194 port = int(params['addr'].split(":")[1])
koder aka kdanilov3d2bc4f2016-11-12 18:31:18 +0200195 rpc_conn = agent.connect((ip, port))
koder aka kdanilove7e1a4d2016-12-17 20:29:52 +0200196
197 if plugins is not None:
198 try:
199 for name, code in plugins.items():
200 rpc_conn.server.load_module(name, None, code)
201 except Exception:
202 rpc_conn.server.stop()
203 rpc_conn.disconnect()
204 raise
205
koder aka kdanilov3d2bc4f2016-11-12 18:31:18 +0200206 return RPCNode(rpc_conn, node.info)
koder aka kdanilov22d134e2016-11-08 11:33:19 +0200207
koder aka kdanilov22d134e2016-11-08 11:33:19 +0200208
koder aka kdanilov3d2bc4f2016-11-12 18:31:18 +0200209
210 # class RemoteNode(node_interfaces.IRPCNode):
211# def __init__(self, node_info: node_interfaces.NodeInfo, rpc_conn: agent.RPCClient):
212# self.info = node_info
213# self.rpc = rpc_conn
214#
215 # def get_interface(self, ip: str) -> str:
216 # """Get node external interface for given IP"""
217 # data = self.run("ip a", nolog=True)
218 # curr_iface = None
219 #
220 # for line in data.split("\n"):
221 # match1 = re.match(r"\d+:\s+(?P<name>.*?):\s\<", line)
222 # if match1 is not None:
223 # curr_iface = match1.group('name')
224 #
225 # match2 = re.match(r"\s+inet\s+(?P<ip>[0-9.]+)/", line)
226 # if match2 is not None:
227 # if match2.group('ip') == ip:
228 # assert curr_iface is not None
229 # return curr_iface
230 #
231 # raise KeyError("Can't found interface for ip {0}".format(ip))
232 #
233 # def get_user(self) -> str:
234 # """"get ssh connection username"""
235 # if self.ssh_conn_url == 'local':
236 # return getpass.getuser()
237 # return self.ssh_cred.user
238 #
239 #
240 # def run(self, cmd: str, stdin_data: str = None, timeout: int = 60, nolog: bool = False) -> Tuple[int, str]:
241 # """Run command on node. Will use rpc connection, if available"""
242 #
243 # if self.rpc_conn is None:
244 # return run_over_ssh(self.ssh_conn, cmd,
245 # stdin_data=stdin_data, timeout=timeout,
246 # nolog=nolog, node=self)
247 # assert not stdin_data
248 # proc_id = self.rpc_conn.cli.spawn(cmd)
249 # exit_code = None
250 # output = ""
251 #
252 # while exit_code is None:
253 # exit_code, stdout_data, stderr_data = self.rpc_conn.cli.get_updates(proc_id)
254 # output += stdout_data + stderr_data
255 #
256 # return exit_code, output
257
258