resume working
diff --git a/wally/statistic.py b/wally/statistic.py
index e2021e1..2263788 100644
--- a/wally/statistic.py
+++ b/wally/statistic.py
@@ -1,48 +1,119 @@
import math
import itertools
+import statistics
+from typing import Union, List, TypeVar, Callable, Iterable, Tuple, Any, cast, Dict
-try:
- from scipy import stats
- from numpy import array, linalg
- from scipy.optimize import leastsq
- from numpy.polynomial.chebyshev import chebfit, chebval
- no_numpy = False
-except ImportError:
- no_numpy = True
+import numpy
+from scipy import stats, optimize
+from numpy import linalg
+from numpy.polynomial.chebyshev import chebfit, chebval
-def average(data):
- return sum(data) / len(data)
+from .result_classes import IStorable
-def med_dev(vals):
- if len(vals) == 1:
- return vals[0], 0.0
-
- med = sum(vals) / len(vals)
- dev = ((sum(abs(med - i) ** 2.0 for i in vals) / (len(vals) - 1)) ** 0.5)
- return med, dev
+Number = Union[int, float]
+TNumber = TypeVar('TNumber', int, float)
-def round_3_digit(val):
- return round_deviation((val, val / 10.0))[0]
+DOUBLE_DELTA = 1e-8
-def round_deviation(med_dev):
- med, dev = med_dev
-
- if dev < 1E-7:
- return med_dev
-
- dev_div = 10.0 ** (math.floor(math.log10(dev)) - 1)
- dev = int(dev / dev_div) * dev_div
- med = int(med / dev_div) * dev_div
- return [type(med_dev[0])(med),
- type(med_dev[1])(dev)]
+average = statistics.mean
+dev = statistics.variance
-def groupby_globally(data, key_func):
- grouped = {}
+class StatProps(IStorable):
+ "Statistic properties for timeserie"
+
+ yaml_tag = 'stat'
+
+ def __init__(self, data: List[Number]) -> None:
+ self.average = None # type: float
+ self.deviation = None # type: float
+ self.confidence = None # type: float
+ self.confidence_level = None # type: float
+
+ self.perc_99 = None # type: float
+ self.perc_95 = None # type: float
+ self.perc_90 = None # type: float
+ self.perc_50 = None # type: float
+
+ self.min = None # type: Number
+ self.max = None # type: Number
+
+ # bin_center: bin_count
+ self.histo = None # type: Tuple[List[int], List[float]]
+ self.data = data
+
+ self.normtest = None # type: Any
+
+ def __str__(self) -> str:
+ res = ["StatProps(num_el={}):".format(len(self.data)),
+ " distr = {0.average} ~ {0.deviation}".format(self),
+ " confidence({0.confidence_level}) = {0.confidence}".format(self),
+ " perc50={0.perc50}".format(self),
+ " perc90={0.perc90}".format(self),
+ " perc95={0.perc95}".format(self),
+ " perc95={0.perc99}".format(self),
+ " range {0.min} {0.max}".format(self),
+ " nurmtest = {0.nortest}".format(self)]
+ return "\n".join(res)
+
+ def __repr__(self) -> str:
+ return str(self)
+
+ def raw(self) -> Dict[str, Any]:
+ return self.__dict__.copy()
+
+ @classmethod
+ def fromraw(cls, data: Dict[str, Any]) -> 'StatProps':
+ res = cls.__new__(cls)
+ res.__dict__.update(data)
+ return res
+
+
+def greater_digit_pos(val: Number) -> int:
+ return int(math.floor(math.log10(val))) + 1
+
+
+def round_digits(val: TNumber, num_digits: int = 3) -> TNumber:
+ pow = 10 ** (greater_digit_pos(val) - num_digits)
+ return type(val)(int(val / pow) * pow)
+
+
+def calc_stat_props(data: List[Number], confidence: float = 0.95) -> StatProps:
+ "Calculate statistical properties of array of numbers"
+
+ res = StatProps(data)
+
+ if len(data) == 0:
+ raise ValueError("Input array is empty")
+
+ data = sorted(data)
+ res.average = average(data)
+ res.deviation = dev(data)
+ res.max = data[-1]
+ res.min = data[0]
+
+ res.perc_50 = numpy.percentile(data, 50)
+ res.perc_90 = numpy.percentile(data, 90)
+ res.perc_95 = numpy.percentile(data, 95)
+ res.perc_99 = numpy.percentile(data, 99)
+
+ if len(data) >= 3:
+ res.confidence = stats.sem(data) * \
+ stats.t.ppf((1 + confidence) / 2, len(data) - 1)
+ else:
+ res.confidence = None
+
+ res.histo = numpy.histogram(data, 'auto')
+ res.normtest = stats.mstats.normaltest(data)
+ return res
+
+
+def groupby_globally(data: Iterable, key_func: Callable):
+ grouped = {} # type: ignore
grouped_iter = itertools.groupby(data, key_func)
for (bs, cache_tp, act, conc), curr_data_it in grouped_iter:
@@ -52,25 +123,19 @@
return grouped
-def approximate_curve(x, y, xnew, curved_coef):
+def approximate_curve(x: List[Number], y: List[float], xnew: List[Number], curved_coef: int) -> List[float]:
"""returns ynew - y values of some curve approximation"""
- if no_numpy:
- return None
-
- return chebval(xnew, chebfit(x, y, curved_coef))
+ return cast(List[float], chebval(xnew, chebfit(x, y, curved_coef)))
-def approximate_line(x, y, xnew, relative_dist=False):
- """ x, y - test data, xnew - dots, where we want find approximation
- if not relative_dist distance = y - newy
- returns ynew - y values of linear approximation"""
-
- if no_numpy:
- return None
-
- # convert to numpy.array (don't work without it)
- ox = array(x)
- oy = array(y)
+def approximate_line(x: List[Number], y: List[float], xnew: List[Number], relative_dist: bool = False) -> List[float]:
+ """
+ x, y - test data, xnew - dots, where we want find approximation
+ if not relative_dist distance = y - newy
+ returns ynew - y values of linear approximation
+ """
+ ox = numpy.array(x)
+ oy = numpy.array(y)
# set approximation function
def func_line(tpl, x):
@@ -89,108 +154,41 @@
oy[:2]))
# find line
- tpl_final, success = leastsq(error_func,
- tpl_initial[:],
- args=(ox, oy))
+ tpl_final, success = optimize.leastsq(error_func, tpl_initial[:], args=(ox, oy))
# if error
if success not in range(1, 5):
raise ValueError("No line for this dots")
# return new dots
- return func_line(tpl_final, array(xnew))
+ return func_line(tpl_final, numpy.array(xnew))
-def difference(y, ynew):
- """returns average and maximum relative and
- absolute differences between y and ynew
- result may contain None values for y = 0
- return value - tuple:
- [(abs dif, rel dif) * len(y)],
- (abs average, abs max),
- (rel average, rel max)"""
-
- abs_dlist = []
- rel_dlist = []
-
- for y1, y2 in zip(y, ynew):
- # absolute
- abs_dlist.append(y1 - y2)
-
- if y1 > 1E-6:
- rel_dlist.append(abs(abs_dlist[-1] / y1))
- else:
- raise ZeroDivisionError("{0!r} is too small".format(y1))
-
- da_avg = sum(abs_dlist) / len(abs_dlist)
- dr_avg = sum(rel_dlist) / len(rel_dlist)
-
- return (zip(abs_dlist, rel_dlist),
- (da_avg, max(abs_dlist)), (dr_avg, max(rel_dlist))
- )
-
-
-def calculate_distribution_properties(data):
- """chi, etc"""
-
-
-def minimal_measurement_count(data, max_diff, req_probability):
- """
- should returns amount of measurements to get results (avg and deviation)
- with error less, that max_diff in at least req_probability% cases
- """
-
-
-class StatProps(object):
- def __init__(self):
- self.average = None
- self.mediana = None
- self.perc_95 = None
- self.perc_5 = None
- self.deviation = None
- self.confidence = None
- self.min = None
- self.max = None
- self.raw = None
-
- def rounded_average_conf(self):
- return round_deviation((self.average, self.confidence))
-
- def rounded_average_dev(self):
- return round_deviation((self.average, self.deviation))
-
- def __str__(self):
- return "StatProps({0} ~ {1})".format(round_3_digit(self.average),
- round_3_digit(self.deviation))
-
- def __repr__(self):
- return str(self)
-
-
-def data_property(data, confidence=0.95):
- res = StatProps()
- if len(data) == 0:
- return res
-
- data = sorted(data)
- res.average, res.deviation = med_dev(data)
- res.max = data[-1]
- res.min = data[0]
-
- ln = len(data)
- if ln % 2 == 0:
- res.mediana = (data[ln / 2] + data[ln / 2 - 1]) / 2
- else:
- res.mediana = data[ln / 2]
-
- res.perc_95 = data[int((ln - 1) * 0.95)]
- res.perc_5 = data[int((ln - 1) * 0.05)]
-
- if not no_numpy and ln >= 3:
- res.confidence = stats.sem(data) * \
- stats.t.ppf((1 + confidence) / 2, ln - 1)
- else:
- res.confidence = res.deviation
-
- res.raw = data[:]
- return res
+# TODO: revise next
+# def difference(y, ynew):
+# """returns average and maximum relative and
+# absolute differences between y and ynew
+# result may contain None values for y = 0
+# return value - tuple:
+# [(abs dif, rel dif) * len(y)],
+# (abs average, abs max),
+# (rel average, rel max)"""
+#
+# abs_dlist = []
+# rel_dlist = []
+#
+# for y1, y2 in zip(y, ynew):
+# # absolute
+# abs_dlist.append(y1 - y2)
+#
+# if y1 > 1E-6:
+# rel_dlist.append(abs(abs_dlist[-1] / y1))
+# else:
+# raise ZeroDivisionError("{0!r} is too small".format(y1))
+#
+# da_avg = sum(abs_dlist) / len(abs_dlist)
+# dr_avg = sum(rel_dlist) / len(rel_dlist)
+#
+# return (zip(abs_dlist, rel_dlist),
+# (da_avg, max(abs_dlist)), (dr_avg, max(rel_dlist))
+# )