a lot of fixes: drop the in-memory SensorDatastore buffer; the sensors thread now pushes testnode samples straight to the shared data store
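
save_sensors_data no longer buffers testnode samples in a local
SensorDatastore; it hands them to the shared data store via
data_store.update(rep_time, {"testnodes:io": sum_io_q}, add=True).
The store itself is not part of this diff, so the sketch below is only
an assumption about the interface the new call site relies on; the
class and attribute names are hypothetical.

    # Hypothetical sketch of the interface assumed by the new call site;
    # the real data store is defined elsewhere and may differ.
    import threading
    from collections import defaultdict

    class AssumedSensorStore(object):
        def __init__(self):
            self.lock = threading.Lock()
            # metric name -> {timestamp: accumulated value}
            self.values = defaultdict(dict)

        def update(self, tstamp, name2value, add=False):
            # add=True accumulates into the existing sample for this
            # timestamp; otherwise the sample is overwritten.
            with self.lock:
                for name, value in name2value.items():
                    if add:
                        prev = self.values[name].get(tstamp, 0)
                        self.values[name][tstamp] = prev + value
                    else:
                        self.values[name][tstamp] = value
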
diff --git a/wally/sensors_utils.py b/wally/sensors_utils.py
index e5b4347..1e9e898 100644
--- a/wally/sensors_utils.py
+++ b/wally/sensors_utils.py
@@ -1,5 +1,4 @@
import time
-import array
import Queue
import logging
import threading
@@ -11,92 +10,14 @@
SensorConfig,
stop_and_remove_sensors)
-
logger = logging.getLogger("wally.sensors")
DEFAULT_RECEIVER_URL = "udp://{ip}:5699"
-class SensorDatastore(object):
- def __init__(self, stime=None):
- self.lock = threading.Lock()
- self.stime = stime
-
- self.min_size = 60 * 60
- self.max_size = 60 * 61
-
- self.data = {
- 'testnodes:io': array.array("B"),
- 'testnodes:cpu': array.array("B"),
- }
-
- def get_values(self, name, start, end):
- assert end >= start
- if end == start:
- return []
-
- with self.lock:
- curr_arr = self.data[name]
- if self.stime is None:
- return []
-
- sidx = start - self.stime
- eidx = end - self.stime
-
- if sidx < 0 and eidx < 0:
- return [0] * (end - start)
- elif sidx < 0:
- return [0] * (-sidx) + curr_arr[:eidx]
- return curr_arr[sidx:eidx]
-
- def set_values(self, start_time, vals):
- with self.lock:
- return self.add_values_l(start_time, vals)
-
- def set_values_l(self, start_time, vals):
- max_cut = 0
- for name, values in vals.items():
- curr_arr = self.data.setdefault(name, array.array("H"))
-
- if self.stime is None:
- self.stime = start_time
-
- curr_end_time = len(curr_arr) + self.stime
-
- if curr_end_time < start_time:
- curr_arr.extend([0] * (start_time - curr_end_time))
- curr_arr.extend(values)
- elif curr_end_time > start_time:
- logger.warning("Duplicated sensors data")
- sindex = len(curr_arr) + (start_time - curr_end_time)
-
- if sindex < 0:
- values = values[-sindex:]
- sindex = 0
-                logger.warning("Some data with timestamp before"
-                               " beginning of measurements. Skip them")
-
- curr_arr[sindex:sindex + len(values)] = values
- else:
- curr_arr.extend(values)
-
- if len(curr_arr) > self.max_size:
- max_cut = max(len(curr_arr) - self.min_size, max_cut)
-
- if max_cut > 0:
- self.start_time += max_cut
- for val in vals.values():
- del val[:max_cut]
-
-
def save_sensors_data(data_q, mon_q, fd, data_store, source2roles_map):
fd.write("\n")
- BUFFER = 3
observed_nodes = set()
- testnodes_data = {
- 'io': {},
- 'cpu': {},
- }
try:
while True:
@@ -114,23 +35,12 @@
source_id = data.pop('source_id')
rep_time = data.pop('time')
+
if 'testnode' in source2roles_map.get(source_id, []):
- vl = testnodes_data['io'].get(rep_time, 0)
- sum_io_q = vl
- testnodes_data['io'][rep_time] = sum_io_q
-
- etime = time.time() - BUFFER
- for name, vals in testnodes_data.items():
- new_vals = {}
- for rtime, value in vals.items():
- if rtime < etime:
- data_store.set_values("testnodes:io", rtime, [value])
- else:
- new_vals[rtime] = value
-
- vals.clear()
- vals.update(new_vals)
-
+ sum_io_q = 0
+ data_store.update(rep_time,
+ {"testnodes:io": sum_io_q},
+ add=True)
except Exception:
logger.exception("Error in sensors thread")
logger.info("Sensors thread exits")