sensors refactored
diff --git a/wally/ceph.py b/wally/ceph.py
index cbb4740..592d595 100644
--- a/wally/ceph.py
+++ b/wally/ceph.py
@@ -28,7 +28,11 @@
"""Get set of osd's ip"""
data = node.run("ceph -c {} -k {} --format json osd dump".format(conf, key))
- jdata = json.loads(data)
+ try:
+ jdata = json.loads(data)
+    except Exception:
+        with open("/tmp/ceph-out.json", "w") as f: f.write(data)
+ raise
ips = {} # type: Dict[IP, List[OSDInfo]]
first_error = True
for osd_data in jdata["osds"]:
@@ -56,6 +60,7 @@
osd_data_path = line.split("=")[1].strip()
if osd_data_path is None or osd_journal_path is None:
+            with open("/tmp/ceph-out.json", "w") as f: f.write(osd_cfg)
logger.error("Can't detect osd %s journal or storage path", osd_id)
raise StopTestError()
diff --git a/wally/node.py b/wally/node.py
index 4d2dda8..9f4250b 100644
--- a/wally/node.py
+++ b/wally/node.py
@@ -175,11 +175,13 @@
cmd_b = cmd.encode("utf8")
proc_id = self.conn.cli.spawn(cmd_b, timeout=timeout, merge_out=True)
- code = None
out = ""
- while code is None:
+
+ while True:
code, outb, _ = self.conn.cli.get_updates(proc_id)
out += outb.decode("utf8")
+ if code is not None:
+ break
time.sleep(0.01)
if code != 0:
diff --git a/wally/run_test.py b/wally/run_test.py
index 7baac35..c72e456 100755
--- a/wally/run_test.py
+++ b/wally/run_test.py
@@ -84,7 +84,7 @@
path = "rpc_logs/" + nid
node.conn.server.flush_logs()
log = node.get_file_content(node.rpc_log_file)
- ctx.storage[path] = log.decode("utf8")
+ ctx.storage.store_raw(log, path)
logger.debug("RPC log from node {} stored into storage::{}".format(nid, path))
with ctx.get_pool() as pool:
diff --git a/wally/sensors.py b/wally/sensors.py
index 4f00a3e..eef7864 100644
--- a/wally/sensors.py
+++ b/wally/sensors.py
@@ -1,5 +1,6 @@
import array
import logging
+import collections
from typing import List, Dict, Tuple
from . import utils
@@ -78,17 +79,9 @@
else:
func = node.conn.sensors.get_updates
- data, collected_at_b = func() # type: Dict[Tuple[bytes, bytes], List[int]], List[float]
-
- collected_at = array.array('f')
- collected_at.frombytes(collected_at_b)
-
- mstore = ctx.storage.sub_storage("metric", node_id)
- for (source_name, sensor_name), values_b in data.items():
- values = array.array('Q')
- values.frombytes(values_b)
- mstore.append(values, source_name, sensor_name.decode("utf8"))
- mstore.append(collected_at, "collected_at")
+ # TODO: data is unpacked/repacked here with no reason
+ for path, value in sensors_rpc_plugin.unpack_rpc_updates(func()):
+ ctx.storage.append(value, "metric", node_id, path)
class CollectSensorsStage(Stage):
diff --git a/wally/sensors_rpc_plugin.py b/wally/sensors_rpc_plugin.py
index 71a24ac..1c508ee 100644
--- a/wally/sensors_rpc_plugin.py
+++ b/wally/sensors_rpc_plugin.py
@@ -1,4 +1,5 @@
import os
+import sys
import json
import time
import array
@@ -6,6 +7,7 @@
import threading
import traceback
import subprocess
+import collections
mod_name = "sensors"
@@ -16,35 +18,81 @@
SensorsMap = {}
+class Sensor(object):
+ def __init__(self, params, allowed=None, disallowed=None):
+ self.params = params
+ self.allowed = allowed
+ self.disallowed = disallowed
+ self.allowed_names = set()
+
+ def add_data(self, device, name, value):
+ pass
+
+ def collect(self):
+ pass
+
+ def get_updates(self):
+ pass
+
+ @classmethod
+ def unpack_results(cls, device, metrics, data):
+ pass
+
+
+class ArraysSensor(Sensor):
+ typecode = 'L'
+
+ def __init__(self, params, allowed=None, disallowed=None):
+ Sensor.__init__(self, params, allowed, disallowed)
+ self.data = collections.defaultdict(lambda: array.array(self.typecode))
+
+ def add_data(self, device, name, value):
+ self.data[(device, name)].append(value)
+
+ def get_updates(self):
+ res = self.data
+ self.data = collections.defaultdict(lambda: array.array(self.typecode))
+ packed = {
+ name: arr.typecode + arr.tostring()
+ for name, arr in res.items()
+ }
+ return packed
+
+ @classmethod
+ def unpack_results(cls, device, metrics, packed):
+        arr = array.array(chr(packed[0]) if isinstance(packed[0], int) else packed[0])
+ if sys.version_info >= (3, 0, 0):
+ arr.frombytes(packed[1:])
+ else:
+ arr.fromstring(packed[1:])
+ return arr
+
+ def is_dev_accepted(self, name):
+ dev_ok = True
+
+ if self.disallowed is not None:
+ dev_ok = all(not name.startswith(prefix) for prefix in self.disallowed)
+
+ if dev_ok and self.allowed is not None:
+ dev_ok = any(name.startswith(prefix) for prefix in self.allowed)
+
+ return dev_ok
+
+
def provides(name):
- def closure(func):
- SensorsMap[name] = func
- return func
+ def closure(cls):
+ SensorsMap[name] = cls
+ return cls
return closure
-def is_dev_accepted(name, disallowed_prefixes, allowed_prefixes):
- dev_ok = True
-
- if disallowed_prefixes is not None:
- dev_ok = all(not name.startswith(prefix)
- for prefix in disallowed_prefixes)
-
- if dev_ok and allowed_prefixes is not None:
- dev_ok = any(name.startswith(prefix)
- for prefix in allowed_prefixes)
-
- return dev_ok
-
-
def get_pid_list(disallowed_prefixes, allowed_prefixes):
"""Return pid list from list of pids and names"""
# exceptions
disallowed = disallowed_prefixes if disallowed_prefixes is not None else []
if allowed_prefixes is None:
# if nothing setted - all ps will be returned except setted
- result = [pid
- for pid in os.listdir('/proc')
+ result = [pid for pid in os.listdir('/proc')
if pid.isdigit() and pid not in disallowed]
else:
result = []
@@ -72,93 +120,104 @@
return "no_such_process"
-# 1 - major number
-# 2 - minor mumber
-# 3 - device name
-# 4 - reads completed successfully
-# 5 - reads merged
-# 6 - sectors read
-# 7 - time spent reading (ms)
-# 8 - writes completed
-# 9 - writes merged
-# 10 - sectors written
-# 11 - time spent writing (ms)
-# 12 - I/Os currently in progress
-# 13 - time spent doing I/Os (ms)
-# 14 - weighted time spent doing I/Os (ms)
-
-io_values_pos = [
- (3, 'reads_completed', True),
- (5, 'sectors_read', True),
- (6, 'rtime', True),
- (7, 'writes_completed', True),
- (9, 'sectors_written', True),
- (10, 'wtime', True),
- (11, 'io_queue', False),
- (13, 'io_time', True)
-]
-
-
@provides("block-io")
-def io_stat(config, disallowed_prefixes=('ram', 'loop'), allowed_prefixes=None):
- results = {}
- for line in open('/proc/diskstats'):
- vals = line.split()
- dev_name = vals[2]
- dev_ok = is_dev_accepted(dev_name,
- disallowed_prefixes,
- allowed_prefixes)
- if not dev_ok or dev_name[-1].isdigit():
- continue
+class BlockIOSensor(ArraysSensor):
+ # 1 - major number
+    # 2 - minor number
+ # 3 - device name
+ # 4 - reads completed successfully
+ # 5 - reads merged
+ # 6 - sectors read
+ # 7 - time spent reading (ms)
+ # 8 - writes completed
+ # 9 - writes merged
+ # 10 - sectors written
+ # 11 - time spent writing (ms)
+ # 12 - I/Os currently in progress
+ # 13 - time spent doing I/Os (ms)
+ # 14 - weighted time spent doing I/Os (ms)
- for pos, name, _ in io_values_pos:
- results["{0}.{1}".format(dev_name, name)] = int(vals[pos])
- return results
+ io_values_pos = [
+ (3, 'reads_completed', True),
+ (5, 'sectors_read', True),
+ (6, 'rtime', True),
+ (7, 'writes_completed', True),
+ (9, 'sectors_written', True),
+ (10, 'wtime', True),
+ (11, 'io_queue', False),
+ (13, 'io_time', True)
+ ]
+ def __init__(self, *args, **kwargs):
+ ArraysSensor.__init__(self, *args, **kwargs)
+ if self.disallowed is None:
+ self.disallowed = ('ram', 'loop')
+ self.allowed_devs = set()
-# 1 - major number
-# 2 - minor mumber
-# 3 - device name
-# 4 - reads completed successfully
-# 5 - reads merged
-# 6 - sectors read
-# 7 - time spent reading (ms)
-# 8 - writes completed
-# 9 - writes merged
-# 10 - sectors written
-# 11 - time spent writing (ms)
-# 12 - I/Os currently in progress
-# 13 - time spent doing I/Os (ms)
-# 14 - weighted time spent doing I/Os (ms)
+ for line in open('/proc/diskstats'):
+ dev_name = line.split()[2]
+ if self.is_dev_accepted(dev_name) and not dev_name[-1].isdigit():
+                self.allowed_devs.add(dev_name)
-net_values_pos = [
- (0, 'recv_bytes', True),
- (1, 'recv_packets', True),
- (8, 'send_bytes', True),
- (9, 'send_packets', True),
-]
+ def collect(self):
+ for line in open('/proc/diskstats'):
+ vals = line.split()
+ dev_name = vals[2]
+ if dev_name not in self.allowed_devs:
+ continue
+
+ for pos, name, _ in self.io_values_pos:
+ self.add_data(dev_name, name, int(vals[pos]))
@provides("net-io")
-def net_stat(config, disallowed_prefixes=('docker', 'lo'), allowed_prefixes=('eth',)):
- results = {}
+class NetIOSensor(ArraysSensor):
+    # /proc/net/dev per-interface counters (columns after "iface:"):
+    # 0 - receive bytes
+    # 1 - receive packets
+    # 2 - receive errs
+    # 3 - receive drop
+    # 4 - receive fifo
+    # 5 - receive frame
+    # 6 - receive compressed
+    # 7 - receive multicast
+    # 8 - transmit bytes
+    # 9 - transmit packets
+    # 10 - transmit errs
+    # 11 - transmit drop
+    # 12 - transmit fifo
- for line in open('/proc/net/dev').readlines()[2:]:
- dev_name, stats = line.split(":", 1)
- dev_name = dev_name.strip()
- vals = stats.split()
+ net_values_pos = [
+ (0, 'recv_bytes', True),
+ (1, 'recv_packets', True),
+ (8, 'send_bytes', True),
+ (9, 'send_packets', True),
+ ]
- dev_ok = is_dev_accepted(dev_name,
- disallowed_prefixes,
- allowed_prefixes)
+ def __init__(self, *args, **kwargs):
+ ArraysSensor.__init__(self, *args, **kwargs)
- if '.' in dev_name and dev_name.split('.')[-1].isdigit():
- dev_ok = False
+ if self.disallowed is None:
+ self.disallowed = ('docker', 'lo')
- if dev_ok:
- for pos, name, _ in net_values_pos:
- results["{0}.{1}".format(dev_name, name)] = int(vals[pos])
- return results
+ if self.allowed is None:
+ self.allowed = ('eth',)
+
+ self.allowed_devs = set()
+ for line in open('/proc/net/dev').readlines()[2:]:
+ dev_name = line.split(":", 1)[0].strip()
+ dev_ok = self.is_dev_accepted(dev_name)
+ if dev_ok and ('.' not in dev_name or not dev_name.split('.')[-1].isdigit()):
+ self.allowed_devs.add(dev_name)
+
+ def collect(self):
+ for line in open('/proc/net/dev').readlines()[2:]:
+ dev_name, stats = line.split(":", 1)
+ dev_name = dev_name.strip()
+ if dev_name in self.allowed_devs:
+ vals = stats.split()
+ for pos, name, _ in self.net_values_pos:
+ self.add_data(dev_name, name, int(vals[pos]))
def pid_stat(pid):
@@ -175,16 +234,15 @@
@provides("perprocess-cpu")
-def pscpu_stat(config, disallowed_prefixes=None, allowed_prefixes=None):
- results = {}
- # TODO(koder): fixed list of PID's nust be given
- for pid in get_pid_list(disallowed_prefixes, allowed_prefixes):
- try:
- results["{0}.{1}".format(get_pid_name(pid), pid)] = pid_stat(pid)
- except IOError:
- # may be, proc has already terminated, skip it
- continue
- return results
+class ProcCpuSensor(ArraysSensor):
+ def collect(self):
+ # TODO(koder): fixed list of PID's must be given
+ for pid in get_pid_list(self.disallowed, self.allowed):
+ try:
+ self.add_data(get_pid_name(pid), pid, pid_stat(pid))
+ except IOError:
+ # probably proc has already terminated, skip it
+ continue
def get_mem_stats(pid):
@@ -227,139 +285,114 @@
@provides("perprocess-ram")
-def psram_stat(config, disallowed_prefixes=None, allowed_prefixes=None):
- results = {}
- # TODO(koder): fixed list of PID's nust be given
- for pid in get_pid_list(disallowed_prefixes, allowed_prefixes):
- try:
- dev_name = get_pid_name(pid)
+class ProcRamSensor(ArraysSensor):
+ def collect(self):
+            # TODO(koder): fixed list of PID's must be given
+ for pid in get_pid_list(self.disallowed, self.allowed):
+ try:
+ dev_name = get_pid_name(pid)
- private, shared = get_mem_stats(pid)
- total = private + shared
- sys_total = get_ram_size()
- usage = float(total) / sys_total
+ private, shared = get_mem_stats(pid)
+ total = private + shared
+ sys_total = get_ram_size()
+ usage = float(total) / sys_total
- sensor_name = "{0}({1})".format(dev_name, pid)
+ sensor_name = "{0}({1})".format(dev_name, pid)
- results.update([
- (sensor_name + ".private_mem", private),
- (sensor_name + ".shared_mem", shared),
- (sensor_name + ".used_mem", total),
- (sensor_name + ".mem_usage_percent", int(usage * 100))])
- except IOError:
- # permission denied or proc die
- continue
- return results
-
-# 0 - cpu name
-# 1 - user: normal processes executing in user mode
-# 2 - nice: niced processes executing in user mode
-# 3 - system: processes executing in kernel mode
-# 4 - idle: twiddling thumbs
-# 5 - iowait: waiting for I/O to complete
-# 6 - irq: servicing interrupts
-# 7 - softirq: servicing softirqs
-
-cpu_values_pos = [
- (1, 'user_processes', True),
- (2, 'nice_processes', True),
- (3, 'system_processes', True),
- (4, 'idle_time', True),
-]
+ self.add_data(sensor_name, "private_mem", private)
+                    self.add_data(sensor_name, "shared_mem", shared)
+                    self.add_data(sensor_name, "used_mem", total)
+ self.add_data(sensor_name, "mem_usage_percent", int(usage * 100))
+ except IOError:
+ # permission denied or proc die
+ continue
@provides("system-cpu")
-def syscpu_stat(config, disallowed_prefixes=None, allowed_prefixes=None):
- results = {}
+class SystemCPUSensor(ArraysSensor):
+ # 0 - cpu name
+ # 1 - user: normal processes executing in user mode
+ # 2 - nice: niced processes executing in user mode
+ # 3 - system: processes executing in kernel mode
+ # 4 - idle: twiddling thumbs
+ # 5 - iowait: waiting for I/O to complete
+ # 6 - irq: servicing interrupts
+ # 7 - softirq: servicing softirqs
- # calculate core count
- core_count = 0
+ cpu_values_pos = [
+ (1, 'user_processes', True),
+ (2, 'nice_processes', True),
+ (3, 'system_processes', True),
+ (4, 'idle_time', True),
+ ]
- for line in open('/proc/stat'):
- vals = line.split()
- dev_name = vals[0]
+ def collect(self):
+ # calculate core count
+ core_count = 0
- if dev_name == 'cpu':
- for pos, name, _ in cpu_values_pos:
- sensor_name = "{0}.{1}".format(dev_name, name)
- results[sensor_name] = int(vals[pos])
- elif dev_name == 'procs_blocked':
- val = int(vals[1])
- results["cpu.procs_blocked"] = val
- elif dev_name.startswith('cpu'):
- core_count += 1
+ for line in open('/proc/stat'):
+ vals = line.split()
+ dev_name = vals[0]
- # procs in queue
- TASKSPOS = 3
- vals = open('/proc/loadavg').read().split()
- ready_procs = vals[TASKSPOS].partition('/')[0]
+ if dev_name == 'cpu':
+ for pos, name, _ in self.cpu_values_pos:
+ self.add_data(dev_name, name, int(vals[pos]))
+ elif dev_name == 'procs_blocked':
+ self.add_data("cpu", "procs_blocked", int(vals[1]))
+ elif dev_name.startswith('cpu'):
+ core_count += 1
- # dec on current proc
- procs_queue = (float(ready_procs) - 1) / core_count
- results["cpu.procs_queue_x10"] = int(procs_queue * 10)
+ # procs in queue
+ TASKSPOS = 3
+ vals = open('/proc/loadavg').read().split()
+ ready_procs = vals[TASKSPOS].partition('/')[0]
- return results
-
-
-# return this values or setted in allowed
-ram_fields = [
- 'MemTotal',
- 'MemFree',
- 'Buffers',
- 'Cached',
- 'SwapCached',
- 'Dirty',
- 'Writeback',
- 'SwapTotal',
- 'SwapFree'
-]
+ # dec on current proc
+ procs_queue = (float(ready_procs) - 1) / core_count
+ self.add_data("cpu", "procs_queue_x10", int(procs_queue * 10))
@provides("system-ram")
-def sysram_stat(config, disallowed_prefixes=None, allowed_prefixes=None):
- if allowed_prefixes is None:
- allowed_prefixes = ram_fields
+class SystemRAMSensor(ArraysSensor):
+    # report these fields by default, or the ones given via "allowed"
+ ram_fields = ['MemTotal', 'MemFree', 'Buffers', 'Cached', 'SwapCached',
+ 'Dirty', 'Writeback', 'SwapTotal', 'SwapFree']
- results = {}
+ def __init__(self, *args, **kwargs):
+ ArraysSensor.__init__(self, *args, **kwargs)
- for line in open('/proc/meminfo'):
- vals = line.split()
- dev_name = vals[0].rstrip(":")
+ if self.allowed is None:
+ self.allowed = self.ram_fields
- dev_ok = is_dev_accepted(dev_name,
- disallowed_prefixes,
- allowed_prefixes)
+ self.allowed_fields = set()
+ for line in open('/proc/meminfo'):
+ field_name = line.split()[0].rstrip(":")
+ if self.is_dev_accepted(field_name):
+ self.allowed_fields.add(field_name)
- title = "ram.{0}".format(dev_name)
-
- if dev_ok:
- results[title] = int(vals[1])
-
- if 'ram.MemFree' in results and 'ram.MemTotal' in results:
- used = results['ram.MemTotal'].value - results['ram.MemFree'].value
- results["ram.usage_percent"] = int(float(used) / results['ram.MemTotal'].value)
-
- return results
+ def collect(self):
+ for line in open('/proc/meminfo'):
+ vals = line.split()
+ field = vals[0].rstrip(":")
+ if field in self.allowed_fields:
+ self.add_data("ram", field, int(vals[1]))
@provides("ceph")
-def ceph_stat(config, disallowed_prefixes=None, allowed_prefixes=None):
- results = {}
+class CephSensor(ArraysSensor):
+ def collect(self):
+ def get_val(dct, path):
+ if '/' in path:
+ root, next = path.split('/', 1)
+ return get_val(dct[root], next)
+ return dct[path]
- def get_val(dct, path):
- if '/' in path:
- root, next = path.split('/', 1)
- return get_val(dct[root], next)
- return dct[path]
-
- for osd_id in config['osds']:
- asok = '/var/run/ceph/{}-osd.{}.asok'.format(config['cluster'], osd_id)
- out = subprocess.check_output('ceph daemon {} perf dump'.format(asok), shell=True)
- data = json.loads(out)
- for key_name in config['counters']:
- results["osd{}.{}".format(osd_id, key_name.replace("/", "."))] = get_val(data, key_name)
-
- return results
+ for osd_id in self.params['osds']:
+ asok = '/var/run/ceph/{}-osd.{}.asok'.format(self.params['cluster'], osd_id)
+ out = subprocess.check_output('ceph daemon {} perf dump'.format(asok), shell=True)
+ data = json.loads(out)
+ for key_name in self.params['counters']:
+ self.add_data("osd{}".format(osd_id), key_name.replace("/", "."), get_val(data, key_name))
class SensorsData(object):
@@ -367,7 +400,7 @@
self.cond = threading.Condition()
self.collected_at = array.array("f")
self.stop = False
- self.data = {} # {str: array[data]}
+ self.sensors = {}
self.data_fd = None # temporary file to store results
self.exception = None
@@ -392,7 +425,26 @@
try:
next_collect_at = time.time()
+ # prepare sensor classes
+ with sdata.cond:
+ sdata.sensors = {}
+ for name, config in sensors_config.items():
+ params = {'params': config}
+
+                if "allow" in config:
+                    params["allowed"] = config["allow"]
+
+                if "disallow" in config:
+                    params["disallowed"] = config["disallow"]
+
+ sdata.sensors[name] = SensorsMap[name](**params)
+
+ import pprint
+ logger.debug("sensors.config = %s", pprint.pformat(sensors_config))
+ logger.debug("Sensors map keys %s", ", ".join(sdata.sensors.keys()))
+
# TODO: handle exceptions here
+ # main loop
while not sdata.stop:
dtime = next_collect_at - time.time()
if dtime > 0:
@@ -405,23 +457,17 @@
break
ctm = time.time()
- new_data = collect(sensors_config)
- etm = time.time()
+ with sdata.cond:
+ sdata.collected_at.append(ctm)
+ for sensor in sdata.sensors.values():
+ sensor.collect()
+ etm = time.time()
+ sdata.collected_at.append(etm)
if etm - ctm > 0.1:
# TODO(koder): need to signal that something in not really ok with sensor collecting
pass
- # TODO: need to pack data time after time to avoid long operations on next updates request
- with sdata.cond:
- sdata.collected_at.append(ctm)
- for source_name, vals in new_data.items():
- for sensor_name, val in vals.items():
- key = (source_name, sensor_name)
- if key not in sdata.data:
- sdata.data[key] = array.array('L', [val])
- else:
- sdata.data[key].append(val)
except Exception:
logger.exception("In sensor BG thread")
sdata.exception = traceback.format_exc()
@@ -449,6 +495,20 @@
sensors_thread.start()
+def unpack_rpc_updates(res_tuple):
+ data, collected_at_b = res_tuple
+ collected_at = array.array('f')
+ collected_at.frombytes(collected_at_b)
+ yield 'collected_at', collected_at
+
+ # TODO: data is unpacked/repacked here with no reason
+ for sensor_path, packed_data in data.items():
+ sensor_path = sensor_path.decode("utf8")
+ sensor_name, device, metric = sensor_path.split('.', 2)
+ data = SensorsMap[sensor_name].unpack_results(device, metric, packed_data)
+ yield sensor_path, data
+
+
def rpc_get_updates():
if sdata is None:
raise ValueError("No sensor thread running")
@@ -456,13 +516,16 @@
with sdata.cond:
if sdata.exception:
raise Exception(sdata.exception)
- res = sdata.data
+
+ res = {}
+ for sensor_name, sensor in sdata.sensors.items():
+ for (device, metrics), val in sensor.get_updates().items():
+ res["{}.{}.{}".format(sensor_name, device, metrics)] = val
+
collected_at = sdata.collected_at
sdata.collected_at = array.array(sdata.collected_at.typecode)
- sdata.data = {name: array.array(val.typecode) for name, val in sdata.data.items()}
- bres = {key: data.tostring() for key, data in res.items()}
- return bres, collected_at.tostring()
+ return res, collected_at.tostring()
def rpc_stop():
@@ -481,11 +544,9 @@
if sdata.exception:
raise Exception(sdata.exception)
- res = sdata.data
- collected_at = sdata.collected_at
+ res = rpc_get_updates()
sensors_thread = None
sdata = None
- bres = {key: data.tostring() for key, data in res.items()}
- return bres, collected_at.tostring()
+ return res
diff --git a/wally/storage.py b/wally/storage.py
index fc9b174..93a7cdd 100644
--- a/wally/storage.py
+++ b/wally/storage.py
@@ -180,6 +180,16 @@
path = "/".join(path)
return path in self.storage
+ def store_raw(self, val: bytes, *path: str) -> None:
+ if not isinstance(path, str):
+ path = "/".join(path)
+ self.storage[path] = val
+
+ def get_raw(self, *path: str) -> bytes:
+ if not isinstance(path, str):
+ path = "/".join(path)
+ return self.storage[path]
+
def list(self, *path: str) -> Iterator[Tuple[bool, str]]:
return self.storage.list("/".join(path))