blob: d87c7534416f1b32592dbfd37b2755b562f0be65 [file] [log] [blame]
import abc
from typing import Any, Set, Dict
class IRemoteShell(metaclass=abc.ABCMeta):
@abc.abstractmethod
def run(self, cmd: str, timeout: int = 60, nolog: bool = False) -> str:
pass
class IHost(IRemoteShell, metaclass=abc.ABCMeta):
@abc.abstractmethod
def get_ip(self) -> str:
pass
@abc.abstractmethod
def __str__(self) -> str:
pass
@abc.abstractmethod
def put_to_file(self, path: str, content: bytes) -> None:
pass
class IRemoteFS(metaclass=abc.ABCMeta):
@abc.abstractmethod
def copy_file(self, local_path: str, remote_path: str = None) -> str:
pass
@abc.abstractmethod
def get_file_content(self, path: str) -> bytes:
pass
@abc.abstractmethod
def put_to_file(self, path:str, content: bytes) -> None:
pass
@abc.abstractmethod
def forward_port(self, ip: str, remote_port: int, local_port: int = None) -> int:
pass
@abc.abstractmethod
def get_interface(self, ip: str) -> str:
pass
@abc.abstractmethod
def stat_file(self, path:str) -> Any:
pass
class IRPC(metaclass=abc.ABCMeta):
pass
class IRemoteNode(IRemoteFS, IRemoteShell, metaclass=abc.ABCMeta):
def __init__(self) -> None:
self.roles = set() # type: Set[str]
self.rpc = None # type: IRPC
self.rpc_params = None # type: Dict[str, Any]
@abc.abstractmethod
def is_connected(self) -> bool:
pass
@abc.abstractmethod
def disconnect(self) -> None:
pass
@abc.abstractmethod
def connect_ssh(self, timeout: int = None) -> None:
pass
@abc.abstractmethod
def get_ip(self) -> str:
pass
@abc.abstractmethod
def get_user(self) -> str:
pass