add local sensor datastore, make IO tests granular
diff --git a/wally/sensors_utils.py b/wally/sensors_utils.py
index 51a4dcb..e5b4347 100644
--- a/wally/sensors_utils.py
+++ b/wally/sensors_utils.py
@@ -1,4 +1,5 @@
 import time
+import array
 import Queue
 import logging
 import threading
@@ -11,14 +12,91 @@
                                stop_and_remove_sensors)
 
 
-logger = logging.getLogger("wally")
+logger = logging.getLogger("wally.sensors")
 DEFAULT_RECEIVER_URL = "udp://{ip}:5699"
 
 
-def save_sensors_data(data_q, mon_q, fd):
+class SensorDatastore(object):
+    def __init__(self, stime=None):
+        self.lock = threading.Lock()
+        self.stime = stime
+
+        self.min_size = 60 * 60
+        self.max_size = 60 * 61
+
+        self.data = {
+            'testnodes:io': array.array("H"),
+            'testnodes:cpu': array.array("H"),
+        }
+
+    def get_values(self, name, start, end):
+        assert end >= start
+        if end == start:
+            return []
+
+        with self.lock:
+            curr_arr = self.data[name]
+            if self.stime is None:
+                return []
+
+            sidx = start - self.stime
+            eidx = end - self.stime
+
+            if sidx < 0 and eidx < 0:
+                return [0] * (end - start)
+            elif sidx < 0:
+                return [0] * (-sidx) + list(curr_arr[:eidx])
+            return list(curr_arr[sidx:eidx])
+
+    def set_values(self, start_time, vals):
+        with self.lock:
+            return self.set_values_l(start_time, vals)
+
+    def set_values_l(self, start_time, vals):
+        max_cut = 0
+        for name, values in vals.items():
+            curr_arr = self.data.setdefault(name, array.array("H"))
+
+            if self.stime is None:
+                self.stime = start_time
+
+            curr_end_time = len(curr_arr) + self.stime
+
+            if curr_end_time < start_time:
+                curr_arr.extend([0] * (start_time - curr_end_time))
+                curr_arr.extend(values)
+            elif curr_end_time > start_time:
+                logger.warning("Duplicated sensors data")
+                sindex = len(curr_arr) + (start_time - curr_end_time)
+
+                if sindex < 0:
+                    values = values[-sindex:]
+                    sindex = 0
+                    logger.warning("Some data with timestamp before"
+                                   " beginning of measurements. Skip them")
+
+                curr_arr[sindex:sindex + len(values)] = array.array(curr_arr.typecode, values)
+            else:
+                curr_arr.extend(values)
+
+            if len(curr_arr) > self.max_size:
+                max_cut = max(len(curr_arr) - self.min_size, max_cut)
+
+        if max_cut > 0:
+            self.stime += max_cut
+            for arr in self.data.values():
+                del arr[:max_cut]
+
+
+def save_sensors_data(data_q, mon_q, fd, data_store, source2roles_map):
     fd.write("\n")
 
+    BUFFER = 3
     observed_nodes = set()
+    testnodes_data = {
+        'io': {},
+        'cpu': {},
+    }
 
     try:
         while True:
@@ -32,8 +110,27 @@
                 mon_q.put(addr + (data['source_id'],))
                 observed_nodes.add(addr)
 
-            fd.write("{0!s} : {1!r}\n".format(time.time(),
-                     repr((addr, data))))
+            fd.write(repr((addr, data)) + "\n")
+
+            source_id = data.pop('source_id')
+            rep_time = data.pop('time')
+            if 'testnode' in source2roles_map.get(source_id, []):
+                vl = testnodes_data['io'].get(rep_time, 0)
+                sum_io_q = vl
+                testnodes_data['io'][rep_time] = sum_io_q
+
+            etime = time.time() - BUFFER
+            for name, vals in testnodes_data.items():
+                new_vals = {}
+                for rtime, value in vals.items():
+                    if rtime < etime:
+                        data_store.set_values(rtime, {"testnodes:" + name: [value]})
+                    else:
+                        new_vals[rtime] = value
+
+                vals.clear()
+                vals.update(new_vals)
+
     except Exception:
         logger.exception("Error in sensors thread")
     logger.info("Sensors thread exits")
@@ -42,6 +139,7 @@
 def get_sensors_config_for_nodes(cfg, nodes):
     monitored_nodes = []
     sensors_configs = []
+    source2roles_map = {}
 
     receiver_url = cfg.get("receiver_url", DEFAULT_RECEIVER_URL)
     assert '{ip}' in receiver_url
@@ -64,6 +162,7 @@
                         ext_ip = utils.get_ip_for_target(ip)
                     monitor_url = receiver_url.format(ip=ext_ip)
 
+                source2roles_map[node.get_conn_id()] = node.roles
                 monitored_nodes.append(node)
                 sens_cfg = SensorConfig(node.connection,
                                         node.get_conn_id(),
@@ -72,18 +171,20 @@
                                         monitor_url=monitor_url)
                 sensors_configs.append(sens_cfg)
 
-    return monitored_nodes, sensors_configs
+    return monitored_nodes, sensors_configs, source2roles_map
 
 
-def start_sensor_process_thread(ctx, cfg, sensors_configs):
+def start_sensor_process_thread(ctx, cfg, sensors_configs, source2roles_map):
     receiver_url = cfg.get('receiver_url', DEFAULT_RECEIVER_URL)
     sensors_data_q, stop_sensors_loop = \
         start_listener_thread(receiver_url.format(ip='0.0.0.0'))
 
     mon_q = Queue.Queue()
     fd = open(cfg_dict['sensor_storage'], "w")
+
+    params = sensors_data_q, mon_q, fd, ctx.sensors_data, source2roles_map
     sensor_listen_th = threading.Thread(None, save_sensors_data, None,
-                                        (sensors_data_q, mon_q, fd))
+                                        params)
     sensor_listen_th.daemon = True
     sensor_listen_th.start()
 
@@ -105,8 +206,8 @@
     if nodes is None:
         nodes = ctx.nodes
 
-    monitored_nodes, sensors_configs = get_sensors_config_for_nodes(cfg,
-                                                                    nodes)
+    monitored_nodes, sensors_configs, source2roles_map = \
+        get_sensors_config_for_nodes(cfg, nodes)
 
     if len(monitored_nodes) == 0:
         logger.info("Nothing to monitor, no sensors would be installed")
@@ -115,12 +216,13 @@
     if ctx.sensors_mon_q is None:
         logger.info("Start sensors data receiving thread")
         ctx.sensors_mon_q = start_sensor_process_thread(ctx, cfg,
-                                                        sensors_configs)
+                                                        sensors_configs,
+                                                        source2roles_map)
 
     if undeploy:
         def remove_sensors_stage(cfg, ctx):
-            _, sensors_configs = get_sensors_config_for_nodes(cfg['sensors'],
-                                                              nodes)
+            _, sensors_configs, _ = \
+                get_sensors_config_for_nodes(cfg['sensors'], nodes)
             stop_and_remove_sensors(sensors_configs)
 
         ctx.clear_calls_stack.append(remove_sensors_stage)