sensors refactored
diff --git a/wally/ceph.py b/wally/ceph.py
index cbb4740..592d595 100644
--- a/wally/ceph.py
+++ b/wally/ceph.py
@@ -28,7 +28,11 @@
     """Get set of osd's ip"""
 
     data = node.run("ceph -c {} -k {} --format json osd dump".format(conf, key))
-    jdata = json.loads(data)
+    try:
+        jdata = json.loads(data)
+    except:
+        open("/tmp/ceph-out.json", "w").write(data)
+        raise
     ips = {}  # type: Dict[IP, List[OSDInfo]]
     first_error = True
     for osd_data in jdata["osds"]:
@@ -56,6 +60,7 @@
                     osd_data_path = line.split("=")[1].strip()
 
             if osd_data_path is None or osd_journal_path is None:
+                open("/tmp/ceph-out.json", "w").write(osd_cfg)
                 logger.error("Can't detect osd %s journal or storage path", osd_id)
                 raise StopTestError()
 
diff --git a/wally/node.py b/wally/node.py
index 4d2dda8..9f4250b 100644
--- a/wally/node.py
+++ b/wally/node.py
@@ -175,11 +175,13 @@
 
         cmd_b = cmd.encode("utf8")
         proc_id = self.conn.cli.spawn(cmd_b, timeout=timeout, merge_out=True)
-        code = None
         out = ""
-        while code is None:
+
+        while True:
             code, outb, _ = self.conn.cli.get_updates(proc_id)
             out += outb.decode("utf8")
+            if code is not None:
+                break
             time.sleep(0.01)
 
         if code != 0:
diff --git a/wally/run_test.py b/wally/run_test.py
index 7baac35..c72e456 100755
--- a/wally/run_test.py
+++ b/wally/run_test.py
@@ -84,7 +84,7 @@
                     path = "rpc_logs/" + nid
                     node.conn.server.flush_logs()
                     log = node.get_file_content(node.rpc_log_file)
-                    ctx.storage[path] = log.decode("utf8")
+                    ctx.storage.store_raw(log, path)
                     logger.debug("RPC log from node {} stored into storage::{}".format(nid, path))
 
         with ctx.get_pool() as pool:
diff --git a/wally/sensors.py b/wally/sensors.py
index 4f00a3e..eef7864 100644
--- a/wally/sensors.py
+++ b/wally/sensors.py
@@ -1,5 +1,6 @@
 import array
 import logging
+import collections
 from typing import List, Dict, Tuple
 
 from . import utils
@@ -78,17 +79,9 @@
             else:
                 func = node.conn.sensors.get_updates
 
-            data, collected_at_b = func()  # type: Dict[Tuple[bytes, bytes], List[int]], List[float]
-
-            collected_at = array.array('f')
-            collected_at.frombytes(collected_at_b)
-
-            mstore = ctx.storage.sub_storage("metric", node_id)
-            for (source_name, sensor_name), values_b in data.items():
-                values = array.array('Q')
-                values.frombytes(values_b)
-                mstore.append(values, source_name, sensor_name.decode("utf8"))
-                mstore.append(collected_at, "collected_at")
+            # TODO: data is unpacked/repacked here with no reason
+            for path, value in sensors_rpc_plugin.unpack_rpc_updates(func()):
+                ctx.storage.append(value, "metric", node_id, path)
 
 
 class CollectSensorsStage(Stage):
diff --git a/wally/sensors_rpc_plugin.py b/wally/sensors_rpc_plugin.py
index 71a24ac..1c508ee 100644
--- a/wally/sensors_rpc_plugin.py
+++ b/wally/sensors_rpc_plugin.py
@@ -1,4 +1,5 @@
 import os
+import sys
 import json
 import time
 import array
@@ -6,6 +7,7 @@
 import threading
 import traceback
 import subprocess
+import collections
 
 
 mod_name = "sensors"
@@ -16,35 +18,81 @@
 SensorsMap = {}
 
 
+class Sensor(object):
+    """Base sensor interface; every hook here is a no-op returning None."""
+
+    def __init__(self, params, allowed=None, disallowed=None):
+        self.params, self.allowed, self.disallowed = params, allowed, disallowed
+        self.allowed_names = set()
+
+    def add_data(self, device, name, value):
+        """Record one sample; does nothing in the base class."""
+
+    def collect(self):
+        """Take one measurement; does nothing in the base class."""
+
+    def get_updates(self):
+        """Return data gathered so far; the base class returns None."""
+
+    @classmethod
+    def unpack_results(cls, device, metrics, data):
+        """Decode data made by get_updates; the base class returns None."""
+
+
+class ArraysSensor(Sensor):
+    """Sensor that keeps samples in one array.array per (device, name) key."""
+    typecode = 'L'
+
+    def __init__(self, params, allowed=None, disallowed=None):
+        Sensor.__init__(self, params, allowed, disallowed)
+        self.data = collections.defaultdict(lambda: array.array(self.typecode))
+
+    def add_data(self, device, name, value):
+        self.data[(device, name)].append(value)
+
+    def get_updates(self):
+        """Swap out the collected arrays; return {key: typecode byte + raw items}."""
+        res, self.data = self.data, collections.defaultdict(lambda: array.array(self.typecode))
+        # array.tobytes() is missing on py2, array.tostring() on py3.9+
+        dump = "tobytes" if hasattr(array.array, "tobytes") else "tostring"
+        return {name: arr.typecode.encode("ascii") + getattr(arr, dump)()
+                for name, arr in res.items()}
+
+    @classmethod
+    def unpack_results(cls, device, metrics, packed):
+        """Rebuild the array from one value produced by get_updates()."""
+        # slice, don't index: packed[0] is an int on py3 but a str on py2
+        arr = array.array(str(packed[:1].decode("ascii")))
+        if sys.version_info >= (3, 0, 0):
+            arr.frombytes(packed[1:])
+        else:
+            arr.fromstring(packed[1:])
+        return arr
+
+    def is_dev_accepted(self, name):
+        """True if name has no disallowed prefix and, when set, some allowed one."""
+        dev_ok = True
+        if self.disallowed is not None:
+            dev_ok = all(not name.startswith(prefix) for prefix in self.disallowed)
+        if dev_ok and self.allowed is not None:
+            dev_ok = any(name.startswith(prefix) for prefix in self.allowed)
+        return dev_ok
+
+
 def provides(name):
-    def closure(func):
-        SensorsMap[name] = func
-        return func
+    def closure(cls):  # class decorator: registers cls in SensorsMap under `name`
+        SensorsMap[name] = cls
+        return cls  # handed back unchanged, so the class keeps its own name binding
     return closure
 
 
-def is_dev_accepted(name, disallowed_prefixes, allowed_prefixes):
-    dev_ok = True
-
-    if disallowed_prefixes is not None:
-        dev_ok = all(not name.startswith(prefix)
-                     for prefix in disallowed_prefixes)
-
-    if dev_ok and allowed_prefixes is not None:
-        dev_ok = any(name.startswith(prefix)
-                     for prefix in allowed_prefixes)
-
-    return dev_ok
-
-
 def get_pid_list(disallowed_prefixes, allowed_prefixes):
     """Return pid list from list of pids and names"""
     # exceptions
     disallowed = disallowed_prefixes if disallowed_prefixes is not None else []
     if allowed_prefixes is None:
         # if nothing setted - all ps will be returned except setted
-        result = [pid
-                  for pid in os.listdir('/proc')
+        result = [pid for pid in os.listdir('/proc')
                   if pid.isdigit() and pid not in disallowed]
     else:
         result = []
@@ -72,93 +120,104 @@
         return "no_such_process"
 
 
-#  1 - major number
-#  2 - minor mumber
-#  3 - device name
-#  4 - reads completed successfully
-#  5 - reads merged
-#  6 - sectors read
-#  7 - time spent reading (ms)
-#  8 - writes completed
-#  9 - writes merged
-# 10 - sectors written
-# 11 - time spent writing (ms)
-# 12 - I/Os currently in progress
-# 13 - time spent doing I/Os (ms)
-# 14 - weighted time spent doing I/Os (ms)
-
-io_values_pos = [
-    (3, 'reads_completed', True),
-    (5, 'sectors_read', True),
-    (6, 'rtime', True),
-    (7, 'writes_completed', True),
-    (9, 'sectors_written', True),
-    (10, 'wtime', True),
-    (11, 'io_queue', False),
-    (13, 'io_time', True)
-]
-
-
 @provides("block-io")
-def io_stat(config, disallowed_prefixes=('ram', 'loop'), allowed_prefixes=None):
-    results = {}
-    for line in open('/proc/diskstats'):
-        vals = line.split()
-        dev_name = vals[2]
-        dev_ok = is_dev_accepted(dev_name,
-                                 disallowed_prefixes,
-                                 allowed_prefixes)
-        if not dev_ok or dev_name[-1].isdigit():
-            continue
+class BlockIOSensor(ArraysSensor):
+    """Per-device counters from /proc/diskstats; fields below are numbered from 1."""
+    #  1, 2 - major and minor device numbers
+    #  3 - device name
+    #  4 - reads completed successfully
+    #  5 - reads merged
+    #  6 - sectors read
+    #  7 - time spent reading (ms)
+    #  8 - writes completed
+    #  9 - writes merged
+    # 10 - sectors written
+    # 11 - time spent writing (ms)
+    # 12 - I/Os currently in progress
+    # 13 - time spent doing I/Os (ms)
+    # 14 - weighted time spent doing I/Os (ms)
 
-        for pos, name, _ in io_values_pos:
-            results["{0}.{1}".format(dev_name, name)] = int(vals[pos])
-    return results
+    io_values_pos = [  # (0-based index into line.split(), metric name, flag unused here)
+        (3, 'reads_completed', True),
+        (5, 'sectors_read', True),
+        (6, 'rtime', True),
+        (7, 'writes_completed', True),
+        (9, 'sectors_written', True),
+        (10, 'wtime', True),
+        (11, 'io_queue', False),
+        (13, 'io_time', True)
+    ]
 
+    def __init__(self, *args, **kwargs):
+        ArraysSensor.__init__(self, *args, **kwargs)
+        if self.disallowed is None:
+            self.disallowed = ('ram', 'loop')
+        # device set is fixed at construction; names ending in a digit are skipped
 
-#  1 - major number
-#  2 - minor mumber
-#  3 - device name
-#  4 - reads completed successfully
-#  5 - reads merged
-#  6 - sectors read
-#  7 - time spent reading (ms)
-#  8 - writes completed
-#  9 - writes merged
-# 10 - sectors written
-# 11 - time spent writing (ms)
-# 12 - I/Os currently in progress
-# 13 - time spent doing I/Os (ms)
-# 14 - weighted time spent doing I/Os (ms)
+        with open('/proc/diskstats') as fd:
+            names = [line.split()[2] for line in fd]
+        self.allowed_devs = {name for name in names
+                             if self.is_dev_accepted(name) and not name[-1].isdigit()}
 
-net_values_pos = [
-    (0, 'recv_bytes', True),
-    (1, 'recv_packets', True),
-    (8, 'send_bytes', True),
-    (9, 'send_packets', True),
-]
+    def collect(self):
+        """Append the current value of every io_values_pos column per device."""
+        with open('/proc/diskstats') as fd:
+            for line in fd:
+                vals = line.split()
+                dev_name = vals[2]
+                if dev_name in self.allowed_devs:
+                    for pos, name, _ in self.io_values_pos:
+                        self.add_data(dev_name, name, int(vals[pos]))
 
 
 @provides("net-io")
-def net_stat(config, disallowed_prefixes=('docker', 'lo'), allowed_prefixes=('eth',)):
-    results = {}
+class NetIOSensor(ArraysSensor):
+    """Samples per-interface traffic counters from /proc/net/dev."""
+    # Columns after the "<iface>:" prefix, 0-based (see proc(5)):
+    #  0 - bytes received
+    #  1 - packets received
+    #  2 - receive errors
+    #  3 - receive drops
+    #  4 - receive FIFO errors
+    #  5 - receive frame errors
+    #  6 - compressed packets received
+    #  7 - multicast frames received
+    #  8 - bytes transmitted
+    #  9 - packets transmitted
+    # 10..15 - transmit errs, drop, fifo, colls, carrier, compressed
+    # Only the four columns listed in net_values_pos are recorded.
 
-    for line in open('/proc/net/dev').readlines()[2:]:
-        dev_name, stats = line.split(":", 1)
-        dev_name = dev_name.strip()
-        vals = stats.split()
+    net_values_pos = [
+        (0, 'recv_bytes', True),
+        (1, 'recv_packets', True),
+        (8, 'send_bytes', True),
+        (9, 'send_packets', True),
+    ]
 
-        dev_ok = is_dev_accepted(dev_name,
-                                 disallowed_prefixes,
-                                 allowed_prefixes)
+    def __init__(self, *args, **kwargs):
+        ArraysSensor.__init__(self, *args, **kwargs)
 
-        if '.' in dev_name and dev_name.split('.')[-1].isdigit():
-            dev_ok = False
+        if self.disallowed is None:
+            self.disallowed = ('docker', 'lo')
 
-        if dev_ok:
-            for pos, name, _ in net_values_pos:
-                results["{0}.{1}".format(dev_name, name)] = int(vals[pos])
-    return results
+        if self.allowed is None:
+            self.allowed = ('eth',)
+        self.allowed_devs = set()
+        with open('/proc/net/dev') as fd:
+            for line in fd.readlines()[2:]:  # the first two lines are table headers
+                dev_name = line.split(":", 1)[0].strip()
+                is_sub = '.' in dev_name and dev_name.split('.')[-1].isdigit()  # e.g. "eth0.100"
+                if not is_sub and self.is_dev_accepted(dev_name):
+                    self.allowed_devs.add(dev_name)
+
+    def collect(self):
+        with open('/proc/net/dev') as fd:
+            for line in fd.readlines()[2:]:
+                dev_name, stats = (part.strip() for part in line.split(":", 1))
+                if dev_name in self.allowed_devs:
+                    vals = stats.split()
+                    for pos, name, _ in self.net_values_pos:
+                        self.add_data(dev_name, name, int(vals[pos]))
 
 
 def pid_stat(pid):
@@ -175,16 +234,15 @@
 
 
 @provides("perprocess-cpu")
-def pscpu_stat(config, disallowed_prefixes=None, allowed_prefixes=None):
-    results = {}
-    # TODO(koder): fixed list of PID's nust be given
-    for pid in get_pid_list(disallowed_prefixes, allowed_prefixes):
-        try:
-            results["{0}.{1}".format(get_pid_name(pid), pid)] = pid_stat(pid)
-        except IOError:
-            # may be, proc has already terminated, skip it
-            continue
-    return results
+class ProcCpuSensor(ArraysSensor):
+    """Records pid_stat(pid) under (process name, pid) for each listed pid."""
+    def collect(self):
+        # TODO(koder): fixed list of PID's must be given
+        for proc_id in get_pid_list(self.disallowed, self.allowed):
+            try:
+                self.add_data(get_pid_name(proc_id), proc_id, pid_stat(proc_id))
+            except IOError:  # probably proc has already terminated, skip it
+                pass
 
 
 def get_mem_stats(pid):
@@ -227,139 +285,114 @@
 
 
 @provides("perprocess-ram")
-def psram_stat(config, disallowed_prefixes=None, allowed_prefixes=None):
-    results = {}
-    # TODO(koder): fixed list of PID's nust be given
-    for pid in get_pid_list(disallowed_prefixes, allowed_prefixes):
-        try:
-            dev_name = get_pid_name(pid)
+class ProcRamSensor(ArraysSensor):
+    def collect(self):  # one sample of private/shared/used memory per listed process
+        # TODO(koder): fixed list of PID's must be given
+        for pid in get_pid_list(self.disallowed, self.allowed):
+            try:
+                dev_name = get_pid_name(pid)
 
-            private, shared = get_mem_stats(pid)
-            total = private + shared
-            sys_total = get_ram_size()
-            usage = float(total) / sys_total
+                private, shared = get_mem_stats(pid)
+                total = private + shared
+                sys_total = get_ram_size()
+                usage = float(total) / sys_total
 
-            sensor_name = "{0}({1})".format(dev_name, pid)
+                sensor_name = "{0}({1})".format(dev_name, pid)
 
-            results.update([
-                (sensor_name + ".private_mem", private),
-                (sensor_name + ".shared_mem", shared),
-                (sensor_name + ".used_mem", total),
-                (sensor_name + ".mem_usage_percent", int(usage * 100))])
-        except IOError:
-            # permission denied or proc die
-            continue
-    return results
-
-# 0 - cpu name
-# 1 - user: normal processes executing in user mode
-# 2 - nice: niced processes executing in user mode
-# 3 - system: processes executing in kernel mode
-# 4 - idle: twiddling thumbs
-# 5 - iowait: waiting for I/O to complete
-# 6 - irq: servicing interrupts
-# 7 - softirq: servicing softirqs
-
-cpu_values_pos = [
-    (1, 'user_processes', True),
-    (2, 'nice_processes', True),
-    (3, 'system_processes', True),
-    (4, 'idle_time', True),
-]
+                self.add_data(sensor_name, "private_mem", private)
+                self.add_data(sensor_name, "shared_mem", shared)
+                self.add_data(sensor_name, "used_mem", total)
+                self.add_data(sensor_name, "mem_usage_percent", int(usage * 100))
+            except IOError:
+                # permission denied, or the process has already exited
+                continue
 
 
 @provides("system-cpu")
-def syscpu_stat(config, disallowed_prefixes=None, allowed_prefixes=None):
-    results = {}
+class SystemCPUSensor(ArraysSensor):
+    # 0 - cpu name (columns of a "cpu*" line in /proc/stat, after split())
+    # 1 - user: normal processes executing in user mode
+    # 2 - nice: niced processes executing in user mode
+    # 3 - system: processes executing in kernel mode
+    # 4 - idle: twiddling thumbs
+    # 5 - iowait: waiting for I/O to complete
+    # 6 - irq: servicing interrupts
+    # 7 - softirq: servicing softirqs
 
-    # calculate core count
-    core_count = 0
+    cpu_values_pos = [  # (index into line.split(), metric name, flag unused in this class)
+        (1, 'user_processes', True),
+        (2, 'nice_processes', True),
+        (3, 'system_processes', True),
+        (4, 'idle_time', True),
+    ]
 
-    for line in open('/proc/stat'):
-        vals = line.split()
-        dev_name = vals[0]
+    def collect(self):  # reads /proc/stat and /proc/loadavg once per call
+        # number of per-core "cpuN" lines, counted in the loop below
+        core_count = 0
 
-        if dev_name == 'cpu':
-            for pos, name, _ in cpu_values_pos:
-                sensor_name = "{0}.{1}".format(dev_name, name)
-                results[sensor_name] = int(vals[pos])
-        elif dev_name == 'procs_blocked':
-            val = int(vals[1])
-            results["cpu.procs_blocked"] = val
-        elif dev_name.startswith('cpu'):
-            core_count += 1
+        for line in open('/proc/stat'):
+            vals = line.split()
+            dev_name = vals[0]
 
-    # procs in queue
-    TASKSPOS = 3
-    vals = open('/proc/loadavg').read().split()
-    ready_procs = vals[TASKSPOS].partition('/')[0]
+            if dev_name == 'cpu':  # the aggregate line; only this one is recorded
+                for pos, name, _ in self.cpu_values_pos:
+                    self.add_data(dev_name, name, int(vals[pos]))
+            elif dev_name == 'procs_blocked':
+                self.add_data("cpu", "procs_blocked", int(vals[1]))
+            elif dev_name.startswith('cpu'):  # any other cpu* line, i.e. a per-core one
+                core_count += 1
 
-    # dec on current proc
-    procs_queue = (float(ready_procs) - 1) / core_count
-    results["cpu.procs_queue_x10"] = int(procs_queue * 10)
+        # procs in queue: 4th field of /proc/loadavg, the part before the '/'
+        TASKSPOS = 3
+        vals = open('/proc/loadavg').read().split()
+        ready_procs = vals[TASKSPOS].partition('/')[0]
 
-    return results
-
-
-# return this values or setted in allowed
-ram_fields = [
-    'MemTotal',
-    'MemFree',
-    'Buffers',
-    'Cached',
-    'SwapCached',
-    'Dirty',
-    'Writeback',
-    'SwapTotal',
-    'SwapFree'
-]
+        # minus one, presumably for this collector itself - NOTE(review): a negative result would not fit array 'L'
+        procs_queue = (float(ready_procs) - 1) / core_count
+        self.add_data("cpu", "procs_queue_x10", int(procs_queue * 10))  # x10 keeps one decimal digit in an int
 
 
 @provides("system-ram")
-def sysram_stat(config, disallowed_prefixes=None, allowed_prefixes=None):
-    if allowed_prefixes is None:
-        allowed_prefixes = ram_fields
+class SystemRAMSensor(ArraysSensor):
+    """Records /proc/meminfo values: ram_fields by default, else `allowed` prefixes."""
+    ram_fields = ['MemTotal', 'MemFree', 'Buffers', 'Cached', 'SwapCached',
+                  'Dirty', 'Writeback', 'SwapTotal', 'SwapFree']
 
-    results = {}
+    def __init__(self, *args, **kwargs):
+        ArraysSensor.__init__(self, *args, **kwargs)
 
-    for line in open('/proc/meminfo'):
-        vals = line.split()
-        dev_name = vals[0].rstrip(":")
+        if self.allowed is None:
+            self.allowed = self.ram_fields
 
-        dev_ok = is_dev_accepted(dev_name,
-                                 disallowed_prefixes,
-                                 allowed_prefixes)
+        # the field set is resolved once, here; collect() only looks names up
+        with open('/proc/meminfo') as fd:
+            names = [line.split()[0].rstrip(":") for line in fd]
+        # is_dev_accepted() matches by prefix, so one entry may admit several fields
+        self.allowed_fields = set(filter(self.is_dev_accepted, names))
 
-        title = "ram.{0}".format(dev_name)
-
-        if dev_ok:
-            results[title] = int(vals[1])
-
-    if 'ram.MemFree' in results and 'ram.MemTotal' in results:
-        used = results['ram.MemTotal'].value - results['ram.MemFree'].value
-        results["ram.usage_percent"] = int(float(used) / results['ram.MemTotal'].value)
-
-    return results
+    def collect(self):  # one sample per accepted /proc/meminfo field
+        with open('/proc/meminfo') as fd:
+            for vals in map(str.split, fd):
+                field = vals[0].rstrip(":")
+                if field in self.allowed_fields:
+                    self.add_data("ram", field, int(vals[1]))
 
 
 @provides("ceph")
-def ceph_stat(config, disallowed_prefixes=None, allowed_prefixes=None):
-    results = {}
+class CephSensor(ArraysSensor):
+    """Records selected `ceph daemon ... perf dump` counters for each OSD."""
+    def collect(self):
+        def get_val(dct, path):
+            for part in path.split('/'):  # walk nested dicts along a/b/c
+                dct = dct[part]
+            return dct
 
-    def get_val(dct, path):
-        if '/' in path:
-            root, next = path.split('/', 1)
-            return get_val(dct[root], next)
-        return dct[path]
-
-    for osd_id in config['osds']:
-        asok = '/var/run/ceph/{}-osd.{}.asok'.format(config['cluster'], osd_id)
-        out = subprocess.check_output('ceph daemon {} perf dump'.format(asok), shell=True)
-        data = json.loads(out)
-        for key_name in config['counters']:
-            results["osd{}.{}".format(osd_id, key_name.replace("/", "."))] = get_val(data, key_name)
-
-    return results
+        for osd_id in self.params['osds']:
+            sock_path = '/var/run/ceph/{}-osd.{}.asok'.format(self.params['cluster'], osd_id)
+            cmd = 'ceph daemon {} perf dump'.format(sock_path)
+            perf = json.loads(subprocess.check_output(cmd, shell=True))
+            for counter in self.params['counters']:
+                self.add_data("osd{}".format(osd_id), counter.replace("/", "."), get_val(perf, counter))
 
 
 class SensorsData(object):
@@ -367,7 +400,7 @@
         self.cond = threading.Condition()
         self.collected_at = array.array("f")
         self.stop = False
-        self.data = {}  # {str: array[data]}
+        self.sensors = {}
         self.data_fd = None  # temporary file to store results
         self.exception = None
 
@@ -392,7 +425,26 @@
     try:
         next_collect_at = time.time()
 
+        # prepare sensor classes
+        with sdata.cond:
+            sdata.sensors = {}
+            for name, config in sensors_config.items():
+                params = {'params': config}
+
+                if "allow" in config:
+                    params["allowed_prefixes"] = config["allow"]
+
+                if "disallow" in config:
+                    params["disallowed_prefixes"] = config["disallow"]
+
+                sdata.sensors[name] = SensorsMap[name](**params)
+
+            import pprint
+            logger.debug("sensors.config = %s", pprint.pformat(sensors_config))
+            logger.debug("Sensors map keys %s", ", ".join(sdata.sensors.keys()))
+
         # TODO: handle exceptions here
+        # main loop
         while not sdata.stop:
             dtime = next_collect_at - time.time()
             if dtime > 0:
@@ -405,23 +457,17 @@
                 break
 
             ctm = time.time()
-            new_data = collect(sensors_config)
-            etm = time.time()
+            with sdata.cond:
+                sdata.collected_at.append(ctm)
+                for sensor in sdata.sensors.values():
+                    sensor.collect()
+                etm = time.time()
+                sdata.collected_at.append(etm)
 
             if etm - ctm > 0.1:
                 # TODO(koder): need to signal that something in not really ok with sensor collecting
                 pass
 
-            # TODO: need to pack data time after time to avoid long operations on next updates request
-            with sdata.cond:
-                sdata.collected_at.append(ctm)
-                for source_name, vals in new_data.items():
-                    for sensor_name, val in vals.items():
-                        key = (source_name, sensor_name)
-                        if key not in sdata.data:
-                            sdata.data[key] = array.array('L', [val])
-                        else:
-                            sdata.data[key].append(val)
     except Exception:
         logger.exception("In sensor BG thread")
         sdata.exception = traceback.format_exc()
@@ -449,6 +495,20 @@
     sensors_thread.start()
 
 
+def unpack_rpc_updates(res_tuple):
+    """Yield (path, array) pairs decoded from an rpc_get_updates() result."""
+    packed_by_path, collected_at_b = res_tuple
+    collected_at = array.array('f')
+    collected_at.frombytes(collected_at_b)
+    yield 'collected_at', collected_at
+
+    # TODO: data is unpacked/repacked here with no reason
+    for raw_path, packed_data in packed_by_path.items():
+        sensor_path = raw_path.decode("utf8")
+        sensor_name, device, metric = sensor_path.split('.', 2)
+        yield sensor_path, SensorsMap[sensor_name].unpack_results(device, metric, packed_data)
+
+
 def rpc_get_updates():
     if sdata is None:
         raise ValueError("No sensor thread running")
@@ -456,13 +516,16 @@
     with sdata.cond:
         if sdata.exception:
             raise Exception(sdata.exception)
-        res = sdata.data
+
+        res = {}
+        for sensor_name, sensor in sdata.sensors.items():
+            for (device, metrics), val in sensor.get_updates().items():
+                res["{}.{}.{}".format(sensor_name, device, metrics)] = val
+
         collected_at = sdata.collected_at
         sdata.collected_at = array.array(sdata.collected_at.typecode)
-        sdata.data = {name: array.array(val.typecode) for name, val in sdata.data.items()}
 
-    bres = {key: data.tostring() for key, data in res.items()}
-    return bres, collected_at.tostring()
+    return res, collected_at.tostring()
 
 
 def rpc_stop():
@@ -481,11 +544,9 @@
     if sdata.exception:
         raise Exception(sdata.exception)
 
-    res = sdata.data
-    collected_at = sdata.collected_at
+    res = rpc_get_updates()
 
     sensors_thread = None
     sdata = None
 
-    bres = {key: data.tostring() for key, data in res.items()}
-    return bres, collected_at.tostring()
+    return res
diff --git a/wally/storage.py b/wally/storage.py
index fc9b174..93a7cdd 100644
--- a/wally/storage.py
+++ b/wally/storage.py
@@ -180,6 +180,16 @@
             path = "/".join(path)
         return path in self.storage
 
+    def store_raw(self, val: bytes, *path: str) -> None:
+        """Store val unchanged under the key made by joining path with '/'."""
+        key = "/".join(path)  # *path always arrives as a tuple of parts
+        self.storage[key] = val
+
+    def get_raw(self, *path: str) -> bytes:
+        """Return whatever is stored under the key made by joining path with '/'."""
+        key = "/".join(path)  # *path always arrives as a tuple of parts
+        return self.storage[key]
+
     def list(self, *path: str) -> Iterator[Tuple[bool, str]]:
         return self.storage.list("/".join(path))