2.0 refactoring:
* Add type annotations to most functions
* Remove old fio run code, move to RPC/pluggable
* Remove most of the sensors code; it will be moved to RPC
* Other refactoring
diff --git a/wally/suits/io/fio_task_parser.py b/wally/suits/io/fio_task_parser.py
index 0f788ed..233f6e2 100644
--- a/wally/suits/io/fio_task_parser.py
+++ b/wally/suits/io/fio_task_parser.py
@@ -1,13 +1,17 @@
+#!/usr/bin/env python3
+
+import re
import os
import sys
import copy
import os.path
import argparse
import itertools
+from typing import Optional, Generator, Union, Dict, Iterable, Iterator, Any, List, Tuple, TypeVar, Callable
from collections import OrderedDict, namedtuple
-from wally.utils import sec_to_str, ssize2b
+from ...utils import sec_to_str, ssize2b
SECTION = 0
@@ -20,20 +24,20 @@
'tp', 'name', 'val'))
-class FioJobSection(object):
- def __init__(self, name):
+class FioJobSection:
+ def __init__(self, name: str):
self.name = name
self.vals = OrderedDict()
- def copy(self):
+ def copy(self) -> 'FioJobSection':
return copy.deepcopy(self)
- def required_vars(self):
+ def required_vars(self) -> Generator[Tuple[str, Var], None, None]:
for name, val in self.vals.items():
if isinstance(val, Var):
yield name, val
- def is_free(self):
+ def is_free(self) -> bool:
return len(list(self.required_vars())) == 0
def __str__(self):
@@ -51,7 +55,7 @@
class ParseError(ValueError):
- def __init__(self, msg, fname, lineno, line_cont=""):
+ def __init__(self, msg: str, fname: str, lineno: int, line_cont:Optional[str] =""):
ValueError.__init__(self, msg)
self.file_name = fname
self.lineno = lineno
@@ -65,21 +69,11 @@
super(ParseError, self).__str__())
-def is_name(name):
- if len(name) == 0:
- return False
-
- if name[0] != '_' and not name[0].isalpha():
- return False
-
- for ch in name[1:]:
- if name[0] != '_' and not name[0].isalnum():
- return False
-
- return True
+def is_name(name: str) -> bool:
+ return re.match("[a-zA-Z_][a-zA-Z_0-9]*", name) is not None
-def parse_value(val):
+def parse_value(val: str) -> Union[int, str, Dict, Var]:
try:
return int(val)
except ValueError:
@@ -103,7 +97,7 @@
return val
-def fio_config_lexer(fio_cfg, fname):
+def fio_config_lexer(fio_cfg: str, fname: str) -> Generator[CfgLine, None, None]:
for lineno, oline in enumerate(fio_cfg.split("\n")):
try:
line = oline.strip()
@@ -136,7 +130,7 @@
raise ParseError(str(exc), fname, lineno, oline)
-def fio_config_parse(lexer_iter):
+def fio_config_parse(lexer_iter: Iterable[CfgLine]) -> Generator[FioJobSection, None, None]:
in_globals = False
curr_section = None
glob_vals = OrderedDict()
@@ -210,19 +204,7 @@
yield curr_section
-def process_repeats(sec):
- sec = sec.copy()
- count = sec.vals.pop('NUM_ROUNDS', 1)
- assert isinstance(count, (int, long))
-
- for _ in range(count):
- yield sec.copy()
-
- if 'ramp_time' in sec.vals:
- sec.vals['_ramp_time'] = sec.vals.pop('ramp_time')
-
-
-def process_cycles(sec):
+def process_cycles(sec: FioJobSection) -> Generator[FioJobSection, None, None]:
cycles = OrderedDict()
for name, val in sec.vals.items():
@@ -232,8 +214,8 @@
if len(cycles) == 0:
yield sec
else:
- # thread should changes faster
- numjobs = cycles.pop('numjobs', None)
+ # qd should change fastest
+ numjobs = cycles.pop('qd', None)
items = cycles.items()
if len(items) > 0:
@@ -246,7 +228,7 @@
if numjobs is not None:
vals.append(numjobs)
- keys.append('numjobs')
+ keys.append('qd')
for combination in itertools.product(*vals):
new_sec = sec.copy()
@@ -254,7 +236,11 @@
yield new_sec
-def apply_params(sec, params):
+FIO_PARAM_VAL = Union[str, Var]
+FIO_PARAMS = Dict[str, FIO_PARAM_VAL]
+
+
+def apply_params(sec: FioJobSection, params: FIO_PARAMS) -> FioJobSection:
processed_vals = OrderedDict()
processed_vals.update(params)
for name, val in sec.vals.items():
@@ -273,10 +259,7 @@
return sec
-MAGIC_OFFSET = 0.1885
-
-
-def abbv_name_to_full(name):
+def abbv_name_to_full(name: str) -> str:
assert len(name) == 3
smode = {
@@ -291,12 +274,11 @@
off_mode[name[0]] + " " + oper[name[1]]
-def finall_process(sec, counter=[0]):
- sec = sec.copy()
+MAGIC_OFFSET = 0.1885
- if sec.vals.get('numjobs', '1') != 1:
- msg = "Group reporting should be set if numjobs != 1"
- assert 'group_reporting' in sec.vals, msg
+
+def finall_process(sec: FioJobSection, counter: Optional[List[int]] = [0]) -> FioJobSection:
+ sec = sec.copy()
sec.vals['unified_rw_reporting'] = '1'
@@ -328,7 +310,7 @@
return sec
-def get_test_sync_mode(sec):
+def get_test_sync_mode(sec: FioJobSection) -> str:
if isinstance(sec, dict):
vals = sec
else:
@@ -347,10 +329,10 @@
return 'a'
-TestSumm = namedtuple("TestSumm", ("oper", "mode", "bsize", "th_count", "vm_count"))
+TestSumm = namedtuple("TestSumm", ("oper", "mode", "bsize", "iodepth", "vm_count"))
-def get_test_summary_tuple(sec, vm_count=None):
+def get_test_summary_tuple(sec: FioJobSection, vm_count=None) -> TestSumm:
if isinstance(sec, dict):
vals = sec
else:
@@ -365,48 +347,51 @@
"readwrite": "sm"}[vals["rw"]]
sync_mode = get_test_sync_mode(sec)
- th_count = vals.get('numjobs')
-
- if th_count is None:
- th_count = vals.get('concurence', 1)
return TestSumm(rw,
sync_mode,
vals['blocksize'],
- th_count,
+ vals['iodepth'],
vm_count)
-def get_test_summary(sec, vm_count=None):
+def get_test_summary(sec: FioJobSection, vm_count: Optional[int]=None, noqd: bool=False) -> str:
tpl = get_test_summary_tuple(sec, vm_count)
- res = "{0.oper}{0.mode}{0.bsize}th{0.th_count}".format(tpl)
+
+ res = "{0.oper}{0.mode}{0.bsize}".format(tpl)
+ if not noqd:
+ res += "qd{}".format(tpl.iodepth)
if tpl.vm_count is not None:
- res += "vm" + str(tpl.vm_count)
+ res += "vm{}".format(tpl.vm_count)
return res
-def execution_time(sec):
+def execution_time(sec: FioJobSection) -> int:
return sec.vals.get('ramp_time', 0) + sec.vals.get('runtime', 0)
-def parse_all_in_1(source, fname=None):
+def parse_all_in_1(source: str, fname: Optional[str]=None) -> Generator[FioJobSection, None, None]:
return fio_config_parse(fio_config_lexer(source, fname))
-def flatmap(func, inp_iter):
+FM_FUNC_INPUT = TypeVar("FM_FUNC_INPUT")
+FM_FUNC_RES = TypeVar("FM_FUNC_RES")
+
+
+def flatmap(func: Callable[[FM_FUNC_INPUT], Iterable[FM_FUNC_RES]],
+ inp_iter: Iterable[FM_FUNC_INPUT]) -> Generator[FM_FUNC_RES, None, None]:
for val in inp_iter:
for res in func(val):
yield res
-def fio_cfg_compile(source, fname, test_params):
+def fio_cfg_compile(source: str, fname: str, test_params: FIO_PARAMS) -> Iterator[FioJobSection]:
it = parse_all_in_1(source, fname)
it = (apply_params(sec, test_params) for sec in it)
it = flatmap(process_cycles, it)
- it = flatmap(process_repeats, it)
- return itertools.imap(finall_process, it)
+ return map(finall_process, it)
def parse_args(argv):
@@ -438,12 +423,12 @@
sec_it = fio_cfg_compile(job_cfg, argv_obj.jobfile, params)
if argv_obj.action == 'estimate':
- print sec_to_str(sum(map(execution_time, sec_it)))
+ print(sec_to_str(sum(map(execution_time, sec_it))))
elif argv_obj.action == 'num_tests':
- print sum(map(len, map(list, sec_it)))
+ print(sum(map(len, map(list, sec_it))))
elif argv_obj.action == 'compile':
splitter = "\n#" + "-" * 70 + "\n\n"
- print splitter.join(map(str, sec_it))
+ print(splitter.join(map(str, sec_it)))
return 0