Move code to cephlib
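
Fold csv array serialization into ResultStorage as load_array/put_array,
cache loaded arrays by file size and mtime, store plot files relative to
report_root, and route sensor appends through put_array. Groundwork for
moving this code into cephlib.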
diff --git a/wally/hlstorage.py b/wally/hlstorage.py
index f2c9488..3672458 100644
--- a/wally/hlstorage.py
+++ b/wally/hlstorage.py
@@ -1,13 +1,13 @@
 import os
 import logging
-from typing import cast, Iterator, Tuple, Type, Dict, Optional
+from typing import cast, Iterator, Tuple, Type, Dict, Optional, Any, List
 
 import numpy
 
 from .suits.job import JobConfig
 from .result_classes import SuiteConfig, TimeSeries, DataSource, StatProps, IResultStorage
-from .storage import Storage, csv_file_encoding
-from .utils import StopTestError, str2shape, shape2str
+from .storage import Storage
+from .utils import StopTestError
 from .suits.all_suits import all_suits
 
 
@@ -41,7 +41,7 @@
     sensor_time_r = r'sensors/{node_id}_collected_at\.csv'
 
     report_root = 'report/'
-    plot_r = r'report/{suite_id}\.{job_id}/{node_id}\.{sensor}\.{dev}\.{metric}\.{tag}'
+    plot_r = r'{suite_id}\.{job_id}/{node_id}\.{sensor}\.{dev}\.{metric}\.{tag}'
 
     job_cfg = job_cfg_r.replace("\\.", '.')
     suite_cfg = suite_cfg_r.replace("\\.", '.')
@@ -70,39 +70,92 @@
     ts_header_size = 64
     ts_header_format = "!IIIcc"
     ts_arr_tag = 'csv'
+    csv_file_encoding = 'ascii'
 
     def __init__(self, storage: Storage) -> None:
         self.storage = storage
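+        # arrays already read from csv: path -> (file size, mtime, array, header)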
+        self.cache = {}  # type: Dict[str, Tuple[int, int, Any, List[str]]]
 
     def sync(self) -> None:
         self.storage.sync()
 
     #  -----------------  SERIALIZATION / DESERIALIZATION  -------------------------------------------------------------
+    def load_array(self, path: str, skip_shape: bool = False) -> Tuple[numpy.array, List[str]]:
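+        # read a csv array written by put_array; the leading header field holds
+        # the numpy dtype (with skip_shape a legacy field before it is dropped
+        # first), the remaining fields are returned together with the array.
+        # loaded arrays are cached and reused while size and mtime are unchanged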
+        with self.storage.get_fd(path, "rb") as fd:
+            stats = os.fstat(fd.fileno())
+            if path in self.cache:
+                size, mtime, obj, header = self.cache[path]
+                if size == stats.st_size and mtime == stats.st_mtime_ns:
+                    return obj, header
+
+            header = fd.readline().decode(self.csv_file_encoding).strip().split(",")
+            if skip_shape:
+                header = header[1:]
+            dt = fd.read().decode("utf-8").strip()
+            arr = numpy.fromstring(dt.replace("\n", ','), sep=',', dtype=header[0])
+            if len(dt) != 0:
+                lines = dt.count("\n") + 1
+                columns = dt.split("\n", 1)[0].count(",") + 1
+                assert lines * columns == len(arr)
+                if columns == 1:
+                    arr.shape = (lines,)
+                else:
+                    arr.shape = (lines, columns)
+
+        self.cache[path] = (stats.st_size, stats.st_mtime_ns, arr, header[1:])
+        return arr, header[1:]
+
+    def put_array(self, path: str, data: numpy.array, header: List[str], append_on_exists: bool = False) -> None:
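+        # the dtype name becomes the first csv header field; with
+        # append_on_exists the on-disk header must match the new one and
+        # rows are appended after the existing data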
+        header = [data.dtype.name] + header
+
+        exists = append_on_exists and path in self.storage
+        if len(data.shape) == 1:
+            # make array vertical to simplify reading
+            vw = data.view().reshape((data.shape[0], 1))
+        else:
+            vw = data
+
+        with self.storage.get_fd(path, "cb" if exists else "wb") as fd:
+            if exists:
+                curr_header = fd.readline().decode(self.csv_file_encoding).rstrip().split(",")
+                assert header == curr_header, \
+                    "Path {!r}. Expected header ({!r}) and current header ({!r}) don't match"\
+                        .format(path, header, curr_header)
+                fd.seek(0, os.SEEK_END)
+            else:
+                fd.write((",".join(header) + "\n").encode(self.csv_file_encoding))
+
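+            # one value row per line, written as unsigned decimal integers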
+            numpy.savetxt(fd, vw, delimiter=',', newline="\n", fmt="%lu")
 
     def load_ts(self, ds: DataSource, path: str) -> TimeSeries:
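+        # the first header field is legacy noise here: the shape in old files,
+        # a second copy of the dtype in files written by put_ts, so skip it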
+        arr, header = self.load_array(path, skip_shape=True)
+        units, time_units = header
 
-        with self.storage.get_fd(path, "rb") as fd:
-            header = fd.readline().decode(csv_file_encoding).strip().split(",")
-            shape, dtype, units, time_units = header
-            arr = numpy.loadtxt(fd, delimiter=',', dtype=dtype)
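+        # column 0 holds the timestamps, the remaining columns the data values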
+        data = arr[:,1:]
+        if data.shape[1] == 1:
+            data = data.reshape((-1,))
 
         return TimeSeries("{}.{}".format(ds.dev, ds.sensor),
                           raw=None,
-                          data=arr[:,1:].reshape(str2shape(shape)),
+                          data=data,
                           times=arr[:,0],
                           source=ds,
                           units=units,
                           time_units=time_units)
 
     def load_sensor(self, ds: DataSource) -> TimeSeries:
-        collect_header, collected_at = self.storage.get_array(DB_paths.sensor_time.format(**ds.__dict__))
+        collected_at, collect_header = self.load_array(DB_paths.sensor_time.format(**ds.__dict__))
         assert collect_header == [ds.node_id, 'collected_at', 'us'], repr(collect_header)
-
-        data_header, data = self.storage.get_array(DB_paths.sensor_data.format(**ds.__dict__))
+        data, data_header = self.load_array(DB_paths.sensor_data.format(**ds.__dict__))
 
         data_units = data_header[2]
         assert data_header == [ds.node_id, ds.metric_fqdn, data_units]
 
+        assert len(data.shape) == 1
+        assert len(collected_at.shape) == 1
+
         return TimeSeries(ds.metric_fqdn,
                           raw=None,
                           data=data,
@@ -115,7 +168,7 @@
 
     def check_plot_file(self, source: DataSource) -> Optional[str]:
         path = DB_paths.plot.format(**source.__dict__)
-        fpath = self.storage.resolve_raw(path)
+        fpath = self.storage.resolve_raw(DB_paths.report_root + path)
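+        # plot files live under report_root; return the relative path used by reports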
         return path if os.path.exists(fpath) else None
 
     # -------------   PUT DATA INTO STORAGE   --------------------------------------------------------------------------
@@ -140,22 +193,16 @@
         assert ts.source.tag == self.ts_arr_tag
 
         csv_path = DB_paths.ts.format(**ts.source.__dict__)
-        header = [shape2str(ts.data.shape),
-                  ts.data.dtype.name,
-                  ts.units,
-                  ts.time_units]
+        header = [ts.data.dtype.name, ts.units, ts.time_units]
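+        # put_array prepends the dtype once more, so ts files keep four header
+        # fields, as in the old shape-prefixed format (skipped by load_ts)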
 
-        with self.storage.get_fd(csv_path, "cb") as fd:
-            tv = ts.times.view().reshape((-1, 1))
+        tv = ts.times.view().reshape((-1, 1))
+        if len(ts.data.shape) == 1:
+            dv = ts.data.view().reshape((ts.times.shape[0], -1))
+        else:
+            dv = ts.data
 
-            if len(ts.data.shape) == 1:
-                dv = ts.data.view().reshape((ts.times.shape[0], -1))
-            else:
-                dv = ts.data
-
-            result = numpy.concatenate((tv, dv), axis=1)
-            fd.write((",".join(map(str, header)) + "\n").encode(csv_file_encoding))
-            numpy.savetxt(fd, result, delimiter=',', newline="\n", fmt="%lu")
+        result = numpy.concatenate((tv, dv), axis=1)
+        self.put_array(csv_path, result, header)
 
         if ts.raw:
             raw_path = DB_paths.ts.format(**ts.source(tag=ts.raw_tag).__dict__)
@@ -170,10 +217,11 @@
     # return path to file to be inserted into report
     def put_plot_file(self, data: bytes, source: DataSource) -> str:
         path = DB_paths.plot.format(**source.__dict__)
-        return cast(str, self.storage.put_raw(data, path))
+        self.storage.put_raw(data, DB_paths.report_root + path)
+        return path
 
     def put_report(self, report: str, name: str) -> str:
         return self.storage.put_raw(report.encode("utf8"), DB_paths.report_root + name)
 
     def append_sensor(self, data: numpy.array, ds: DataSource, units: str) -> None:
         if ds.metric == 'collected_at':
@@ -182,7 +230,7 @@
         else:
             path = DB_paths.sensor_data
             metrics_fqn = ds.metric_fqdn
-        self.storage.append([ds.node_id, metrics_fqn, units], data, path.format(**ds.__dict__))
+        self.put_array(path.format(**ds.__dict__), data, [ds.node_id, metrics_fqn, units], append_on_exists=True)
 
     # -------------   GET DATA FROM STORAGE   --------------------------------------------------------------------------
 
@@ -217,7 +265,6 @@
     def iter_ts(self, suite: SuiteConfig, job: JobConfig, **filters) -> Iterator[TimeSeries]:
         filters.update(suite_id=suite.storage_id, job_id=job.storage_id)
         ts_glob = fill_path(DB_paths.ts_r, **filters)
-
         for is_file, path, groups in self.iter_paths(ts_glob):
             assert is_file
             groups = groups.copy()