Move code to cephlib
diff --git a/scripts/perf.py b/scripts/perf.py
new file mode 100644
index 0000000..96f3436
--- /dev/null
+++ b/scripts/perf.py
@@ -0,0 +1,7 @@
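+# Ad-hoc profiling helper: runs the wally "report" stage against a stored
+# result directory; the leading "X" just stands in for argv[0]. Wrapping the
+# call in x() makes it easy to target from a profiler.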
+from wally import main
+opts = "X -l DEBUG report /tmp/perf_tests/warm_doe".split()
+
+def x():
+ main.main(opts)
+
+x()
diff --git a/v2_plans.md b/v2_plans.md
index 56e2669..70a0caf 100644
--- a/v2_plans.md
+++ b/v2_plans.md
@@ -1,118 +1,176 @@
-* Remarks:
- * With current code impossible to do vm count scan test
+Wally consists of parts that should be split out and unified with other tools
+------------------------------------------------------------------------------
-* TODO next
- * unit tests for math functions
- * CEPH PERFORMANCE COUNTERS
- * Sync storage_structure
- * fix fio job summary
- * Use disk max QD as qd limit?
- * Cumulative statistic table for all jobs
- * Add column for job params, which show how many cluster resource consumed
- * show extra outliers with arrows
- * More X = func(QD) plots. Eg. - kurt/skew, etc.
- * Hide cluster load if no nodes available
- * Show latency skew and curtosis
- * Sort engineering report by result tuple
- * Name engineering reports by long summary
- * Latency heatmap and violin aren't consistent
- * profile violint plot
- * Fix plot layout, there to much unused space around typical plot
- * iops boxplot as function from QD
- * collect device types mapping from nodes - device should be block/net/...
- * Optimize sensor communication with ceph, can run fist OSD request for
- data validation only on start.
- * Update Storage test, add tests for stat and plot module
- * Aggregated sensors boxplot
- * Hitmap for aggregated sensors
- * automatically find what to plot from storage data (but also allow to select via config)
+* Make ceph-lib a separate project; it must support both Python 2.7
+  and 3.5 and have no hard external binary dependencies. Move into it:
+ * Cluster detector
+ * Cluster info collector
+ * Monitoring
+ * FSStorage
+
+* Openstack VM spawn
+* Load generators
+* Load results visualizer
+* Cluster load visualization
+* Bottleneck detection
+* Calculation of consumed resources
+* Glue code
+* Storage backends should be easy to plug in
+
+* Make the resource consumption calculation configurable -
+  specify in the config which ratios to compute
+* Select the storage plugin via the config
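+
+A possible shape for the storage-plugin selection - a sketch only; the module
+path and config keys below are assumptions, not an existing wally API:
+
+```python
+import importlib
+
+def open_storage_from_config(cfg: dict):
+    """Pick a result-storage backend by name from the config and open it."""
+    scfg = cfg.get("storage", {"plugin": "fs", "url": "/var/wally/results"})
+    plugin = importlib.import_module("wally.storage_plugins." + scfg["plugin"])
+    return plugin.open_storage(scfg["url"])
+```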
+
+
+Resources:
+----------
+The sensor output is addressed as
+
+NODE_OR_ROLE.DEVICE.SENSOR
+
+Create a namespace with all nodes/roles as objects whose specially overloaded
+__getattr__ resolves first the device and then the sensor, then eval the
+resource expression against that namespace, e.g.
+
+
+(CLUSTER.DISK.riops + CLUSTER.DISK.wiops) / (VM.DISK.riops + VM.DISK.wiops)
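+
+A minimal sketch of that namespace idea (illustrative only; `resolve` is an
+assumed callback that looks up the aggregated sensor value for a path):
+
+```python
+class _Level:
+    """Resolves NODE_OR_ROLE.DEVICE.SENSOR attribute chains to sensor values."""
+    def __init__(self, resolve, path=()):
+        self._resolve = resolve      # callable: (role, device, sensor) -> value
+        self._path = path
+
+    def __getattr__(self, name):
+        path = self._path + (name,)
+        if len(path) == 3:           # role.device.sensor is fully specified
+            return self._resolve(path)
+        return _Level(self._resolve, path)
+
+
+def eval_resource_expr(expr, roles, resolve):
+    """Evaluate an expression like the one above against collected sensor data."""
+    namespace = {role: _Level(resolve, (role,)) for role in roles}
+    return eval(expr, {"__builtins__": {}}, namespace)
+```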
+
+
+Remarks:
+--------
+
+* With the current code it is impossible to do a VM count scan test
+
+TODO next
+---------
+
+* Remove DBStorage, merge FSStorage and the serializer into
+  ObjStorage, separate TSStorage (see the sketch after this list).
+* Build WallyStorage on top of them, use only WallyStorage in the code
+* check that the OS key matches what is stored on disk
+* unit tests for math functions
+* CEPH PERFORMANCE COUNTERS
+* Sync storage_structure
+* fix fio job summary
+* Use disk max QD as qd limit?
+* Cumulative statistics table for all jobs
+* Add a column for job params which shows how much cluster resource is consumed
+* show extra outliers with arrows
+* More X = func(QD) plots, e.g. kurtosis/skew, etc.
+* Hide cluster load if no nodes available
+* Show latency skew and kurtosis
+* Sort engineering report by result tuple
+* Name engineering reports by long summary
+* Latency heatmap and violin aren't consistent
+* profile the violin plot
+* Fix plot layout, there is too much unused space around a typical plot
+* IOPS boxplot as a function of QD
+* collect device types mapping from nodes - device should be block/net/...
+* Optimize sensor communication with ceph, can run the first OSD request for
+ data validation only on start.
+* Update Storage test, add tests for stat and plot module
+* Aggregated sensors boxplot
+* Heatmap for aggregated sensors
+* automatically find what to plot from storage data (but also allow to select via config)
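+
+A rough sketch of the planned storage split - class names follow the items
+above and are placeholders, not existing wally code:
+
+```python
+from typing import Any, List
+import numpy
+
+class ObjStorage:
+    """FSStorage + serializer merged: stores typed objects addressed by path."""
+    def put(self, obj: Any, *path: str) -> None: ...
+    def get(self, *path: str) -> Any: ...
+
+class TSStorage:
+    """Separate storage for time series / numpy arrays."""
+    def put_array(self, data: numpy.ndarray, header: List[str], *path: str) -> None: ...
+    def load_array(self, *path: str) -> numpy.ndarray: ...
+
+class WallyStorage:
+    """The only storage API the rest of the wally code should talk to."""
+    def __init__(self, objs: ObjStorage, ts: TSStorage) -> None:
+        self.objs = objs
+        self.ts = ts
+```
+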
Have to think:
- * Each sensor should collect only one portion of data. During
- start it should scan all available sources and tell upper code to create separated funcs for them.
- * store statistic results in storage
- * During prefill check io on file
- * Store percentiles levels in TS, separate 1D TS and 2D TS to different classes, store levels in 2D TS
- * weight average and deviation
- * C++/Go disk stat sensors to measure IOPS/Lat on milliseconds
+--------------
+* Send data to external storage
+* Each sensor should collect only one portion of data. During
+  start it should scan all available sources and tell the upper-level code to create separate funcs for them.
+* store statistic results in storage
+* During prefill check io on file
+* Store percentiles levels in TS, separate 1D TS and 2D TS to different classes, store levels in 2D TS
+* weighted average and deviation (see the sketch below)
+* C++/Go disk stat sensors to measure IOPS/latency at millisecond resolution
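+
+A sketch for the weighted average / deviation item above (plain numpy, no
+wally-specific types assumed):
+
+```python
+import numpy
+
+def weighted_avg_and_dev(values: numpy.ndarray, weights: numpy.ndarray):
+    """Weighted mean and weighted standard deviation of a sample."""
+    avg = numpy.average(values, weights=weights)
+    var = numpy.average((values - avg) ** 2, weights=weights)
+    return avg, numpy.sqrt(var)
+```
+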
* TODO large
- * Force to kill running fio on ctrl+C and correct cleanup or cleanup all previous run with 'wally cleanup PATH'
+------------
+* Force-kill running fio on Ctrl+C and clean up correctly, or clean up all previous runs with 'wally cleanup PATH'
* Code:
- * RW mixed report
- * RPC reconnect in case of errors
- * store more information for node - OSD settings, FS on test nodes, target block device settings on test nodes
- * Sensors
- - Revise sensors code. Prepack on node side, different sensors data types
- - perf
- - [bcc](https://github.com/iovisor/bcc)
- - ceph sensors
- * Config validation
- * Add sync 4k write with small set of thcount
- * Flexible SSH connection creds - use agent, default ssh settings or part of config
- * Remove created temporary files - create all tempfiles via func from .utils, which track them
- * Use ceph-monitoring from wally
- * Use warm-up detection to select real test time.
- * Report code:
- - Compatible report types set up by config and load??
- * Calculate statistic for previous iteration in background
+-------
+* RW mixed report
+* RPC reconnect in case of errors
+* store more information per node - OSD settings, FS on test nodes, target block device settings on test nodes
+* Sensors
+ - Revise sensors code. Prepack on node side, different sensors data types
+ - perf
+ - [bcc](https://github.com/iovisor/bcc)
+ - ceph sensors
+* Config validation
+* Add sync 4k write with small set of thcount
+* Flexible SSH connection creds - use agent, default ssh settings or part of config
+* Remove created temporary files - create all tempfiles via a func from .utils which tracks them
+* Use ceph-monitoring from wally
+* Use warm-up detection to select real test time.
+* Report code:
+ - Compatible report types set up by config and load??
+* Calculate statistics for the previous iteration in the background
* UT
- * UT, which run test with predefined in yaml cluster (cluster and config created separatelly, not with tests)
- and check that result storage work as expected. Declare db sheme in seaprated yaml file, UT should check.
- * White-box event logs for UT
- * Result-to-yaml for UT
+----
+* UT which runs a test against a cluster predefined in yaml (cluster and config created separately, not by the tests)
+  and checks that result storage works as expected. Declare the DB schema in a separate yaml file, which the UT should check.
+* White-box event logs for UT
+* Result-to-yaml for UT
* Infra:
- * Add script to download fio from git and build it
- * Docker/lxd public container as default distribution way
- * Update setup.py to provide CLI entry points
+--------
+* Add script to download fio from git and build it
+* Docker/lxd public container as default distribution way
+* Update setup.py to provide CLI entry points
* Statistical result check and report:
- * [Q-Q plot](https://en.wikipedia.org/wiki/Q%E2%80%93Q_plot)
- * Check results distribution
- * Warn for non-normal results
- * Check that distribution of different parts is close. Average performance should be steady across test
- * Node histogram distribution
- * Interactive report, which shows different plots and data,
- depending on selected visualization type
- * Offload simple report table to cvs/yaml/json/test/ascii_table
- * fio load reporters (visualizers), ceph report tool
- [ceph-viz-histo](https://github.com/cronburg/ceph-viz/tree/master/histogram)
- * evaluate bokeh for visualization
- * [flamegraph](https://www.youtube.com/watch?v=nZfNehCzGdw) for 'perf' output
- * detect internal pattern:
- - FFT
- - http://mabrek.github.io/
- - https://github.com/rushter/MLAlgorithms
- - https://github.com/rushter/data-science-blogs
- - https://habrahabr.ru/post/311092/
- - https://blog.cloudera.com/blog/2015/12/common-probability-distributions-the-data-scientists-crib-sheet/
- - http://docs.scipy.org/doc/scipy-0.14.0/reference/generated/scipy.stats.mstats.normaltest.html
- - http://profitraders.com/Math/Shapiro.html
- - http://www.machinelearning.ru/wiki/index.php?title=%D0%9A%D1%80%D0%B8%D1%82%D0%B5%D1%80%D0%B8%D0%B9_%D1%85%D0%B8-%D0%BA%D0%B2%D0%B0%D0%B4%D1%80%D0%B0%D1%82
- - http://docs.scipy.org/doc/numpy/reference/generated/numpy.fft.fft.html#numpy.fft.fft
- - https://en.wikipedia.org/wiki/Log-normal_distribution
- - http://stats.stackexchange.com/questions/25709/what-distribution-is-most-commonly-used-to-model-server-response-time
- - http://www.lognormal.com/features/
- - http://blog.simiacryptus.com/2015/10/modeling-network-latency.html
- * For HDD read/write - report caches hit ratio, maps of real read/writes, FS counters
- * Report help page, link for explanations
- * checkboxes for show/hide part of image
- * pop-up help for part of picture
- * pop-up text values for bars/lines
- * waterfall charts for ceph request processing
- * correct comparison between different systems
+--------------------------------------
+* KDE on latency, then find local extrema and estimate
+  effective cache sizes from them
+* [Q-Q plot](https://en.wikipedia.org/wiki/Q%E2%80%93Q_plot)
+* Check results distribution
+* Warn for non-normal results (see the sketch after this list)
+* Check that distribution of different parts is close. Average performance should be steady across test
+* Node histogram distribution
+* Interactive report, which shows different plots and data,
+ depending on selected visualization type
+* Offload simple report table to csv/yaml/json/text/ascii_table
+* fio load reporters (visualizers), ceph report tool
+ [ceph-viz-histo](https://github.com/cronburg/ceph-viz/tree/master/histogram)
+* evaluate bokeh for visualization
+* [flamegraph](https://www.youtube.com/watch?v=nZfNehCzGdw) for 'perf' output
+* detect internal pattern:
+ - FFT
+ - http://mabrek.github.io/
+ - https://github.com/rushter/MLAlgorithms
+ - https://github.com/rushter/data-science-blogs
+ - https://habrahabr.ru/post/311092/
+ - https://blog.cloudera.com/blog/2015/12/common-probability-distributions-the-data-scientists-crib-sheet/
+ - http://docs.scipy.org/doc/scipy-0.14.0/reference/generated/scipy.stats.mstats.normaltest.html
+ - http://profitraders.com/Math/Shapiro.html
+ - http://www.machinelearning.ru/wiki/index.php?title=%D0%9A%D1%80%D0%B8%D1%82%D0%B5%D1%80%D0%B8%D0%B9_%D1%85%D0%B8-%D0%BA%D0%B2%D0%B0%D0%B4%D1%80%D0%B0%D1%82
+ - http://docs.scipy.org/doc/numpy/reference/generated/numpy.fft.fft.html#numpy.fft.fft
+ - https://en.wikipedia.org/wiki/Log-normal_distribution
+ - http://stats.stackexchange.com/questions/25709/what-distribution-is-most-commonly-used-to-model-server-response-time
+ - http://www.lognormal.com/features/
+ - http://blog.simiacryptus.com/2015/10/modeling-network-latency.html
+* For HDD read/write - report cache hit ratio, maps of real reads/writes, FS counters
+* Report help page, link for explanations
+* checkboxes to show/hide parts of an image
+* pop-up help for parts of a picture
+* pop-up text values for bars/lines
+* waterfall charts for ceph request processing
+* correct comparison between different systems
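+
+A minimal sketch of the non-normality warning (assumes scipy is available;
+the significance level is an arbitrary choice):
+
+```python
+import numpy
+from scipy import stats
+
+def warn_if_not_normal(samples: numpy.ndarray, alpha: float = 0.05) -> bool:
+    """Return True when the normality hypothesis is rejected at level alpha."""
+    _, p_value = stats.normaltest(samples)   # D'Agostino-Pearson K^2 test
+    return bool(p_value < alpha)
+```
+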
* Maybe move to 2.1:
- * Add sensor collection time to them
- * Make collection interval configurable per sensor type, make collection time separated for each sensor
- * DB <-> files conversion, or just store all the time in files as well
- * Automatically scale QD till saturation
- * Runtime visualization
- * Integrate vdbench/spc/TPC/TPB
- * Add aio rpc client
- * Add integration tests with nbd
- * fix existing folder detection
- * Simple REST API for external in-browser UI
\ No newline at end of file
+--------------------
+* Add sensor collection time to the collected data
+* Make collection interval configurable per sensor type, make collection time separate for each sensor
+* DB <-> files conversion, or just store all the time in files as well
+* Automatically scale QD till saturation
+* Runtime visualization
+* Integrate vdbench/spc/TPC/TPB
+* Add aio rpc client
+* Add integration tests with nbd
+* fix existing folder detection
+* Simple REST API for external in-browser UI
diff --git a/wally/ceph.py b/wally/ceph.py
index a374ed6..9734baa 100644
--- a/wally/ceph.py
+++ b/wally/ceph.py
@@ -14,83 +14,23 @@
from .utils import StopTestError, to_ip
+from cephlib import discover
+from cephlib.discover import OSDInfo
+
+
logger = logging.getLogger("wally")
-class OSDInfo:
- def __init__(self, id: int, journal: str = None, storage: str = None) -> None:
- self.id = id
- self.journal = journal
- self.storage = storage
-
-
-def get_osds_info(node: IRPCNode, conf: str, key: str) -> Dict[IP, List[OSDInfo]]:
+def get_osds_info(node: IRPCNode, ceph_extra_args: str = "") -> Dict[IP, List[OSDInfo]]:
"""Get set of osd's ip"""
-
- data = node.run("ceph -c {} -k {} --format json osd dump".format(conf, key))
- try:
- jdata = json.loads(data)
- except:
- open("/tmp/ceph-out.json", "w").write(data)
- raise
- ips = {} # type: Dict[IP, List[OSDInfo]]
- first_error = True
- for osd_data in jdata["osds"]:
- osd_id = int(osd_data["osd"])
- if "public_addr" not in osd_data:
- if first_error:
- logger.warning("No 'public_addr' field in 'ceph osd dump' output for osd %s" +
- "(all subsequent errors omitted)", osd_id)
- first_error = False
- else:
- ip_port = osd_data["public_addr"]
- if '/' in ip_port:
- ip_port = ip_port.split("/", 1)[0]
- ip = IP(ip_port.split(":")[0])
-
- osd_journal_path = None # type: Optional[str]
- osd_data_path = None # type: Optional[str]
-
- # TODO: parallelize this!
- osd_cfg = node.run("ceph -n osd.{} --show-config".format(osd_id))
- for line in osd_cfg.split("\n"):
- if line.startswith("osd_journal ="):
- osd_journal_path = line.split("=")[1].strip()
- elif line.startswith("osd_data ="):
- osd_data_path = line.split("=")[1].strip()
-
- if osd_data_path is None or osd_journal_path is None:
- open("/tmp/ceph-out.json", "w").write(osd_cfg)
- logger.error("Can't detect osd %s journal or storage path", osd_id)
- raise StopTestError()
-
- ips.setdefault(ip, []).append(OSDInfo(osd_id,
- journal=osd_journal_path,
- storage=osd_data_path))
- return ips
+    return {IP(ip): osd_info_list
+            for ip, osd_info_list in discover.get_osds_nodes(node.run, ceph_extra_args).items()}
-def get_mons_ips(node: IRPCNode, conf: str, key: str) -> Set[IP]:
+def get_mons_ips(node: IRPCNode, ceph_extra_args: str = "") -> Set[IP]:
"""Return mon ip set"""
-
- data = node.run("ceph -c {} -k {} --format json mon_status".format(conf, key))
- jdata = json.loads(data)
- ips = set() # type: Set[IP]
- first_error = True
- for mon_data in jdata["monmap"]["mons"]:
- if "addr" not in mon_data:
- if first_error:
- mon_name = mon_data.get("name", "<MON_NAME_MISSED>")
- logger.warning("No 'addr' field in 'ceph mon_status' output for mon %s" +
- "(all subsequent errors omitted)", mon_name)
- first_error = False
- else:
- ip_port = mon_data["addr"]
- if '/' in ip_port:
- ip_port = ip_port.split("/", 1)[0]
- ips.add(IP(ip_port.split(":")[0]))
-
- return ips
+ return {IP(ip) for ip in discover.get_mons_nodes(node.run, ceph_extra_args).values()}
class DiscoverCephStage(Stage):
@@ -112,33 +52,39 @@
ceph = ctx.config.ceph
root_node_uri = cast(str, ceph.root_node)
cluster = ceph.get("cluster", "ceph")
+
conf = ceph.get("conf")
key = ceph.get("key")
- logger.debug("Start discovering ceph nodes from root %s", root_node_uri)
- logger.debug("cluster=%s key=%s conf=%s", cluster, conf, key)
-
- info = NodeInfo(parse_ssh_uri(root_node_uri), set())
-
if conf is None:
conf = "/etc/ceph/{}.conf".format(cluster)
if key is None:
key = "/etc/ceph/{}.client.admin.keyring".format(cluster)
+ ceph_extra_args = ""
+
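+            # ceph_extra_args is passed through to cephlib.discover, which is
+            # assumed to append it to the "ceph" CLI commands it runs on the node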
+ if conf:
+ ceph_extra_args += " -c '{}'".format(conf)
+
+ if key:
+ ceph_extra_args += " -k '{}'".format(key)
+
+ logger.debug("Start discovering ceph nodes from root %s", root_node_uri)
+ logger.debug("cluster=%s key=%s conf=%s", cluster, conf, key)
+
+ info = NodeInfo(parse_ssh_uri(root_node_uri), set())
+
ceph_params = {"cluster": cluster, "conf": conf, "key": key}
- with setup_rpc(connect(info),
- ctx.rpc_code,
- ctx.default_rpc_plugins,
+ with setup_rpc(connect(info), ctx.rpc_code, ctx.default_rpc_plugins,
log_level=ctx.config.rpc_log_level) as node:
ssh_key = node.get_file_content("~/.ssh/id_rsa")
-
try:
ips = set()
- for ip, osds_info in get_osds_info(node, conf, key).items():
+ for ip, osds_info in get_osds_info(node, ceph_extra_args).items():
ips.add(ip)
creds = ConnCreds(to_ip(cast(str, ip)), user="root", key=ssh_key)
info = ctx.merge_node(creds, {'ceph-osd'})
@@ -156,7 +102,7 @@
try:
counter = 0
- for counter, ip in enumerate(get_mons_ips(node, conf, key)):
+ for counter, ip in enumerate(get_mons_ips(node, ceph_extra_args)):
creds = ConnCreds(to_ip(cast(str, ip)), user="root", key=ssh_key)
info = ctx.merge_node(creds, {'ceph-mon'})
assert 'ceph' not in info.params or info.params['ceph'] == ceph_params
diff --git a/wally/hlstorage.py b/wally/hlstorage.py
index f2c9488..3672458 100644
--- a/wally/hlstorage.py
+++ b/wally/hlstorage.py
@@ -1,13 +1,13 @@
import os
import logging
-from typing import cast, Iterator, Tuple, Type, Dict, Optional
+from typing import cast, Iterator, Tuple, Type, Dict, Optional, Any, List
import numpy
from .suits.job import JobConfig
from .result_classes import SuiteConfig, TimeSeries, DataSource, StatProps, IResultStorage
-from .storage import Storage, csv_file_encoding
-from .utils import StopTestError, str2shape, shape2str
+from .storage import Storage
+from .utils import StopTestError
from .suits.all_suits import all_suits
@@ -41,7 +41,7 @@
sensor_time_r = r'sensors/{node_id}_collected_at\.csv'
report_root = 'report/'
- plot_r = r'report/{suite_id}\.{job_id}/{node_id}\.{sensor}\.{dev}\.{metric}\.{tag}'
+ plot_r = r'{suite_id}\.{job_id}/{node_id}\.{sensor}\.{dev}\.{metric}\.{tag}'
job_cfg = job_cfg_r.replace("\\.", '.')
suite_cfg = suite_cfg_r.replace("\\.", '.')
@@ -70,39 +70,92 @@
ts_header_size = 64
ts_header_format = "!IIIcc"
ts_arr_tag = 'csv'
+ csv_file_encoding = 'ascii'
def __init__(self, storage: Storage) -> None:
self.storage = storage
+ self.cache = {} # type: Dict[str, Tuple[int, int, Any, List[str]]]
def sync(self) -> None:
self.storage.sync()
# ----------------- SERIALIZATION / DESERIALIZATION -------------------------------------------------------------
+ def load_array(self, path: str, skip_shape: bool = False) -> Tuple[numpy.array, Tuple[str, ...]]:
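+        # CSV layout: the first header cell is the numpy dtype name, the rest is
+        # the caller-supplied header; parsed arrays are cached keyed on file stat.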
+ with self.storage.get_fd(path, "rb") as fd:
+ stats = os.fstat(fd.fileno())
+ if path in self.cache:
+                size, mtime, obj, header = self.cache[path]
+                if size == stats.st_size and mtime == stats.st_mtime_ns:
+ return obj, header
+
+ header = fd.readline().decode(self.csv_file_encoding).strip().split(",")
+ print("header =", header)
+ if skip_shape:
+ header = header[1:]
+ dt = fd.read().decode("utf-8").strip()
+ arr = numpy.fromstring(dt.replace("\n", ','), sep=',', dtype=header[0])
+ if len(dt) != 0:
+ lines = dt.count("\n") + 1
+ columns = dt.split("\n", 1)[0].count(",") + 1
+ assert lines * columns == len(arr)
+ if columns == 1:
+ arr.shape = (lines,)
+ else:
+ arr.shape = (lines, columns)
+
+            self.cache[path] = (stats.st_size, stats.st_mtime_ns, arr, header[1:])
+ return arr, header[1:]
+
+    def put_array(self, path: str, data: numpy.array, header: List[str], append_on_exists: bool = False) -> None:
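+        # Counterpart of load_array: prepend the dtype name to the header and dump
+        # the array as CSV; when appending, require the header to match the file.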
+ header = [data.dtype.name] + header
+
+ exists = append_on_exists and path in self.storage
+ if len(data.shape) == 1:
+ # make array vertical to simplify reading
+ vw = data.view().reshape((data.shape[0], 1))
+ else:
+ vw = data
+
+ with self.storage.get_fd(path, "cb" if exists else "wb") as fd:
+ if exists:
+ curr_header = fd.readline().decode(self.csv_file_encoding).rstrip().split(",")
+ assert header == curr_header, \
+ "Path {!r}. Expected header ({!r}) and current header ({!r}) don't match"\
+ .format(path, header, curr_header)
+ fd.seek(0, os.SEEK_END)
+ else:
+ fd.write((",".join(header) + "\n").encode(self.csv_file_encoding))
+
+ numpy.savetxt(fd, vw, delimiter=',', newline="\n", fmt="%lu")
def load_ts(self, ds: DataSource, path: str) -> TimeSeries:
+ arr, header = self.load_array(path, skip_shape=True)
+ units, time_units = header
- with self.storage.get_fd(path, "rb") as fd:
- header = fd.readline().decode(csv_file_encoding).strip().split(",")
- shape, dtype, units, time_units = header
- arr = numpy.loadtxt(fd, delimiter=',', dtype=dtype)
+ data = arr[:,1:]
+ if data.shape[1] == 1:
+ data = data.reshape((-1,))
return TimeSeries("{}.{}".format(ds.dev, ds.sensor),
raw=None,
- data=arr[:,1:].reshape(str2shape(shape)),
+ data=data,
times=arr[:,0],
source=ds,
units=units,
time_units=time_units)
def load_sensor(self, ds: DataSource) -> TimeSeries:
- collect_header, collected_at = self.storage.get_array(DB_paths.sensor_time.format(**ds.__dict__))
+ collected_at, collect_header = self.load_array(DB_paths.sensor_time.format(**ds.__dict__))
assert collect_header == [ds.node_id, 'collected_at', 'us'], repr(collect_header)
-
- data_header, data = self.storage.get_array(DB_paths.sensor_data.format(**ds.__dict__))
+ data, data_header = self.load_array(DB_paths.sensor_data.format(**ds.__dict__))
data_units = data_header[2]
assert data_header == [ds.node_id, ds.metric_fqdn, data_units]
+ assert len(data.shape) == 1
+ assert len(collected_at.shape) == 1
+
return TimeSeries(ds.metric_fqdn,
raw=None,
data=data,
@@ -115,7 +168,7 @@
def check_plot_file(self, source: DataSource) -> Optional[str]:
path = DB_paths.plot.format(**source.__dict__)
- fpath = self.storage.resolve_raw(path)
+ fpath = self.storage.resolve_raw(DB_paths.report_root + path)
return path if os.path.exists(fpath) else None
# ------------- PUT DATA INTO STORAGE --------------------------------------------------------------------------
@@ -140,22 +193,16 @@
assert ts.source.tag == self.ts_arr_tag
csv_path = DB_paths.ts.format(**ts.source.__dict__)
- header = [shape2str(ts.data.shape),
- ts.data.dtype.name,
- ts.units,
- ts.time_units]
+ header = [ts.data.dtype.name, ts.units, ts.time_units]
- with self.storage.get_fd(csv_path, "cb") as fd:
- tv = ts.times.view().reshape((-1, 1))
+ tv = ts.times.view().reshape((-1, 1))
+ if len(ts.data.shape) == 1:
+ dv = ts.data.view().reshape((ts.times.shape[0], -1))
+ else:
+ dv = ts.data
- if len(ts.data.shape) == 1:
- dv = ts.data.view().reshape((ts.times.shape[0], -1))
- else:
- dv = ts.data
-
- result = numpy.concatenate((tv, dv), axis=1)
- fd.write((",".join(map(str, header)) + "\n").encode(csv_file_encoding))
- numpy.savetxt(fd, result, delimiter=',', newline="\n", fmt="%lu")
+ result = numpy.concatenate((tv, dv), axis=1)
+ self.put_array(csv_path, result, header)
if ts.raw:
raw_path = DB_paths.ts.format(**ts.source(tag=ts.raw_tag).__dict__)
@@ -170,10 +217,11 @@
# return path to file to be inserted into report
def put_plot_file(self, data: bytes, source: DataSource) -> str:
path = DB_paths.plot.format(**source.__dict__)
- return cast(str, self.storage.put_raw(data, path))
+ self.storage.put_raw(data, DB_paths.report_root + path)
+ return path
def put_report(self, report: str, name: str) -> str:
- return self.storage.put_raw(report.encode("utf8"), DB_paths.report_root + name)
+        return self.storage.put_raw(report.encode("utf8"), DB_paths.report_root + name)
def append_sensor(self, data: numpy.array, ds: DataSource, units: str) -> None:
if ds.metric == 'collected_at':
@@ -182,7 +230,7 @@
else:
path = DB_paths.sensor_data
metrics_fqn = ds.metric_fqdn
- self.storage.append([ds.node_id, metrics_fqn, units], data, path.format(**ds.__dict__))
+ self.put_array(path.format(**ds.__dict__), data, [ds.node_id, metrics_fqn, units], append_on_exists=True)
# ------------- GET DATA FROM STORAGE --------------------------------------------------------------------------
@@ -217,7 +265,6 @@
def iter_ts(self, suite: SuiteConfig, job: JobConfig, **filters) -> Iterator[TimeSeries]:
filters.update(suite_id=suite.storage_id, job_id=job.storage_id)
ts_glob = fill_path(DB_paths.ts_r, **filters)
-
for is_file, path, groups in self.iter_paths(ts_glob):
assert is_file
groups = groups.copy()
diff --git a/wally/report.py b/wally/report.py
index 0b0540e..68170ec 100644
--- a/wally/report.py
+++ b/wally/report.py
@@ -19,8 +19,7 @@
from .node_interfaces import NodeInfo
from .utils import b2ssize, b2ssize_10, STORAGE_ROLES
from .statistic import (calc_norm_stat_props, calc_histo_stat_props, moving_average, moving_dev,
- hist_outliers_perc, ts_hist_outliers_perc, find_ouliers_ts, approximate_curve,
- rebin_histogram)
+ hist_outliers_perc, ts_hist_outliers_perc, find_ouliers_ts, approximate_curve)
from .result_classes import (StatProps, DataSource, TimeSeries, NormStatProps, HistoStatProps, SuiteConfig,
IResultStorage)
from .suits.io.fio_hist import get_lat_vals, expected_lat_bins
@@ -158,7 +157,7 @@
return IOSummary(job.qd,
nodes_count=len(suite.nodes_ids),
block_size=job.bsize,
- lat=calc_histo_stat_props(lat, bins_edges, StyleProfile.hist_boxes),
+ lat=calc_histo_stat_props(lat, bins_edges, rebins_count=StyleProfile.hist_boxes),
bw=calc_norm_stat_props(io, StyleProfile.hist_boxes))
#
@@ -192,7 +191,7 @@
def get_aggregated(rstorage: ResultStorage, suite: SuiteConfig, job: FioJobConfig, metric: str) -> TimeSeries:
- tss = list(rstorage.iter_ts(suite, job, sensor=metric))
+ tss = list(rstorage.iter_ts(suite, job, metric=metric))
ds = DataSource(suite_id=suite.storage_id,
job_id=job.storage_id,
node_id=AGG_TAG,
@@ -214,13 +213,13 @@
"shape=%s. Can only process sensors with shape=[X, %s].",
ts.source.dev, ts.source.sensor, ts.source.node_id,
ts.data.shape, expected_lat_bins)
- continue
+ raise ValueError()
if metric != 'lat' and len(ts.data.shape) != 1:
- logger.error("Sensor %s.%s on node %s has" +
+ logger.error("Sensor %s.%s on node %s has " +
"shape=%s. Can only process 1D sensors.",
ts.source.dev, ts.source.sensor, ts.source.node_id, ts.data.shape)
- continue
+ raise ValueError()
# TODO: match times on different ts
agg_ts.data += ts.data
@@ -290,7 +289,7 @@
val_it = iter(sensor_data.data[pos1 - 1: pos2 + 1])
# result array, cumulative value per second
- result = numpy.zeros((end - begin) // MICRO)
+ result = numpy.zeros(int(end - begin) // MICRO)
idx = 0
curr_summ = 0
@@ -965,7 +964,7 @@
storage_nodes = [node.node_id for node in nodes if node.roles.intersection(STORAGE_ROLES)]
test_nodes = [node.node_id for node in nodes if "testnode" in node.roles]
- trange = [job.reliable_info_range[0] / 1000, job.reliable_info_range[1] / 1000]
+ trange = (job.reliable_info_range[0] / 1000, job.reliable_info_range[1] / 1000)
ops_done = io_transfered / fjob.bsize / KB
all_metrics = [
@@ -1035,7 +1034,7 @@
agg_lat = get_aggregated(rstorage, suite, fjob, "lat")
bins_edges = numpy.array(get_lat_vals(agg_lat.data.shape[1]), dtype='float32') / 1000 # convert us to ms
- lat_stat_prop = calc_histo_stat_props(agg_lat, bins_edges, bins_count=StyleProfile.hist_lat_boxes)
+ lat_stat_prop = calc_histo_stat_props(agg_lat, bins_edges, rebins_count=StyleProfile.hist_lat_boxes)
# import IPython
# IPython.embed()
@@ -1179,6 +1178,7 @@
units=units,
time_units="us",
source=ds)
+
fpath = plot_v_over_time(rstorage, ds, sensor_title, sensor_title, ts=ts) # type: str
yield Menu1st.per_job, job.summary, HTMLBlock(html.img(fpath))
diff --git a/wally/sensors.py b/wally/sensors.py
index bcdb4e3..c773b34 100644
--- a/wally/sensors.py
+++ b/wally/sensors.py
@@ -95,7 +95,7 @@
node_cfg['ceph'].update(node.info.params['ceph'])
node_cfg['ceph']['osds'] = [osd.id for osd in node.info.params['ceph-osds']] # type: ignore
- logger.debug("Setting up sensort RPC plugin for node %s", nid)
+ logger.debug("Setting up sensors RPC plugin for node %s", nid)
node.upload_plugin("sensors", SENSORS_PLUGIN_CODE)
ctx.sensors_run_on.add(nid)
logger.debug("Start monitoring node %s", nid)
diff --git a/wally/sensors_rpc_plugin.py b/wally/sensors_rpc_plugin.py
deleted file mode 100644
index ffb0abd..0000000
--- a/wally/sensors_rpc_plugin.py
+++ /dev/null
@@ -1,687 +0,0 @@
-import os
-import sys
-import json
-import time
-import zlib
-import array
-import pprint
-import logging
-import threading
-import traceback
-import subprocess
-import collections
-
-
-import Pool # type: ignore
-
-
-mod_name = "sensors"
-__version__ = (0, 1)
-
-
-logger = logging.getLogger("agent.sensors")
-SensorsMap = {}
-
-
-class Sensor(object):
- def __init__(self, params, allowed=None, disallowed=None):
- self.params = params
- self.allowed = allowed
- self.disallowed = disallowed
- self.allowed_names = set()
-
- def add_data(self, device, name, value):
- pass
-
- def collect(self):
- pass
-
- def get_updates(self):
- pass
-
- @classmethod
- def unpack_results(cls, device, metric, data, typecode):
- pass
-
- def init(self):
- pass
-
- def stop(self):
- pass
-
-
-class ArraysSensor(Sensor):
- typecode = 'L'
-
- def __init__(self, params, allowed=None, disallowed=None):
- Sensor.__init__(self, params, allowed, disallowed)
- self.data = collections.defaultdict(lambda: array.array(self.typecode))
- self.prev_vals = {}
-
- def add_data(self, device, name, value):
- self.data[(device, name)].append(value)
-
- def add_relative(self, device, name, value):
- key = (device, name)
- pval = self.prev_vals.get(key)
- if pval is not None:
- self.data[key].append(value - pval)
- self.prev_vals[key] = value
-
- def get_updates(self):
- res = self.data
- self.data = collections.defaultdict(lambda: array.array(self.typecode))
- return {key: (arr.typecode, arr.tostring()) for key, arr in res.items()}
-
- @classmethod
- def unpack_results(cls, device, metric, packed, typecode):
- arr = array.array(typecode)
- if sys.version_info >= (3, 0, 0):
- arr.frombytes(packed)
- else:
- arr.fromstring(packed)
- return arr
-
- def is_dev_accepted(self, name):
- dev_ok = True
-
- if self.disallowed is not None:
- dev_ok = all(not name.startswith(prefix) for prefix in self.disallowed)
-
- if dev_ok and self.allowed is not None:
- dev_ok = any(name.startswith(prefix) for prefix in self.allowed)
-
- return dev_ok
-
-
-time_array_typechar = ArraysSensor.typecode
-
-
-def provides(name):
- def closure(cls):
- SensorsMap[name] = cls
- return cls
- return closure
-
-
-def get_pid_list(disallowed_prefixes, allowed_prefixes):
- """Return pid list from list of pids and names"""
- # exceptions
- disallowed = disallowed_prefixes if disallowed_prefixes is not None else []
- if allowed_prefixes is None:
- # if nothing setted - all ps will be returned except setted
- result = [pid for pid in os.listdir('/proc')
- if pid.isdigit() and pid not in disallowed]
- else:
- result = []
- for pid in os.listdir('/proc'):
- if pid.isdigit() and pid not in disallowed:
- name = get_pid_name(pid)
- if pid in allowed_prefixes or any(name.startswith(val) for val in allowed_prefixes):
- # this is allowed pid?
- result.append(pid)
- return result
-
-
-def get_pid_name(pid):
- """Return name by pid"""
- try:
- with open(os.path.join('/proc/', pid, 'cmdline'), 'r') as pidfile:
- try:
- cmd = pidfile.readline().split()[0]
- return os.path.basename(cmd).rstrip('\x00')
- except IndexError:
- # no cmd returned
- return "<NO NAME>"
- except IOError:
- # upstream wait any string, no matter if we couldn't read proc
- return "no_such_process"
-
-
-@provides("block-io")
-class BlockIOSensor(ArraysSensor):
- # 1 - major number
- # 2 - minor mumber
- # 3 - device name
- # 4 - reads completed successfully
- # 5 - reads merged
- # 6 - sectors read
- # 7 - time spent reading (ms)
- # 8 - writes completed
- # 9 - writes merged
- # 10 - sectors written
- # 11 - time spent writing (ms)
- # 12 - I/Os currently in progress
- # 13 - time spent doing I/Os (ms)
- # 14 - weighted time spent doing I/Os (ms)
-
- SECTOR_SIZE = 512
-
- io_values_pos = [
- (3, 'reads_completed', True, 1),
- (5, 'sectors_read', True, SECTOR_SIZE),
- (6, 'rtime', True, 1),
- (7, 'writes_completed', True, 1),
- (9, 'sectors_written', True, SECTOR_SIZE),
- (10, 'wtime', True, 1),
- (11, 'io_queue', False, 1),
- (13, 'io_time', True, 1)
- ]
-
- def __init__(self, *args, **kwargs):
- ArraysSensor.__init__(self, *args, **kwargs)
-
- if self.disallowed is None:
- self.disallowed = ('ram', 'loop')
-
- for line in open('/proc/diskstats'):
- vals = line.split()
- dev_name = vals[2]
- if self.is_dev_accepted(dev_name) and not dev_name[-1].isdigit():
- self.allowed_names.add(dev_name)
-
- self.collect(init_rel=True)
-
- def collect(self, init_rel=False):
- for line in open('/proc/diskstats'):
- vals = line.split()
- dev_name = vals[2]
-
- if dev_name not in self.allowed_names:
- continue
-
- for pos, name, aggregated, coef in self.io_values_pos:
- vl = int(vals[pos]) * coef
- if aggregated:
- self.add_relative(dev_name, name, vl)
- elif not init_rel:
- self.add_data(dev_name, name, int(vals[pos]))
-
-
-@provides("net-io")
-class NetIOSensor(ArraysSensor):
- # 1 - major number
- # 2 - minor mumber
- # 3 - device name
- # 4 - reads completed successfully
- # 5 - reads merged
- # 6 - sectors read
- # 7 - time spent reading (ms)
- # 8 - writes completed
- # 9 - writes merged
- # 10 - sectors written
- # 11 - time spent writing (ms)
- # 12 - I/Os currently in progress
- # 13 - time spent doing I/Os (ms)
- # 14 - weighted time spent doing I/Os (ms)
-
- net_values_pos = [
- (0, 'recv_bytes', True),
- (1, 'recv_packets', True),
- (8, 'send_bytes', True),
- (9, 'send_packets', True),
- ]
-
- def __init__(self, *args, **kwargs):
- ArraysSensor.__init__(self, *args, **kwargs)
-
- if self.disallowed is None:
- self.disallowed = ('docker', 'lo')
-
- if self.allowed is None:
- self.allowed = ('eth',)
-
- for _, _, aggregated in self.net_values_pos:
- assert aggregated, "Non-aggregated values is not supported in net sensor"
-
- for line in open('/proc/net/dev').readlines()[2:]:
- dev_name, stats = line.split(":", 1)
- dev_name = dev_name.strip()
- if self.is_dev_accepted(dev_name):
- self.allowed_names.add(dev_name)
-
- self.collect(init_rel=True)
-
- def collect(self, init_rel=False):
- for line in open('/proc/net/dev').readlines()[2:]:
- dev_name, stats = line.split(":", 1)
- dev_name = dev_name.strip()
- if dev_name in self.allowed_names:
- vals = stats.split()
- for pos, name, _ in self.net_values_pos:
- vl = int(vals[pos])
- self.add_relative(dev_name, name, vl )
-
-
-def pid_stat(pid):
- """Return total cpu usage time from process"""
- # read /proc/pid/stat
- with open(os.path.join('/proc/', pid, 'stat'), 'r') as pidfile:
- proctimes = pidfile.readline().split()
- # get utime from /proc/<pid>/stat, 14 item
- utime = proctimes[13]
- # get stime from proc/<pid>/stat, 15 item
- stime = proctimes[14]
- # count total process used time
- return float(int(utime) + int(stime))
-
-
-@provides("perprocess-cpu")
-class ProcCpuSensor(ArraysSensor):
- def collect(self):
- # TODO(koder): fixed list of PID's must be given
- for pid in get_pid_list(self.disallowed, self.allowed):
- try:
- self.add_data(get_pid_name(pid), pid, pid_stat(pid))
- except IOError:
- # probably proc has already terminated, skip it
- continue
-
-
-def get_mem_stats(pid):
- """Return memory data of pid in format (private, shared)"""
-
- fname = '/proc/{0}/{1}'.format(pid, "smaps")
- lines = open(fname).readlines()
-
- shared = 0
- private = 0
- pss = 0
-
- # add 0.5KiB as this avg error due to truncation
- pss_adjust = 0.5
-
- for line in lines:
- if line.startswith("Shared"):
- shared += int(line.split()[1])
-
- if line.startswith("Private"):
- private += int(line.split()[1])
-
- if line.startswith("Pss"):
- pss += float(line.split()[1]) + pss_adjust
-
- # Note Shared + Private = Rss above
- # The Rss in smaps includes video card mem etc.
-
- if pss != 0:
- shared = int(pss - private)
-
- return (private, shared)
-
-
-def get_ram_size():
- """Return RAM size in Kb"""
- with open("/proc/meminfo") as proc:
- mem_total = proc.readline().split()
- return int(mem_total[1])
-
-
-@provides("perprocess-ram")
-class ProcRamSensor(ArraysSensor):
- def collect(self):
- # TODO(koder): fixed list of PID's nust be given
- for pid in get_pid_list(self.disallowed, self.allowed):
- try:
- dev_name = get_pid_name(pid)
-
- private, shared = get_mem_stats(pid)
- total = private + shared
- sys_total = get_ram_size()
- usage = float(total) / sys_total
-
- sensor_name = "{0}({1})".format(dev_name, pid)
-
- self.add_data(sensor_name, "private_mem", private)
- self.add_data(sensor_name, "shared_mem", shared),
- self.add_data(sensor_name, "used_mem", total),
- self.add_data(sensor_name, "mem_usage_percent", int(usage * 100))
- except IOError:
- # permission denied or proc die
- continue
-
-
-@provides("system-cpu")
-class SystemCPUSensor(ArraysSensor):
- # 0 - cpu name
- # 1 - user: normal processes executing in user mode
- # 2 - nice: niced processes executing in user mode
- # 3 - system: processes executing in kernel mode
- # 4 - idle: twiddling thumbs
- # 5 - iowait: waiting for I/O to complete
- # 6 - irq: servicing interrupts
- # 7 - softirq: servicing softirqs
-
- cpu_values_pos = [
- (1, 'user_processes', True),
- (2, 'nice_processes', True),
- (3, 'system_processes', True),
- (4, 'idle_time', True),
- ]
-
- def collect(self):
- # calculate core count
- core_count = 0
-
- for line in open('/proc/stat'):
- vals = line.split()
- dev_name = vals[0]
-
- if dev_name == 'cpu':
- for pos, name, _ in self.cpu_values_pos:
- self.add_data(dev_name, name, int(vals[pos]))
- elif dev_name == 'procs_blocked':
- self.add_data("cpu", "procs_blocked", int(vals[1]))
- elif dev_name.startswith('cpu'):
- core_count += 1
-
- # procs in queue
- TASKSPOS = 3
- vals = open('/proc/loadavg').read().split()
- ready_procs = vals[TASKSPOS].partition('/')[0]
-
- # dec on current proc
- procs_queue = (float(ready_procs) - 1) / core_count
- self.add_data("cpu", "procs_queue_x10", int(procs_queue * 10))
-
-
-@provides("system-ram")
-class SystemRAMSensor(ArraysSensor):
- # return this values or setted in allowed
- ram_fields = ['MemTotal', 'MemFree', 'Buffers', 'Cached', 'SwapCached',
- 'Dirty', 'Writeback', 'SwapTotal', 'SwapFree']
-
- def __init__(self, *args, **kwargs):
- ArraysSensor.__init__(self, *args, **kwargs)
-
- if self.allowed is None:
- self.allowed = self.ram_fields
-
- self.allowed_fields = set()
- for line in open('/proc/meminfo'):
- field_name = line.split()[0].rstrip(":")
- if self.is_dev_accepted(field_name):
- self.allowed_fields.add(field_name)
-
- def collect(self):
- for line in open('/proc/meminfo'):
- vals = line.split()
- field = vals[0].rstrip(":")
- if field in self.allowed_fields:
- self.add_data("ram", field, int(vals[1]))
-
-
-try:
- from ceph_daemon import admin_socket
-except ImportError:
- admin_socket = None
-
-
-@provides("ceph")
-class CephSensor(ArraysSensor):
-
- historic_duration = 2
- historic_size = 200
-
- def run_ceph_daemon_cmd(self, osd_id, *args):
- asok = "/var/run/ceph/{}-osd.{}.asok".format(self.cluster, osd_id)
- if admin_socket:
- res = admin_socket(asok, args)
- else:
- res = subprocess.check_output("ceph daemon {} {}".format(asok, " ".join(args)), shell=True)
-
- return res
-
- def collect(self):
- def get_val(dct, path):
- if '/' in path:
- root, next = path.split('/', 1)
- return get_val(dct[root], next)
- return dct[path]
-
- for osd_id in self.params['osds']:
- data = json.loads(self.run_ceph_daemon_cmd(osd_id, 'perf', 'dump'))
- for key_name in self.params['counters']:
- self.add_data("osd{}".format(osd_id), key_name.replace("/", "."), get_val(data, key_name))
-
- if 'historic' in self.params.get('sources', {}):
- self.historic.setdefault(osd_id, []).append(self.run_ceph_daemon_cmd(osd_id, "dump_historic_ops"))
-
- if 'in_flight' in self.params.get('sources', {}):
- self.in_flight.setdefault(osd_id, []).append(self.run_ceph_daemon_cmd(osd_id, "dump_ops_in_flight"))
-
- def set_osd_historic(self, duration, keep, osd_id):
- data = json.loads(self.run_ceph_daemon_cmd(osd_id, "dump_historic_ops"))
- self.run_ceph_daemon_cmd(osd_id, "config set osd_op_history_duration {}".format(duration))
- self.run_ceph_daemon_cmd(osd_id, "config set osd_op_history_size {}".format(keep))
- return (data["duration to keep"], data["num to keep"])
-
- def init(self):
- self.cluster = self.params['cluster']
- self.prev_vals = {}
- self.historic = {}
- self.in_flight = {}
-
- if 'historic' in self.params.get('sources', {}):
- for osd_id in self.params['osds']:
- self.prev_vals[osd_id] = self.set_osd_historic(self.historic_duration, self.historic_size, osd_id)
-
- def stop(self):
- for osd_id, (duration, keep) in self.prev_vals.items():
- self.prev_vals[osd_id] = self.set_osd_historic(duration, keep, osd_id)
-
- def get_updates(self):
- res = super().get_updates()
-
- for osd_id, data in self.historic.items():
- res[("osd{}".format(osd_id), "historic")] = (None, data)
-
- self.historic = {}
-
- for osd_id, data in self.in_flight.items():
- res[("osd{}".format(osd_id), "in_flight")] = (None, data)
-
- self.in_flight = {}
-
- return res
-
- @classmethod
- def unpack_results(cls, device, metric, packed, typecode):
- if metric in ('historic', 'in_flight'):
- assert typecode is None
- return packed
-
- arr = array.array(typecode)
- if sys.version_info >= (3, 0, 0):
- arr.frombytes(packed)
- else:
- arr.fromstring(packed)
-
- return arr
-
-
-class SensorsData(object):
- def __init__(self):
- self.cond = threading.Condition()
- self.collected_at = array.array(time_array_typechar)
- self.stop = False
- self.sensors = {}
- self.data_fd = None # temporary file to store results
- self.exception = None
-
-
-def collect(sensors_config):
- curr = {}
- for name, config in sensors_config.items():
- params = {'config': config}
-
- if "allow" in config:
- params["allowed_prefixes"] = config["allow"]
-
- if "disallow" in config:
- params["disallowed_prefixes"] = config["disallow"]
-
- curr[name] = SensorsMap[name](**params)
- return curr
-
-
-def sensors_bg_thread(sensors_config, sdata):
- try:
- next_collect_at = time.time()
- if "pool_sz" in sensors_config:
- sensors_config = sensors_config.copy()
- pool_sz = sensors_config.pop("pool_sz")
- else:
- pool_sz = 32
-
- if pool_sz != 0:
- pool = Pool(sensors_config.get("pool_sz"))
- else:
- pool = None
-
- # prepare sensor classes
- with sdata.cond:
- sdata.sensors = {}
- for name, config in sensors_config.items():
- params = {'params': config}
- logger.debug("Start sensor %r with config %r", name, config)
-
- if "allow" in config:
- params["allowed_prefixes"] = config["allow"]
-
- if "disallow" in config:
- params["disallowed_prefixes"] = config["disallow"]
-
- sdata.sensors[name] = SensorsMap[name](**params)
- sdata.sensors[name].init()
-
- logger.debug("sensors.config = %s", pprint.pformat(sensors_config))
- logger.debug("Sensors map keys %s", ", ".join(sdata.sensors.keys()))
-
- # TODO: handle exceptions here
- # main loop
- while not sdata.stop:
- dtime = next_collect_at - time.time()
- if dtime > 0:
- with sdata.cond:
- sdata.cond.wait(dtime)
-
- next_collect_at += 1.0
-
- if sdata.stop:
- break
-
- ctm = time.time()
- with sdata.cond:
- sdata.collected_at.append(int(ctm * 1000000))
- if pool is not None:
- caller = lambda x: x()
- for ok, val in pool.map(caller, [sensor.collect for sensor in sdata.sensors.values()]):
- if not ok:
- raise val
- else:
- for sensor in sdata.sensors.values():
- sensor.collect()
- etm = time.time()
- sdata.collected_at.append(int(etm * 1000000))
- logger.debug("Add data to collected_at - %s, %s", ctm, etm)
-
- if etm - ctm > 0.1:
- # TODO(koder): need to signal that something in not really ok with sensor collecting
- pass
-
- except Exception:
- logger.exception("In sensor BG thread")
- sdata.exception = traceback.format_exc()
- finally:
- for sensor in sdata.sensors.values():
- sensor.stop()
-
-
-sensors_thread = None
-sdata = None # type: SensorsData
-
-
-def rpc_start(sensors_config):
- global sensors_thread
- global sdata
-
- if array.array('L').itemsize != 8:
- message = "Python array.array('L') items should be 8 bytes in size, not {}." + \
- " Can't provide sensors on this platform. Disable sensors in config and retry"
- raise ValueError(message.format(array.array('L').itemsize))
-
- if sensors_thread is not None:
- raise ValueError("Thread already running")
-
- sdata = SensorsData()
- sensors_thread = threading.Thread(target=sensors_bg_thread, args=(sensors_config, sdata))
- sensors_thread.daemon = True
- sensors_thread.start()
-
-
-def unpack_rpc_updates(res_tuple):
- offset_map, compressed_blob, compressed_collected_at_b = res_tuple
- blob = zlib.decompress(compressed_blob)
- collected_at_b = zlib.decompress(compressed_collected_at_b)
- collected_at = array.array(time_array_typechar)
- collected_at.frombytes(collected_at_b)
- yield 'collected_at', collected_at
-
- # TODO: data is unpacked/repacked here with no reason
- for sensor_path, (offset, size, typecode) in offset_map.items():
- sensor_path = sensor_path.decode("utf8")
- sensor_name, device, metric = sensor_path.split('.', 2)
- sensor_data = SensorsMap[sensor_name].unpack_results(device,
- metric,
- blob[offset:offset + size],
- typecode.decode("ascii"))
- yield sensor_path, sensor_data
-
-
-def rpc_get_updates():
- if sdata is None:
- raise ValueError("No sensor thread running")
-
- offset_map = collected_at = None
- blob = ""
-
- with sdata.cond:
- if sdata.exception:
- raise Exception(sdata.exception)
-
- offset_map = {}
- for sensor_name, sensor in sdata.sensors.items():
- for (device, metric), (typecode, val) in sensor.get_updates().items():
- offset_map["{}.{}.{}".format(sensor_name, device, metric)] = (len(blob), len(val), typecode)
- blob += val
-
- collected_at = sdata.collected_at
- sdata.collected_at = array.array(sdata.collected_at.typecode)
-
- logger.debug(str(collected_at))
- return offset_map, zlib.compress(blob), zlib.compress(collected_at.tostring())
-
-
-def rpc_stop():
- global sensors_thread
- global sdata
-
- if sensors_thread is None:
- raise ValueError("No sensor thread running")
-
- sdata.stop = True
- with sdata.cond:
- sdata.cond.notify_all()
-
- sensors_thread.join()
-
- if sdata.exception:
- raise Exception(sdata.exception)
-
- res = rpc_get_updates()
-
- sensors_thread = None
- sdata = None
-
- return res
diff --git a/wally/sensors_rpc_plugin.pyi b/wally/sensors_rpc_plugin.pyi
deleted file mode 100644
index 21fe1d5..0000000
--- a/wally/sensors_rpc_plugin.pyi
+++ /dev/null
@@ -1,22 +0,0 @@
-from typing import NamedTuple, TypeVar, Callable, Any, Optional, List, Iterable, Dict, Tuple
-
-Pid = TypeVar('Pid', str)
-AnyFunc = TypeVar('AnyFunc', Callable[..., Any])
-PrefixList = Optional[List[str]]
-SensorDict = Dict[str, int]
-
-def provides(name: str) -> Callable[[AnyFunc], AnyFunc]: ...
-def is_dev_accepted(name: str, disallowed_prefixes: PrefixList, allowed_prefixes: PrefixList) -> bool: ...
-def get_pid_name(pid: Pid) -> str: ...
-def pid_stat(pid: Pid) -> float: ...
-def get_mem_stats(pid : Pid) -> Tuple[int, int]: ...
-def get_ram_size() -> int: ...
-
-def io_stat(disallowed_prefixes: PrefixList, allowed_prefixes: PrefixList) -> int: ...
-def net_stat(disallowed_prefixes: PrefixList, allowed_prefixes: PrefixList) -> int: ...
-def pscpu_stat(disallowed_prefixes: PrefixList, allowed_prefixes: PrefixList) -> int: ...
-def psram_stat(disallowed_prefixes: PrefixList, allowed_prefixes: PrefixList) -> int: ...
-def syscpu_stat(disallowed_prefixes: PrefixList, allowed_prefixes: PrefixList) -> int: ...
-def sysram_stat(disallowed_prefixes: PrefixList, allowed_prefixes: PrefixList) -> int: ...
-
-
diff --git a/wally/statistic.py b/wally/statistic.py
index 4ebfccc..7edff67 100644
--- a/wally/statistic.py
+++ b/wally/statistic.py
@@ -223,7 +223,7 @@
def find_ouliers(data: numpy.array,
center_range: Tuple[int, int] = (25, 75),
- cut_range: float = 3) -> numpy.array:
+ cut_range: float = 3.0) -> numpy.array:
v1, v2 = numpy.percentile(data, center_range)
return numpy.abs(data - (v1 + v2) / 2) > ((v2 - v1) / 2 * cut_range)
@@ -231,7 +231,7 @@
def find_ouliers_ts(data: numpy.array,
windows_size: int = 30,
center_range: Tuple[int, int] = (25, 75),
- cut_range: float = 3) -> numpy.array:
+ cut_range: float = 3.0) -> numpy.array:
outliers = numpy.empty(data.shape, dtype=bool)
if len(data) < windows_size:
diff --git a/wally/storage.py b/wally/storage.py
index c8edf5d..ab52e12 100644
--- a/wally/storage.py
+++ b/wally/storage.py
@@ -18,7 +18,6 @@
import numpy
from .common_types import IStorable
-from .utils import shape2str, str2shape
logger = logging.getLogger("wally")
@@ -61,6 +60,43 @@
pass
+class ITSStorage(metaclass=abc.ABCMeta):
+ """interface for low-level storage, which doesn't support serialization
+ and can operate only on bytes"""
+
+ @abc.abstractmethod
+ def put(self, value: bytes, path: str) -> None:
+ pass
+
+ @abc.abstractmethod
+ def get(self, path: str) -> bytes:
+ pass
+
+ @abc.abstractmethod
+ def rm(self, path: str) -> None:
+ pass
+
+ @abc.abstractmethod
+ def sync(self) -> None:
+ pass
+
+ @abc.abstractmethod
+ def __contains__(self, path: str) -> bool:
+ pass
+
+ @abc.abstractmethod
+ def get_fd(self, path: str, mode: str = "rb+") -> IO:
+ pass
+
+ @abc.abstractmethod
+    def sub_storage(self, path: str) -> 'ITSStorage':
+ pass
+
+ @abc.abstractmethod
+ def list(self, path: str) -> Iterator[Tuple[bool, str]]:
+ pass
+
+
class ISerializer(metaclass=abc.ABCMeta):
"""Interface for serialization class"""
@abc.abstractmethod
@@ -72,115 +108,13 @@
pass
-class DBStorage(ISimpleStorage):
-
- create_tb_sql = "CREATE TABLE IF NOT EXISTS wally_storage (key text, data blob, type text)"
- insert_sql = "INSERT INTO wally_storage VALUES (?, ?, ?)"
- update_sql = "UPDATE wally_storage SET data=?, type=? WHERE key=?"
- select_sql = "SELECT data, type FROM wally_storage WHERE key=?"
- contains_sql = "SELECT 1 FROM wally_storage WHERE key=?"
- rm_sql = "DELETE FROM wally_storage WHERE key LIKE '{}%'"
- list2_sql = "SELECT key, length(data), type FROM wally_storage"
- SQLITE3_THREADSAFE = 1
-
- def __init__(self, db_path: str = None, existing: bool = False,
- prefix: str = None, db: sqlite3.Connection = None) -> None:
-
- assert not prefix or "'" not in prefix, "Broken sql prefix {!r}".format(prefix)
-
- if db_path:
- self.existing = existing
- if existing:
- if not os.path.isfile(db_path):
- raise IOError("No storage found at {!r}".format(db_path))
-
- os.makedirs(os.path.dirname(db_path), exist_ok=True)
- if sqlite3.threadsafety != self.SQLITE3_THREADSAFE:
- raise RuntimeError("Sqlite3 compiled without threadsafe support, can't use DB storage on it")
-
- try:
- self.db = sqlite3.connect(db_path, check_same_thread=False)
- except sqlite3.OperationalError as exc:
- raise IOError("Can't open database at {!r}".format(db_path)) from exc
-
- self.db.execute(self.create_tb_sql)
- else:
- if db is None:
- raise ValueError("Either db or db_path parameter must be passed")
- self.db = db
-
- if prefix is None:
- self.prefix = ""
- elif not prefix.endswith('/'):
- self.prefix = prefix + '/'
- else:
- self.prefix = prefix
-
- def put(self, value: bytes, path: str) -> None:
- c = self.db.cursor()
- fpath = self.prefix + path
- c.execute(self.contains_sql, (fpath,))
- if len(c.fetchall()) == 0:
- c.execute(self.insert_sql, (fpath, value, 'yaml'))
- else:
- c.execute(self.update_sql, (value, 'yaml', fpath))
-
- def get(self, path: str) -> bytes:
- c = self.db.cursor()
- c.execute(self.select_sql, (self.prefix + path,))
- res = cast(List[Tuple[bytes, str]], c.fetchall()) # type: List[Tuple[bytes, str]]
- if not res:
- raise KeyError(path)
- assert len(res) == 1
- val, tp = res[0]
- assert tp == 'yaml'
- return val
-
- def rm(self, path: str) -> None:
- c = self.db.cursor()
- path = self.prefix + path
- assert "'" not in path, "Broken sql path {!r}".format(path)
- c.execute(self.rm_sql.format(path))
-
- def __contains__(self, path: str) -> bool:
- c = self.db.cursor()
- path = self.prefix + path
- c.execute(self.contains_sql, (self.prefix + path,))
- return len(c.fetchall()) != 0
-
- def print_tree(self):
- c = self.db.cursor()
- c.execute(self.list2_sql)
- data = list(c.fetchall())
- data.sort()
- print("------------------ DB ---------------------")
- for key, data_ln, type in data:
- print(key, data_ln, type)
- print("------------------ END --------------------")
-
- def sub_storage(self, path: str) -> 'DBStorage':
- return self.__class__(prefix=self.prefix + path, db=self.db)
-
- def sync(self):
- self.db.commit()
-
- def get_fd(self, path: str, mode: str = "rb+") -> IO[bytes]:
- raise NotImplementedError("SQLITE3 doesn't provide fd-like interface")
-
- def list(self, path: str) -> Iterator[Tuple[bool, str]]:
- raise NotImplementedError("SQLITE3 doesn't provide list method")
-
-
-DB_REL_PATH = "__db__.db"
-
-
class FSStorage(ISimpleStorage):
"""Store all data in files on FS"""
def __init__(self, root_path: str, existing: bool) -> None:
self.root_path = root_path
self.existing = existing
- self.ignored = {self.j(DB_REL_PATH), '.', '..'}
+ self.ignored = {'.', '..'}
def j(self, path: str) -> str:
return os.path.join(self.root_path, path)
@@ -288,37 +222,31 @@
pass
-csv_file_encoding = 'ascii'
-
-
class Storage:
"""interface for storage"""
- def __init__(self, fs_storage: ISimpleStorage, db_storage: ISimpleStorage, serializer: ISerializer) -> None:
- self.fs = fs_storage
- self.db = db_storage
+ def __init__(self, sstorage: ISimpleStorage, serializer: ISerializer) -> None:
+ self.sstorage = sstorage
self.serializer = serializer
def sub_storage(self, *path: str) -> 'Storage':
fpath = "/".join(path)
- return self.__class__(self.fs.sub_storage(fpath), self.db.sub_storage(fpath), self.serializer)
+ return self.__class__(self.sstorage.sub_storage(fpath), self.serializer)
def put(self, value: Any, *path: str) -> None:
dct_value = cast(IStorable, value).raw() if isinstance(value, IStorable) else value
serialized = self.serializer.pack(dct_value) # type: ignore
fpath = "/".join(path)
- self.db.put(serialized, fpath)
- self.fs.put(serialized, fpath)
+ self.sstorage.put(serialized, fpath)
def put_list(self, value: Iterable[IStorable], *path: str) -> None:
serialized = self.serializer.pack([obj.raw() for obj in value]) # type: ignore
fpath = "/".join(path)
- self.db.put(serialized, fpath)
- self.fs.put(serialized, fpath)
+ self.sstorage.put(serialized, fpath)
def get(self, path: str, default: Any = _Raise) -> Any:
try:
- vl = self.db.get(path)
+ vl = self.sstorage.get(path)
except:
if default is _Raise:
raise
@@ -328,97 +256,30 @@
def rm(self, *path: str) -> None:
fpath = "/".join(path)
- self.fs.rm(fpath)
- self.db.rm(fpath)
+ self.sstorage.rm(fpath)
def __contains__(self, path: str) -> bool:
- return path in self.fs or path in self.db
+ return path in self.sstorage
def put_raw(self, val: bytes, *path: str) -> str:
fpath = "/".join(path)
- self.fs.put(val, fpath)
+ self.sstorage.put(val, fpath)
# TODO: dirty hack
return self.resolve_raw(fpath)
def resolve_raw(self, fpath) -> str:
- return cast(FSStorage, self.fs).j(fpath)
+ return cast(FSStorage, self.sstorage).j(fpath)
def get_raw(self, *path: str) -> bytes:
- return self.fs.get("/".join(path))
+ return self.sstorage.get("/".join(path))
def append_raw(self, value: bytes, *path: str) -> None:
- with self.fs.get_fd("/".join(path), "rb+") as fd:
+ with self.sstorage.get_fd("/".join(path), "rb+") as fd:
fd.seek(0, os.SEEK_END)
fd.write(value)
def get_fd(self, path: str, mode: str = "r") -> IO:
- return self.fs.get_fd(path, mode)
-
- def put_array(self, header: List[str], value: numpy.array, *path: str) -> None:
- for val in header:
- assert isinstance(val, str) and ',' not in val, \
- "Can't convert {!r} to array header, as it's values contains comma".format(header)
-
- fpath = "/".join(path)
- with self.get_fd(fpath, "wb") as fd:
- self.do_append(fd, header, value, fpath)
-
- def get_array(self, *path: str) -> Tuple[List[str], numpy.array]:
- path_s = "/".join(path)
- with self.get_fd(path_s, "rb") as fd:
- header = fd.readline().decode(csv_file_encoding).rstrip().split(",")
- type_code, second_axis = header[-2:]
- res = numpy.genfromtxt(fd, dtype=type_code, delimiter=',')
-
- if '0' == second_axis:
- res.shape = (len(res),)
-
- return header[:-2], res
-
- def append(self, header: List[str], value: numpy.array, *path: str) -> None:
- for val in header:
- assert isinstance(val, str) and ',' not in val, \
- "Can't convert {!r} to array header, as it's values contains comma".format(header)
-
- fpath = "/".join(path)
- with self.get_fd(fpath, "cb") as fd:
- self.do_append(fd, header, value, fpath, maybe_append=True)
-
- def do_append(self, fd, header: List[str], value: numpy.array, path: str, fmt="%lu",
- maybe_append: bool = False) -> None:
-
- if len(value.shape) == 1:
- second_axis = 0
- else:
- second_axis = value.shape[1]
- header += [value.dtype.name, str(second_axis)]
-
- write_header = False
-
- if maybe_append:
- fd.seek(0, os.SEEK_END)
- if fd.tell() != 0:
- fd.seek(0, os.SEEK_SET)
- # check header match
- curr_header = fd.readline().decode(csv_file_encoding).rstrip().split(",")
- assert header == curr_header, \
- "Path {!r}. Expected header ({!r}) and current header ({!r}) don't match"\
- .format(path, header, curr_header)
- fd.seek(0, os.SEEK_END)
- else:
- write_header = True
- else:
- write_header = True
-
- if write_header:
- fd.write((",".join(header) + "\n").encode(csv_file_encoding))
-
- if len(value.shape) == 1:
- # make array vertical to simplify reading
- vw = value.view().reshape((value.shape[0], 1))
- else:
- vw = value
- numpy.savetxt(fd, vw, delimiter=',', newline="\n", fmt=fmt)
+ return self.sstorage.get_fd(path, mode)
def load_list(self, obj_class: Type[ObjClass], *path: str) -> List[ObjClass]:
path_s = "/".join(path)
@@ -431,8 +292,7 @@
return cast(ObjClass, obj_class.fromraw(self.get(path_s)))
def sync(self) -> None:
- self.db.sync()
- self.fs.sync()
+ self.sstorage.sync()
def __enter__(self) -> 'Storage':
return self
@@ -441,7 +301,7 @@
self.sync()
def list(self, *path: str) -> Iterator[Tuple[bool, str]]:
- return self.fs.list("/".join(path))
+ return self.sstorage.list("/".join(path))
def _iter_paths(self,
root: str,
@@ -472,7 +332,5 @@
def make_storage(url: str, existing: bool = False) -> Storage:
- return Storage(FSStorage(url, existing),
- DBStorage(os.path.join(url, DB_REL_PATH)),
- SAFEYAMLSerializer())
+ return Storage(FSStorage(url, existing), SAFEYAMLSerializer())