# blob: c224bf42c9e1dbc6f9af1039070a8988b2bb12e7 [file] [log] [blame]
import re
import yaml
import getpass
import logging
from typing import List, Dict, Any
from . import utils
from .common_types import IPAddr
# Module-level logger, registered under the fixed name "wally" (not __name__).
logger = logging.getLogger("wally")
class URIsNamespace:
    """Namespace that builds the regular expressions for ssh URI parsing.

    The class body runs once, at import time. It first rewrites every
    ``*_rr`` fragment of ``ReParts`` into a named group (the group name is
    the attribute name without the ``_rr`` suffix) and then expands each
    template in ``templs`` into a full pattern stored in ``uri_reg_exprs``.

    Attributes:
        re_dct: read-only view of ``ReParts.__dict__`` (fragments already
            wrapped into named groups).
        templs: URI templates, in the order they are expanded.
        uri_reg_exprs: the expanded patterns, one per entry of ``templs``,
            in the same order.
    """

    class ReParts:
        """Regex fragments for the individual URI components.

        After the enclosing class body has run, each attribute holds the
        fragment wrapped as ``(?P<name>fragment)``.
        """
        user_rr = "[^:]*?"
        host_rr = "[^:@]*?"
        port_rr = r"\d+"
        key_file_rr = "[^:@]*"
        passwd_rr = ".*?"

    # Snapshot the fragments first: setattr() below mutates the very
    # namespace that ReParts.__dict__ exposes, so iterating the live view
    # while writing to it would rely on interpreter implementation details.
    _fragments = [(name, rr) for name, rr in vars(ReParts).items()
                  if name.endswith('_rr')]
    for _name, _rr in _fragments:
        # "user_rr" -> group name "user"
        setattr(ReParts, _name, "(?P<{0}>{1})".format(_name[:-3], _rr))
    # Loop temporaries would otherwise stay behind as class attributes.
    del _fragments, _name, _rr

    # Live read-only view, so it already reflects the wrapped fragments.
    re_dct = ReParts.__dict__

    templs = [
        "^{host_rr}$",
        "^{host_rr}:{port_rr}$",
        "^{host_rr}::{key_file_rr}$",
        "^{host_rr}:{port_rr}:{key_file_rr}$",
        "^{user_rr}@{host_rr}$",
        "^{user_rr}@{host_rr}:{port_rr}$",
        "^{user_rr}@{host_rr}::{key_file_rr}$",
        "^{user_rr}@{host_rr}:{port_rr}:{key_file_rr}$",
        "^{user_rr}:{passwd_rr}@{host_rr}$",
        "^{user_rr}:{passwd_rr}@{host_rr}:{port_rr}$",
    ]

    # A plain loop is used on purpose: a comprehension body cannot see
    # class-scope names such as re_dct.
    uri_reg_exprs = []  # type: List[str]
    for _templ in templs:
        uri_reg_exprs.append(_templ.format(**re_dct))
    del _templ
class ConnCreds(yaml.YAMLObject):  # type: ignore
    """SSH connection credentials, (de)serializable under the ``!ConnCreds`` tag.

    Attributes set by ``__init__``:
        user: login name.
        passwd: password, or None.
        addr: ``IPAddr(host, int(port))``.
        key_file: key file path, or None.
        key: key material as bytes, or None. It is not written by
            ``to_yaml``.
    """

    yaml_tag = '!ConnCreds'

    def __init__(self, host: str, user: str, passwd: str = None, port: str = '22',
                 key_file: str = None, key: bytes = None) -> None:
        """Store the credentials; ``port`` is converted with ``int()``."""
        self.user, self.passwd = user, passwd
        self.addr = IPAddr(host, int(port))
        self.key_file, self.key = key_file, key

    def __str__(self) -> str:
        """Return ``user@host:port``."""
        addr = self.addr
        return "{}@{}:{}".format(self.user, addr.host, addr.port)

    def __repr__(self) -> str:
        """Same text as ``str(self)``."""
        return str(self)

    @classmethod
    def to_yaml(cls, dumper: Any, data: 'ConnCreds') -> Any:
        """Represent ``data`` as a mapping tagged with ``data.yaml_tag``.

        Only user, host, port, passwd and key_file are emitted.
        """
        fields = dict(
            user=data.user,
            host=data.addr.host,
            port=data.addr.port,
            passwd=data.passwd,
            key_file=data.key_file,
        )
        return dumper.represent_mapping(data.yaml_tag, fields)

    @classmethod
    def from_yaml(cls, loader: Any, node: Any) -> 'ConnCreds':
        """Build an instance from a mapping node; its keys become ``__init__`` keywords."""
        init_kwargs = loader.construct_mapping(node)
        return cls(**init_kwargs)
def parse_ssh_uri(uri: str) -> ConnCreds:
    """Parse an ssh connection URI into a ConnCreds object.

    Accepted forms (the ``ssh://`` prefix is optional):

        [ssh://]user:passwd@host[:port]
        [ssh://][user@]host[:port][:key_file]

    The patterns in ``URIsNamespace.uri_reg_exprs`` are tried in order and
    the first one that matches wins. When the matched form carries no user
    part, the current login name (``getpass.getuser()``) is used instead.
    The host is passed through ``utils.to_ip`` before the credentials are
    built (presumably a name-to-IP conversion; confirm against ``utils``).

    Raises:
        ValueError: if ``uri`` matches none of the supported forms.
    """
    if uri.startswith("ssh://"):
        uri = uri[len("ssh://"):]

    for rr in URIsNamespace.uri_reg_exprs:
        rrm = re.match(rr, uri)
        if rrm is None:
            continue

        params = rrm.groupdict()  # type: Dict[str, str]
        # Look up the current user only when the URI did not name one:
        # getpass.getuser() can fail when the login name cannot be
        # determined, and that must not break URIs that specify a user.
        if 'user' not in params:
            params['user'] = getpass.getuser()
        params['host'] = utils.to_ip(params['host'])
        return ConnCreds(**params)  # type: ignore

    raise ValueError("Can't parse {0!r} as ssh uri value".format(uri))