blob: fae787914a3754724d1819a67205cfd381bb4441 [file] [log] [blame]
koder aka kdanilov3d2bc4f2016-11-12 18:31:18 +02001import os
2import time
3import json
4import socket
5import logging
6import subprocess
koder aka kdanilov73084622016-11-16 21:51:08 +02007from typing import Union, cast, Any
8
koder aka kdanilov3b4da8b2016-10-17 00:17:53 +03009
koder aka kdanilov3d2bc4f2016-11-12 18:31:18 +020010import agent
koder aka kdanilov73084622016-11-16 21:51:08 +020011import paramiko
koder aka kdanilov3d2bc4f2016-11-12 18:31:18 +020012
koder aka kdanilov73084622016-11-16 21:51:08 +020013
14from .node_interfaces import IRPCNode, NodeInfo, ISSHHost
15from .ssh import connect as ssh_connect
koder aka kdanilov3b4da8b2016-10-17 00:17:53 +030016
17
koder aka kdanilov3d2bc4f2016-11-12 18:31:18 +020018logger = logging.getLogger("wally")
koder aka kdanilov3b4da8b2016-10-17 00:17:53 +030019
koder aka kdanilov22d134e2016-11-08 11:33:19 +020020
koder aka kdanilov3d2bc4f2016-11-12 18:31:18 +020021class SSHHost(ISSHHost):
koder aka kdanilov73084622016-11-16 21:51:08 +020022 def __init__(self, conn: paramiko.SSHClient, info: NodeInfo) -> None:
23 self.conn = conn
24 self.info = info
koder aka kdanilov3b4da8b2016-10-17 00:17:53 +030025
koder aka kdanilov22d134e2016-11-08 11:33:19 +020026 def __str__(self) -> str:
koder aka kdanilov73084622016-11-16 21:51:08 +020027 return self.info.node_id()
koder aka kdanilov3d2bc4f2016-11-12 18:31:18 +020028
29 def put_to_file(self, path: str, content: bytes) -> None:
30 with self.conn.open_sftp() as sftp:
31 with sftp.open(path, "wb") as fd:
32 fd.write(content)
33
koder aka kdanilov73084622016-11-16 21:51:08 +020034 def disconnect(self):
35 self.conn.close()
36
koder aka kdanilov3d2bc4f2016-11-12 18:31:18 +020037 def run(self, cmd: str, timeout: int = 60, nolog: bool = False) -> str:
38 transport = self.conn.get_transport()
39 session = transport.open_session()
40
41 try:
42 session.set_combine_stderr(True)
43
44 stime = time.time()
45
46 if not nolog:
47 logger.debug("SSH:{0} Exec {1!r}".format(self, cmd))
48
49 session.exec_command(cmd)
50 session.settimeout(1)
51 session.shutdown_write()
52 output = ""
53
54 while True:
55 try:
56 ndata = session.recv(1024)
57 output += ndata
58 if "" == ndata:
59 break
60 except socket.timeout:
61 pass
62
63 if time.time() - stime > timeout:
64 raise OSError(output + "\nExecution timeout")
65
66 code = session.recv_exit_status()
67 finally:
68 found = False
69
70 if found:
71 session.close()
72
73 if code != 0:
74 templ = "SSH:{0} Cmd {1!r} failed with code {2}. Output: {3}"
75 raise OSError(templ.format(self, cmd, code, output))
76
77 return output
78
79
80class LocalHost(ISSHHost):
81 def __str__(self):
82 return "<Local>"
83
84 def get_ip(self) -> str:
85 return 'localhost'
86
87 def put_to_file(self, path: str, content: bytes) -> None:
88 dir_name = os.path.dirname(path)
89 os.makedirs(dir_name, exist_ok=True)
90
91 with open(path, "wb") as fd:
92 fd.write(content)
93
94 def run(self, cmd: str, timeout: int = 60, nolog: bool = False) -> str:
95 proc = subprocess.Popen(cmd, shell=True,
96 stdin=subprocess.PIPE,
97 stdout=subprocess.PIPE,
98 stderr=subprocess.STDOUT)
99
100 stdout_data, _ = proc.communicate()
101 if proc.returncode != 0:
102 templ = "SSH:{0} Cmd {1!r} failed with code {2}. Output: {3}"
103 raise OSError(templ.format(self, cmd, proc.returncode, stdout_data))
104
105 return stdout_data
106
koder aka kdanilov73084622016-11-16 21:51:08 +0200107 def disconnect(self):
108 pass
koder aka kdanilov3d2bc4f2016-11-12 18:31:18 +0200109
koder aka kdanilov73084622016-11-16 21:51:08 +0200110
111def connect(info: Union[str, NodeInfo], conn_timeout: int = 60) -> ISSHHost:
112 if info == 'local':
koder aka kdanilov3d2bc4f2016-11-12 18:31:18 +0200113 return LocalHost()
114 else:
koder aka kdanilov73084622016-11-16 21:51:08 +0200115 info_c = cast(NodeInfo, info)
116 return SSHHost(ssh_connect(info_c.ssh_creds, conn_timeout), info_c)
koder aka kdanilov3d2bc4f2016-11-12 18:31:18 +0200117
118
119class RPCNode(IRPCNode):
120 """Node object"""
121
122 def __init__(self, conn: agent.Client, info: NodeInfo) -> None:
123 self.info = info
124 self.conn = conn
125
koder aka kdanilov3d2bc4f2016-11-12 18:31:18 +0200126 def __str__(self) -> str:
koder aka kdanilov73084622016-11-16 21:51:08 +0200127 return "<Node: url={!s} roles={!r} hops=/>".format(self.info.ssh_creds, ",".join(self.info.roles))
koder aka kdanilov3b4da8b2016-10-17 00:17:53 +0300128
koder aka kdanilov22d134e2016-11-08 11:33:19 +0200129 def __repr__(self) -> str:
koder aka kdanilov3b4da8b2016-10-17 00:17:53 +0300130 return str(self)
131
koder aka kdanilov73084622016-11-16 21:51:08 +0200132 def get_file_content(self, path: str) -> bytes:
koder aka kdanilov3b4da8b2016-10-17 00:17:53 +0300133 raise NotImplementedError()
134
koder aka kdanilov73084622016-11-16 21:51:08 +0200135 def run(self, cmd: str, timeout: int = 60, nolog: bool = False) -> str:
koder aka kdanilov39e449e2016-12-17 15:15:26 +0200136 raise NotImplementedError()
koder aka kdanilov73084622016-11-16 21:51:08 +0200137
138 def copy_file(self, local_path: str, remote_path: str = None) -> str:
koder aka kdanilov39e449e2016-12-17 15:15:26 +0200139 raise NotImplementedError()
koder aka kdanilov73084622016-11-16 21:51:08 +0200140
141 def put_to_file(self, path: str, content: bytes) -> None:
koder aka kdanilov39e449e2016-12-17 15:15:26 +0200142 raise NotImplementedError()
koder aka kdanilov73084622016-11-16 21:51:08 +0200143
144 def get_interface(self, ip: str) -> str:
koder aka kdanilov39e449e2016-12-17 15:15:26 +0200145 raise NotImplementedError()
koder aka kdanilov73084622016-11-16 21:51:08 +0200146
147 def stat_file(self, path: str) -> Any:
koder aka kdanilov39e449e2016-12-17 15:15:26 +0200148 raise NotImplementedError()
koder aka kdanilov73084622016-11-16 21:51:08 +0200149
150 def disconnect(self) -> str:
koder aka kdanilov39e449e2016-12-17 15:15:26 +0200151 raise NotImplementedError()
koder aka kdanilov3b4da8b2016-10-17 00:17:53 +0300152
koder aka kdanilov3b4da8b2016-10-17 00:17:53 +0300153
koder aka kdanilov73084622016-11-16 21:51:08 +0200154def setup_rpc(node: ISSHHost, rpc_server_code: bytes, port: int = 0) -> IRPCNode:
koder aka kdanilov3d2bc4f2016-11-12 18:31:18 +0200155 code_file = node.run("mktemp").strip()
156 log_file = node.run("mktemp").strip()
157 node.put_to_file(code_file, rpc_server_code)
158 cmd = "python {code_file} server --listen-addr={listen_ip}:{port} --daemon " + \
159 "--show-settings --stdout-file={out_file}"
koder aka kdanilov73084622016-11-16 21:51:08 +0200160
161 ip = node.info.ssh_creds.addr.host
162
koder aka kdanilov3d2bc4f2016-11-12 18:31:18 +0200163 params_js = node.run(cmd.format(code_file=code_file,
koder aka kdanilov73084622016-11-16 21:51:08 +0200164 listen_addr=ip,
koder aka kdanilov3d2bc4f2016-11-12 18:31:18 +0200165 out_file=log_file,
166 port=port)).strip()
167 params = json.loads(params_js)
168 params['log_file'] = log_file
koder aka kdanilov73084622016-11-16 21:51:08 +0200169 port = int(params['addr'].split(":")[1])
koder aka kdanilov3d2bc4f2016-11-12 18:31:18 +0200170 rpc_conn = agent.connect((ip, port))
171 node.info.params.update(params)
172 return RPCNode(rpc_conn, node.info)
koder aka kdanilov22d134e2016-11-08 11:33:19 +0200173
koder aka kdanilov22d134e2016-11-08 11:33:19 +0200174
koder aka kdanilov3d2bc4f2016-11-12 18:31:18 +0200175
176 # class RemoteNode(node_interfaces.IRPCNode):
177# def __init__(self, node_info: node_interfaces.NodeInfo, rpc_conn: agent.RPCClient):
178# self.info = node_info
179# self.rpc = rpc_conn
180#
181 # def get_interface(self, ip: str) -> str:
182 # """Get node external interface for given IP"""
183 # data = self.run("ip a", nolog=True)
184 # curr_iface = None
185 #
186 # for line in data.split("\n"):
187 # match1 = re.match(r"\d+:\s+(?P<name>.*?):\s\<", line)
188 # if match1 is not None:
189 # curr_iface = match1.group('name')
190 #
191 # match2 = re.match(r"\s+inet\s+(?P<ip>[0-9.]+)/", line)
192 # if match2 is not None:
193 # if match2.group('ip') == ip:
194 # assert curr_iface is not None
195 # return curr_iface
196 #
197 # raise KeyError("Can't found interface for ip {0}".format(ip))
198 #
199 # def get_user(self) -> str:
200 # """"get ssh connection username"""
201 # if self.ssh_conn_url == 'local':
202 # return getpass.getuser()
203 # return self.ssh_cred.user
204 #
205 #
206 # def run(self, cmd: str, stdin_data: str = None, timeout: int = 60, nolog: bool = False) -> Tuple[int, str]:
207 # """Run command on node. Will use rpc connection, if available"""
208 #
209 # if self.rpc_conn is None:
210 # return run_over_ssh(self.ssh_conn, cmd,
211 # stdin_data=stdin_data, timeout=timeout,
212 # nolog=nolog, node=self)
213 # assert not stdin_data
214 # proc_id = self.rpc_conn.cli.spawn(cmd)
215 # exit_code = None
216 # output = ""
217 #
218 # while exit_code is None:
219 # exit_code, stdout_data, stderr_data = self.rpc_conn.cli.get_updates(proc_id)
220 # output += stdout_data + stderr_data
221 #
222 # return exit_code, output
223
224