2.0 is on the way
diff --git a/wally/sensors_rpc_plugin.py b/wally/sensors_rpc_plugin.py
index a2758c3..c218bff 100644
--- a/wally/sensors_rpc_plugin.py
+++ b/wally/sensors_rpc_plugin.py
@@ -1,12 +1,19 @@
import os
-from collections import namedtuple
-
-SensorInfo = namedtuple("SensorInfo", ['value', 'is_accumulated'])
-# SensorInfo = NamedTuple("SensorInfo", [('value', int), ('is_accumulated', bool)])
+import time
+import array
+import threading
-def provides(name: str):
+mod_name = "sensor"
+__version__ = (0, 1)
+
+
+SensorsMap = {}
+
+
+def provides(name):
def closure(func):
+ SensorsMap[name] = func
return func
return closure
@@ -28,16 +35,16 @@
def get_pid_list(disallowed_prefixes, allowed_prefixes):
"""Return pid list from list of pids and names"""
# exceptions
- but = disallowed_prefixes if disallowed_prefixes is not None else []
+ disallowed = disallowed_prefixes if disallowed_prefixes is not None else []
if allowed_prefixes is None:
# if nothing setted - all ps will be returned except setted
result = [pid
for pid in os.listdir('/proc')
- if pid.isdigit() and pid not in but]
+ if pid.isdigit() and pid not in disallowed]
else:
result = []
for pid in os.listdir('/proc'):
- if pid.isdigit() and pid not in but:
+ if pid.isdigit() and pid not in disallowed:
name = get_pid_name(pid)
if pid in allowed_prefixes or \
any(name.startswith(val) for val in allowed_prefixes):
@@ -61,33 +68,6 @@
return "no_such_process"
-def delta(func, only_upd=True):
- prev = {}
- while True:
- for dev_name, vals in func():
- if dev_name not in prev:
- prev[dev_name] = {}
- for name, (val, _) in vals.items():
- prev[dev_name][name] = val
- else:
- dev_prev = prev[dev_name]
- res = {}
- for stat_name, (val, accum_val) in vals.items():
- if accum_val:
- if stat_name in dev_prev:
- delta = int(val) - int(dev_prev[stat_name])
- if not only_upd or 0 != delta:
- res[stat_name] = str(delta)
- dev_prev[stat_name] = val
- elif not only_upd or '0' != val:
- res[stat_name] = val
-
- if only_upd and len(res) == 0:
- continue
- yield dev_name, res
- yield None, None
-
-
# 1 - major number
# 2 - minor mumber
# 3 - device name
@@ -121,43 +101,14 @@
for line in open('/proc/diskstats'):
vals = line.split()
dev_name = vals[2]
-
dev_ok = is_dev_accepted(dev_name,
disallowed_prefixes,
allowed_prefixes)
- if dev_name[-1].isdigit():
- dev_ok = False
+ if not dev_ok or dev_name[-1].isdigit():
+ continue
- if dev_ok:
- for pos, name, accum_val in io_values_pos:
- sensor_name = "{0}.{1}".format(dev_name, name)
- results[sensor_name] = SensorInfo(int(vals[pos]), accum_val)
- return results
-
-
-def get_latency(stat1, stat2):
- disks = set(i.split('.', 1)[0] for i in stat1)
- results = {}
-
- for disk in disks:
- rdc = disk + '.reads_completed'
- wrc = disk + '.writes_completed'
- rdt = disk + '.rtime'
- wrt = disk + '.wtime'
- lat = 0.0
-
- io_ops1 = stat1[rdc].value + stat1[wrc].value
- io_ops2 = stat2[rdc].value + stat2[wrc].value
-
- diops = io_ops2 - io_ops1
-
- if diops != 0:
- io1 = stat1[rdt].value + stat1[wrt].value
- io2 = stat2[rdt].value + stat2[wrt].value
- lat = abs(float(io1 - io2)) / diops
-
- results[disk + '.latence'] = SensorInfo(lat, False)
-
+ for pos, name, _ in io_values_pos:
+ results["{0}.{1}".format(dev_name, name)] = int(vals[pos])
return results
@@ -201,28 +152,8 @@
dev_ok = False
if dev_ok:
- for pos, name, accum_val in net_values_pos:
- sensor_name = "{0}.{1}".format(dev_name, name)
- results[sensor_name] = SensorInfo(int(vals[pos]), accum_val)
- return results
-
-
-@provides("perprocess-cpu")
-def pscpu_stat(disallowed_prefixes=None, allowed_prefixes=None):
- results = {}
- pid_list = get_pid_list(disallowed_prefixes, allowed_prefixes)
-
- for pid in pid_list:
- try:
- dev_name = get_pid_name(pid)
-
- pid_stat1 = pid_stat(pid)
-
- sensor_name = "{0}.{1}".format(dev_name, pid)
- results[sensor_name] = SensorInfo(pid_stat1, True)
- except IOError:
- # may be, proc has already terminated, skip it
- continue
+ for pos, name, _ in net_values_pos:
+ results["{0}.{1}".format(dev_name, name)] = int(vals[pos])
return results
@@ -239,14 +170,19 @@
return float(int(utime) + int(stime))
-# Based on ps_mem.py:
-# Licence: LGPLv2
-# Author: P@draigBrady.com
-# Source: http://www.pixelbeat.org/scripts/ps_mem.py
-# http://github.com/pixelb/scripts/commits/master/scripts/ps_mem.py
+@provides("perprocess-cpu")
+def pscpu_stat(disallowed_prefixes=None, allowed_prefixes=None):
+ results = {}
+ # TODO(koder): fixed list of PID's nust be given
+ for pid in get_pid_list(disallowed_prefixes, allowed_prefixes):
+ try:
+ results["{0}.{1}".format(get_pid_name(pid), pid)] = pid_stat(pid)
+ except IOError:
+ # may be, proc has already terminated, skip it
+ continue
+ return results
-# Note shared is always a subset of rss (trs is not always)
def get_mem_stats(pid):
"""Return memory data of pid in format (private, shared)"""
@@ -257,7 +193,7 @@
private = 0
pss = 0
- # add 0.5KiB as this avg error due to trunctation
+ # add 0.5KiB as this avg error due to truncation
pss_adjust = 0.5
for line in lines:
@@ -279,39 +215,38 @@
return (private, shared)
+def get_ram_size():
+ """Return RAM size in Kb"""
+ with open("/proc/meminfo") as proc:
+ mem_total = proc.readline().split()
+ return int(mem_total[1])
+
+
@provides("perprocess-ram")
def psram_stat(disallowed_prefixes=None, allowed_prefixes=None):
results = {}
- pid_list = get_pid_list(disallowed_prefixes, allowed_prefixes)
- for pid in pid_list:
+ # TODO(koder): fixed list of PID's nust be given
+ for pid in get_pid_list(disallowed_prefixes, allowed_prefixes):
try:
dev_name = get_pid_name(pid)
private, shared = get_mem_stats(pid)
total = private + shared
sys_total = get_ram_size()
- usage = float(total) / float(sys_total)
+ usage = float(total) / sys_total
sensor_name = "{0}({1})".format(dev_name, pid)
- results[sensor_name + ".private_mem"] = SensorInfo(private, False)
- results[sensor_name + ".shared_mem"] = SensorInfo(shared, False)
- results[sensor_name + ".used_mem"] = SensorInfo(total, False)
- name = sensor_name + ".mem_usage_percent"
- results[name] = SensorInfo(usage * 100, False)
+ results.update([
+ (sensor_name + ".private_mem", private),
+ (sensor_name + ".shared_mem", shared),
+ (sensor_name + ".used_mem", total),
+ (sensor_name + ".mem_usage_percent", int(usage * 100))])
except IOError:
# permission denied or proc die
continue
return results
-
-def get_ram_size():
- """Return RAM size in Kb"""
- with open("/proc/meminfo") as proc:
- mem_total = proc.readline().split()
- return mem_total[1]
-
-
# 0 - cpu name
# 1 - user: normal processes executing in user mode
# 2 - nice: niced processes executing in user mode
@@ -341,13 +276,12 @@
dev_name = vals[0]
if dev_name == 'cpu':
- for pos, name, accum_val in cpu_values_pos:
+ for pos, name, _ in cpu_values_pos:
sensor_name = "{0}.{1}".format(dev_name, name)
- results[sensor_name] = SensorInfo(int(vals[pos]),
- accum_val)
+ results[sensor_name] = int(vals[pos])
elif dev_name == 'procs_blocked':
val = int(vals[1])
- results["cpu.procs_blocked"] = SensorInfo(val, False)
+ results["cpu.procs_blocked"] = val
elif dev_name.startswith('cpu'):
core_count += 1
@@ -355,9 +289,10 @@
TASKSPOS = 3
vals = open('/proc/loadavg').read().split()
ready_procs = vals[TASKSPOS].partition('/')[0]
+
# dec on current proc
procs_queue = (float(ready_procs) - 1) / core_count
- results["cpu.procs_queue"] = SensorInfo(procs_queue, False)
+ results["cpu.procs_queue"] = procs_queue
return results
@@ -380,7 +315,9 @@
def sysram_stat(disallowed_prefixes=None, allowed_prefixes=None):
if allowed_prefixes is None:
allowed_prefixes = ram_fields
+
results = {}
+
for line in open('/proc/meminfo'):
vals = line.split()
dev_name = vals[0].rstrip(":")
@@ -392,10 +329,112 @@
title = "ram.{0}".format(dev_name)
if dev_ok:
- results[title] = SensorInfo(int(vals[1]), False)
+ results[title] = int(vals[1])
if 'ram.MemFree' in results and 'ram.MemTotal' in results:
used = results['ram.MemTotal'].value - results['ram.MemFree'].value
- usage = float(used) / results['ram.MemTotal'].value
- results["ram.usage_percent"] = SensorInfo(usage, False)
+ results["ram.usage_percent"] = int(float(used) / results['ram.MemTotal'].value)
+
return results
+
+
+class SensorsData(object):
+ def __init__(self):
+ self.cond = threading.Condition()
+ self.collected_at = array.array("f")
+ self.stop = False
+ self.data = {} # map sensor_name to list of results
+ self.data_fd = None
+
+
+# TODO(koder): a lot code here can be optimized and cached, but nobody cares (c)
+def sensors_bg_thread(sensors_config, sdata):
+ next_collect_at = time.time()
+
+ while not sdata.stop:
+ dtime = next_collect_at - time.time()
+ if dtime > 0:
+ sdata.cond.wait(dtime)
+
+ if sdata.stop:
+ break
+
+ ctm = time.time()
+ curr = {}
+ for name, config in sensors_config.items():
+ params = {}
+
+ if "allow" in config:
+ params["allowed_prefixes"] = config["allow"]
+
+ if "disallow" in config:
+ params["disallowed_prefixes"] = config["disallow"]
+
+ curr[name] = SensorsMap[name](**params)
+
+ etm = time.time()
+
+ if etm - ctm > 0.1:
+ # TODO(koder): need to signal that something in not really ok with sensor collecting
+ pass
+
+ with sdata.cond:
+ sdata.collected_at.append(ctm)
+ for source_name, vals in curr.items():
+ for sensor_name, val in vals.items():
+ key = (source_name, sensor_name)
+ if key not in sdata.data:
+ sdata.data[key] = array.array("I", [val])
+ else:
+ sdata.data[key].append(val)
+
+
+sensors_thread = None
+sdata = None # type: SensorsData
+
+
+def rpc_start(sensors_config):
+ global sensors_thread
+ global sdata
+
+ if sensors_thread is not None:
+ raise ValueError("Thread already running")
+
+ sdata = SensorsData()
+ sensors_thread = threading.Thread(target=sensors_bg_thread, args=(sensors_config, sdata))
+ sensors_thread.daemon = True
+ sensors_thread.start()
+
+
+def rpc_get_updates():
+ if sdata is None:
+ raise ValueError("No sensor thread running")
+
+ with sdata.cond:
+ res = sdata.data
+ collected_at = sdata.collected_at
+ sdata.collected_at = array.array("f")
+ sdata.data = {name: array.array("I") for name in sdata.data}
+
+ return res, collected_at
+
+
+def rpc_stop():
+ global sensors_thread
+ global sdata
+
+ if sensors_thread is None:
+ raise ValueError("No sensor thread running")
+
+ sdata.stop = True
+ with sdata.cond:
+ sdata.cond.notify_all()
+
+ sensors_thread.join()
+ res = sdata.data
+ collected_at = sdata.collected_at
+
+ sensors_thread = None
+ sdata = None
+
+ return res, collected_at