| import re |
| import getpass |
| from typing import List |
| |
| |
| from .common_types import IPAddr |
| |
| |
| class URIsNamespace: |
| class ReParts: |
| user_rr = "[^:]*?" |
| host_rr = "[^:@]*?" |
| port_rr = "\\d+" |
| key_file_rr = "[^:@]*" |
| passwd_rr = ".*?" |
| |
| re_dct = ReParts.__dict__ |
| |
| for attr_name, val in re_dct.items(): |
| if attr_name.endswith('_rr'): |
| new_rr = "(?P<{0}>{1})".format(attr_name[:-3], val) |
| setattr(ReParts, attr_name, new_rr) |
| |
| re_dct = ReParts.__dict__ |
| |
| templs = [ |
| "^{host_rr}$", |
| "^{host_rr}:{port_rr}$", |
| "^{host_rr}::{key_file_rr}$", |
| "^{host_rr}:{port_rr}:{key_file_rr}$", |
| "^{user_rr}@{host_rr}$", |
| "^{user_rr}@{host_rr}:{port_rr}$", |
| "^{user_rr}@{host_rr}::{key_file_rr}$", |
| "^{user_rr}@{host_rr}:{port_rr}:{key_file_rr}$", |
| "^{user_rr}:{passwd_rr}@{host_rr}$", |
| "^{user_rr}:{passwd_rr}@{host_rr}:{port_rr}$", |
| ] |
| |
| uri_reg_exprs = [] # type: List[str] |
| for templ in templs: |
| uri_reg_exprs.append(templ.format(**re_dct)) |
| |
| |
| class ConnCreds: |
| def __init__(self, host: str, user: str, passwd: str = None, port: int = 22, |
| key_file: str = None, key: bytes = None) -> None: |
| self.user = user |
| self.passwd = passwd |
| self.addr = IPAddr(host, port) |
| self.key_file = key_file |
| self.key = key |
| |
| def __str__(self) -> str: |
| return "{}@{}:{}".format(self.user, self.addr.host, self.addr.port) |
| |
| |
| def parse_ssh_uri(uri: str) -> ConnCreds: |
| """Parse ssh connection URL from one of following form |
| [ssh://]user:passwd@host[:port] |
| [ssh://][user@]host[:port][:key_file] |
| """ |
| |
| if uri.startswith("ssh://"): |
| uri = uri[len("ssh://"):] |
| |
| res = ConnCreds("", getpass.getuser()) |
| |
| for rr in URIsNamespace.uri_reg_exprs: |
| rrm = re.match(rr, uri) |
| if rrm is not None: |
| res.__dict__.update(rrm.groupdict()) |
| return res |
| |
| raise ValueError("Can't parse {0!r} as ssh uri value".format(uri)) |
| |
| |