Add stage base class, refactor discovery, etc
diff --git a/wally/storage.py b/wally/storage.py
index 05e4259..540da88 100644
--- a/wally/storage.py
+++ b/wally/storage.py
@@ -4,30 +4,24 @@
import os
import abc
-from typing import Any, Iterable, TypeVar, Type, IO, Tuple, cast, List
+import array
+from typing import Any, Iterator, TypeVar, Type, IO, Tuple, cast, List, Dict, Union, Iterable
+
+
+import yaml
+try:
+ from yaml import CLoader as Loader, CDumper as Dumper # type: ignore
+except ImportError:
+ from yaml import Loader, Dumper # type: ignore
class IStorable(metaclass=abc.ABCMeta):
"""Interface for type, which can be stored"""
- @abc.abstractmethod
- def __getstate__(self) -> Any:
- pass
- @abc.abstractmethod
- def __setstate__(self, Any):
- pass
-
-
-# all builtin types can be stored
-IStorable.register(list) # type: ignore
-IStorable.register(dict) # type: ignore
-IStorable.register(tuple) # type: ignore
-IStorable.register(set) # type: ignore
-IStorable.register(None) # type: ignore
-IStorable.register(int) # type: ignore
-IStorable.register(str) # type: ignore
-IStorable.register(bytes) # type: ignore
-IStorable.register(bool) # type: ignore
+# Built-in types that are treated as storable as-is; each one is registered
+# below as a virtual subclass of IStorable.
+basic_types = {list, dict, tuple, set, type(None), int, str, bytes, bool, float}
+for btype in basic_types:
+ # pylint: disable=E1101
+ IStorable.register(btype) # type: ignore
ObjClass = TypeVar('ObjClass')
@@ -54,11 +48,15 @@
pass
@abc.abstractmethod
- def list(self, path: str) -> Iterable[Tuple[bool, str]]:
+ def list(self, path: str) -> Iterator[Tuple[bool, str]]:
+ """Iterate over the entries under `path` as (bool, name) pairs.
+
+ The FSStorage implementation sets the flag to True for files.
+ """
pass
@abc.abstractmethod
- def get_stream(self, path: str) -> IO:
+ def get_stream(self, path: str, mode: str = "rb+") -> IO:
+ """Open the item at `path` as a stream in the given `mode`.
+
+ Which mode strings are accepted is up to the implementation.
+ """
+ pass
+
+ @abc.abstractmethod
+ def sub_storage(self, path: str) -> 'ISimpleStorage':
+ """Return a storage for `path` inside this one.
+
+ The FSStorage implementation roots the new storage at the joined directory.
+ """
pass
@@ -78,14 +76,18 @@
def __init__(self, root_path: str, existing: bool) -> None:
+ """Bind the storage to the directory `root_path`.
+
+ When `existing` is true the directory must already be present,
+ otherwise IOError is raised.
+ """
self.root_path = root_path
+ # remembered so sub_storage() can pass the same flag on
+ self.existing = existing
if existing:
if not os.path.isdir(self.root_path):
- raise ValueError("No storage found at {!r}".format(root_path))
+ raise IOError("No storage found at {!r}".format(root_path))
+
+ def j(self, path: str) -> str:
+ """Return `path` joined onto the storage root directory."""
+ return os.path.join(self.root_path, path)
def __setitem__(self, path: str, value: bytes) -> None:
+ """Write `value` to the file at `path`, replacing any previous content.
+
+ Missing parent directories are created first.
+ """
- path = os.path.join(self.root_path, path)
- os.makedirs(os.path.dirname(path), exist_ok=True)
- with open(path, "wb") as fd:
+ jpath = self.j(path)
+ os.makedirs(os.path.dirname(jpath), exist_ok=True)
+ with open(jpath, "wb") as fd:
fd.write(value)
def __delitem__(self, path: str) -> None:
@@ -95,32 +97,53 @@
pass
def __getitem__(self, path: str) -> bytes:
+ """Return the whole content of the file at `path` as bytes."""
- path = os.path.join(self.root_path, path)
- with open(path, "rb") as fd:
+ with open(self.j(path), "rb") as fd:
return fd.read()
def __contains__(self, path: str) -> bool:
+ """Tell whether `path` exists under the storage root."""
- path = os.path.join(self.root_path, path)
- return os.path.exists(path)
+ return os.path.exists(self.j(path))
- def list(self, path: str) -> Iterable[Tuple[bool, str]]:
- path = os.path.join(self.root_path, path)
- for entry in os.scandir(path):
+ def list(self, path: str = "") -> Iterator[Tuple[bool, str]]:
+ """Yield an (is_file, name) pair for every entry directly under `path`.
+
+ With the default empty `path` the storage root itself is scanned.
+ """
+ jpath = self.j(path)
+ for entry in os.scandir(jpath):
if not entry.name in ('..', '.'):
yield entry.is_file(), entry.name
- def get_stream(self, path: str, mode: str = "rb") -> IO:
- path = os.path.join(self.root_path, path)
- return open(path, mode)
+ def get_stream(self, path: str, mode: str = "rb+") -> IO[bytes]:
+ """Open the file at `path` and return the stream.
+
+ `mode` is a regular open() mode, plus the extra value "cb": open the
+ file for update ("rb+") and, when that fails, create it with "wb".
+ For every mode that may create the file the parent directory is
+ created first, the same way __setitem__ prepares it before writing.
+
+ Raises IOError when the file cannot be opened and `mode` is not "cb".
+ """
+ jpath = self.j(path)
+
+ if "cb" == mode:
+ create_on_fail = True
+ mode = "rb+"
+ else:
+ create_on_fail = False
+
+ # os.makedirs("") fails, so a path without a directory part is skipped
+ parent_dir = os.path.dirname(jpath)
+ if parent_dir and (create_on_fail or any(flag in mode for flag in "wax")):
+ os.makedirs(parent_dir, exist_ok=True)
+
+ try:
+ fd = open(jpath, mode)
+ except IOError:
+ if not create_on_fail:
+ raise
+ fd = open(jpath, "wb")
+
+ return cast(IO[bytes], fd)
+
+ def sub_storage(self, path: str) -> 'FSStorage':
+ """Return a new storage of the same class rooted at `path` below this one.
+
+ The `existing` flag is passed on, so for an existing-only storage the
+ sub-directory must already be present (the constructor raises IOError).
+ """
+ return self.__class__(self.j(path), self.existing)
class YAMLSerializer(ISerializer):
"""Serialize data to yaml"""
- def pack(self, value: IStorable) -> bytes:
- raise NotImplementedError()
+ def pack(self, value: Any) -> bytes:
+ """Dump `value` to utf8-encoded YAML.
+
+ A value of a basic type is dumped as is. Any other object is stored
+ as its instance __dict__, and every attribute must itself be of a
+ basic type. Raises ValueError when the value can't be packed.
+ """
+ if type(value) not in basic_types:
+ # an object without __dict__ has nothing to flatten; report it with
+ # the same exception type as the attribute check below
+ if not hasattr(value, "__dict__"):
+ raise ValueError("Can't pack {!r}. Type {} is not basic and has no attributes to store"
+ .format(value, type(value)))
+ for name, val in value.__dict__.items():
+ if type(val) not in basic_types:
+ raise ValueError(("Can't pack {!r}. Attribute {} has value {!r} (type: {}), but only" +
+ " basic types accepted as attributes").format(value, name, val, type(val)))
+ value = value.__dict__
+ return yaml.dump(value, Dumper=Dumper, encoding="utf8")
def unpack(self, data: bytes) -> IStorable:
- raise NotImplementedError()
+ """Parse YAML bytes back into a python value."""
+ # NOTE(review): the full Loader can construct arbitrary python objects;
+ # if stored data may come from an untrusted source, consider the safe loader.
+ return yaml.load(data, Loader=Loader)
class Storage:
@@ -129,32 +152,65 @@
self.storage = storage
self.serializer = serializer
- def __setitem__(self, path: str, value: IStorable) -> None:
- self.storage[path] = self.serializer.pack(value)
+ def sub_storage(self, *path: str) -> 'Storage':
+ """Return a storage of the same class over the backend sub-storage at "/".join(path).
+
+ The serializer is shared with this storage.
+ """
+ return self.__class__(self.storage.sub_storage("/".join(path)), self.serializer)
- def __getitem__(self, path: str) -> IStorable:
+ def __setitem__(self, path: Union[str, Iterable[str]], value: Any) -> None:
+ """Serialize `value` and store the result at `path`.
+
+ `path` is either a ready string or an iterable of components, which
+ is joined with "/".
+ """
+ if not isinstance(path, str):
+ path = "/".join(path)
+
+ self.storage[path] = self.serializer.pack(cast(IStorable, value))
+
+ def __getitem__(self, path: Union[str, Iterable[str]]) -> IStorable:
+ """Read the raw data at `path` and return it unpacked by the serializer.
+
+ A non-string `path` is joined with "/" first.
+ """
+ if not isinstance(path, str):
+ path = "/".join(path)
+
return self.serializer.unpack(self.storage[path])
- def __delitem__(self, path: str) -> None:
+ def __delitem__(self, path: Union[str, Iterable[str]]) -> None:
+ """Delete the item at `path` from the backend storage.
+
+ A non-string `path` is joined with "/" first.
+ """
+ if not isinstance(path, str):
+ path = "/".join(path)
+
del self.storage[path]
- def __contains__(self, path: str) -> bool:
+ def __contains__(self, path: Union[str, Iterable[str]]) -> bool:
+ """Tell whether the backend storage contains `path`.
+
+ A non-string `path` is joined with "/" first.
+ """
+ if not isinstance(path, str):
+ path = "/".join(path)
return path in self.storage
- def list(self, *path: str) -> Iterable[Tuple[bool, str]]:
+ def list(self, *path: str) -> Iterator[Tuple[bool, str]]:
+ """Return the backend listing for "/".join(path)."""
return self.storage.list("/".join(path))
- def construct(self, path: str, raw_val: IStorable, obj_class: Type[ObjClass]) -> ObjClass:
+ def set_array(self, value: array.array, *path: str) -> None:
+ """Write the raw content of `value` to the item at "/".join(path).
+
+ The stream is opened with mode "wb".
+ """
+ with self.get_stream("/".join(path), "wb") as fd:
+ value.tofile(fd) # type: ignore
+
+ def get_array(self, typecode: str, *path: str) -> array.array:
+ """Read the whole item at "/".join(path) into a new array of `typecode`.
+
+ The item size must be a multiple of the array item size; this is
+ checked with an assert.
+ """
+ res = array.array(typecode)
+ path_s = "/".join(path)
+ with self.get_stream(path_s, "rb") as fd:
+ # measure the stream by seeking to its end, then rewind for reading
+ fd.seek(0, os.SEEK_END)
+ size = fd.tell()
+ fd.seek(0, os.SEEK_SET)
+ # NOTE(review): assert statements are skipped under `python -O`
+ assert size % res.itemsize == 0, "Storage object at path {} contains no array of {} or corrupted."\
+ .format(path_s, typecode)
+ res.fromfile(fd, size // res.itemsize) # type: ignore
+ return res
+
+ def append(self, value: array.array, *path: str) -> None:
+ """Append the raw content of `value` to the end of the item at "/".join(path).
+
+ The stream is requested with mode "cb", which FSStorage handles as
+ "open for update, create the file when opening fails".
+ """
+ with self.get_stream("/".join(path), "cb") as fd:
+ # position at the end so existing data is kept
+ fd.seek(0, os.SEEK_END)
+ value.tofile(fd) # type: ignore
+
+ def construct(self, path: str, raw_val: Dict, obj_class: Type[ObjClass]) -> ObjClass:
+ """Internal function, used to construct user type from raw unpacked value.
+
+ Raises ValueError when `obj_class` is one of the listed built-in
+ types, or when `raw_val` is not a dict whose keys are all strings.
+ """
if obj_class in (int, str, dict, list, None):
- if not isinstance(raw_val, obj_class):
- raise ValueError("Can't load path {!r} into type {}. Real type is {}"
- .format(path, obj_class, type(raw_val)))
- return cast(ObjClass, raw_val)
+ # the placeholders were never filled in before; report path and target type
+ raise ValueError("Can't load into build-in value - {!r} into type {}".format(path, obj_class))
if not isinstance(raw_val, dict):
raise ValueError("Can't load path {!r} into python type. Raw value not dict".format(path))
- if not all(isinstance(str, key) for key in raw_val.keys):
+ if not all(isinstance(key, str) for key in raw_val.keys()):
raise ValueError("Can't load path {!r} into python type.".format(path) +
"Raw not all keys in raw value is strings")
@@ -170,19 +226,25 @@
def load(self, obj_class: Type[ObjClass], *path: str) -> ObjClass:
+ """Read the value at "/".join(path) and pass it to construct() for `obj_class`."""
path_s = "/".join(path)
- return self.construct(path_s, self[path_s], obj_class)
+ return self.construct(path_s, cast(Dict, self[path_s]), obj_class)
- def get_stream(self, *path: str) -> IO:
- return self.storage.get_stream("/".join(path))
+ def get_stream(self, path: str, mode: str = "r") -> IO:
+ """Open the item at `path` on the backend storage with `mode`.
+
+ NOTE(review): the default here is "r", while FSStorage.get_stream
+ defaults to "rb+" and is annotated as a bytes stream — confirm which
+ default is intended.
+ """
+ return self.storage.get_stream(path, mode)
- def get(self, path: str, default: Any = None) -> Any:
+ def get(self, path: Union[str, Iterable[str]], default: Any = None) -> Any:
+ """Return the value stored at `path`, or `default` when loading it fails.
+
+ Any Exception raised while reading or unpacking results in `default`.
+ A non-string `path` is joined with "/" first.
+ """
+ if not isinstance(path, str):
+ path = "/".join(path)
+
try:
return self[path]
- except KeyError:
+ except Exception:
return default
- def append(self, path: str, data: List):
- raise NotImplemented()
+ def __enter__(self) -> 'Storage':
+ """Enter the context; the storage itself is the context value."""
+ return self
+
+ def __exit__(self, x: Any, y: Any, z: Any) -> None:
+ """Leave the context; nothing is released and exceptions are not suppressed."""
+ return
def make_storage(url: str, existing: bool = False) -> Storage: