fixes, fixes, fixes
diff --git a/configs/config.yaml b/configs/config.yaml
index b2d80ea..a5a4b16 100644
--- a/configs/config.yaml
+++ b/configs/config.yaml
@@ -1,6 +1,5 @@
clouds:
fuel:
- # ext_ip: 172.16.53.3
url: http://172.16.52.112:8000/
creds: admin:admin@admin
ssh_creds: root:test37
diff --git a/configs/local_ceph.yaml b/configs/local_ceph.yaml
index fc54189..7948093 100644
--- a/configs/local_ceph.yaml
+++ b/configs/local_ceph.yaml
@@ -4,25 +4,24 @@
discover: ceph
explicit_nodes:
- "ssh://koder@192.168.152.43::/home/koder/.ssh/id_rsa": testnode
+ # "ssh://koder@192.168.152.43::/home/koder/.ssh/id_rsa": testnode
+ local: testnode
internal:
var_dir_root: /tmp/perf_tests
sensors:
- receiver_url: "udp://{ip}:5699"
roles_mapping:
- ceph-osd: block-io
- cinder: block-io, system-cpu
testnode: system-cpu, block-io
tests:
- io:
# cfg: tests/io_scenario_hdd.cfg
cfg: scripts/fio_tests_configs/io_task_test.cfg
+ use_sudo: false
params:
FILENAME: /mnt/ceph/xxx.bin
- NUM_ROUNDS: 7
+ NUM_ROUNDS: 3
logging:
extra_logs: 1
diff --git a/configs/usb_hdd.yaml b/configs/usb_hdd.yaml
index 4ad0d97..67b2d83 100644
--- a/configs/usb_hdd.yaml
+++ b/configs/usb_hdd.yaml
@@ -1,16 +1,15 @@
explicit_nodes:
# local: testnode
- "ssh://koder@localhost::/home/koder/.ssh/id_rsa": testnode
+ "ssh://koder@koder-gup::/home/koder/.ssh/id_rsa": testnode
internal:
var_dir_root: /tmp/perf_tests
-# sensors:
-# receiver_url: "udp://{ip}:5699"
-# roles_mapping:
-# ceph-osd: block-io
-# cinder: block-io, system-cpu
-# testnode: system-cpu, block-io
+testnode_log_root: /tmp/perf_tests_rem
+
+sensors:
+ roles_mapping:
+ testnode: system-cpu, block-io
tests:
- io:
diff --git a/configs/vEnv-3-2.yaml b/configs/vEnv-3-2.yaml
index 385013a..678067e 100644
--- a/configs/vEnv-3-2.yaml
+++ b/configs/vEnv-3-2.yaml
@@ -14,7 +14,6 @@
var_dir_root: /tmp/perf_tests
# sensors:
-# receiver_url: "udp://{ip}:5699"
# roles_mapping:
# ceph-osd: block-io
# cinder: block-io, system-cpu
diff --git a/wally/config.py b/wally/config.py
index 8eafecc..83ba0ca 100644
--- a/wally/config.py
+++ b/wally/config.py
@@ -55,8 +55,10 @@
cfg_dict['log_file'] = in_var_dir('log.txt')
cfg_dict['sensor_storage'] = in_var_dir('sensor_storage.txt')
+ testnode_log_root = cfg_dict.get('testnode_log_root', '/var/wally')
+ testnode_log_dir = os.path.join(testnode_log_root, "{0}/{{name}}")
cfg_dict['default_test_local_folder'] = \
- "/var/wally/{0}/{{name}}".format(cfg_dict['run_uuid'])
+ testnode_log_dir.format(cfg_dict['run_uuid'])
cfg_dict['test_log_directory'] = in_var_dir('test_logs')
mkdirs_if_unxists(cfg_dict['test_log_directory'])
@@ -138,6 +140,6 @@
logger = logging.getLogger('paramiko')
logger.setLevel(logging.WARNING)
- logger.addHandler(sh)
+ # logger.addHandler(sh)
if fh is not None:
logger.addHandler(fh)
diff --git a/wally/report.py b/wally/report.py
index 5c7c277..5198d13 100644
--- a/wally/report.py
+++ b/wally/report.py
@@ -147,6 +147,17 @@
return hdi
+report_funcs = []
+
+
+def report(names):
+ def closure(func):
+ report_funcs.append((names.split(","), func))
+ return func
+ return closure
+
+
+@report('hdd_test_rrd4k,hdd_test_rws4k')
def make_hdd_report(processed_results, path, lab_info):
make_plots(processed_results, path)
di = get_disk_info(processed_results)
@@ -171,10 +182,17 @@
try:
processed_results = process_disk_info(results)
- if 'hdd_test_rrd4k' and 'hdd_test_rws4k':
- make_hdd_report(processed_results, path, lab_info)
+
+ for fields, func in report_funcs:
+ for field in fields:
+ if field not in processed_results:
+ break
+ else:
+ func(processed_results, path, lab_info)
+ break
else:
logger.warning("No report generator found for this load")
+
except Exception as exc:
logger.error("Failed to generate html report:" + str(exc))
else:
diff --git a/wally/sensors/api.py b/wally/sensors/api.py
index ea73789..e8c6261 100644
--- a/wally/sensors/api.py
+++ b/wally/sensors/api.py
@@ -4,7 +4,7 @@
from .deploy_sensors import (deploy_and_start_sensors,
stop_and_remove_sensors)
-from .protocol import create_protocol, Timeout
+from .protocol import create_protocol, Timeout, CantUnpack
__all__ = ['Empty', 'recv_main',
@@ -32,9 +32,15 @@
def recv_main(proto, data_q, cmd_q):
while True:
try:
- data_q.put(proto.recv(0.1))
+ ip, packet = proto.recv(0.1)
+ if packet is not None:
+ data_q.put((ip, packet))
+ except AssertionError as exc:
+ logger.warning("Error in sensor data " + str(exc))
except Timeout:
pass
+ except CantUnpack as exc:
+ print exc
try:
val = cmd_q.get(False)
diff --git a/wally/sensors/protocol.py b/wally/sensors/protocol.py
index 9fd1a84..fad7e00 100644
--- a/wally/sensors/protocol.py
+++ b/wally/sensors/protocol.py
@@ -6,13 +6,15 @@
import cPickle as pickle
from urlparse import urlparse
-from . import cp_transport
-
class Timeout(Exception):
pass
+class CantUnpack(Exception):
+ pass
+
+
# ------------------------------------- Serializers --------------------------
@@ -24,87 +26,114 @@
pass
-class StructSerializer(ISensortResultsSerializer):
- class LocalConfig(object):
- def __init__(self):
- self.last_format_sent = -1
- self.initial_sent = False
- self.initial_times = 5
- self.field_order = None
+class StructSerializerSend(ISensortResultsSerializer):
+ initial_times = 5
+ resend_timeout = 60
+ HEADERS = 'h'
+ DATA = 'd'
+ END_OF_HEADERS = '\x00'
+ END_OF_SOURCE_ID = '\x00'
+ HEADERS_SEPARATOR = '\n'
def __init__(self):
- self.configs = {}
+ self.field_order = None
+ self.headers_send_cycles_left = self.initial_times
+ self.pack_fmt = None
+ self.next_header_send_time = None
def pack(self, data):
- OLD_FORMAT = 5
- source_id = data["source_id"]
- config = self.configs.setdefault(source_id,
- StructSerializer.LocalConfig())
+ data = data.copy()
- if config.field_order is None or \
- not config.initial_sent or \
- time.time() - config.last_format_sent > OLD_FORMAT:
- # send|resend format
- field_order = sorted(data.keys())
+ source_id = data.pop("source_id")
+ vals = [int(data.pop("time").value)]
- config.field_order = field_order
- config.last_format_sent = time.time()
- if not config.initial_sent:
- config.initial_times -= 1
- config.initial_sent = (config.initial_times <= 0)
+ if self.field_order is None:
+ self.field_order = sorted(data.keys())
+ self.pack_fmt = "!I" + "I" * len(self.field_order)
- forder = "\n".join(field_order)
- flen = struct.pack("!H", len(field_order))
- return "\x00{0}\x00{1}{2}".format(source_id, flen, forder)
+ need_resend = False
+ if self.next_header_send_time is not None:
+ if time.time() > self.next_header_send_time:
+ need_resend = True
+
+ if self.headers_send_cycles_left > 0 or need_resend:
+ forder = self.HEADERS_SEPARATOR.join(self.field_order)
+ flen = struct.pack("!H", len(self.field_order))
+
+ result = (self.HEADERS + source_id +
+ self.END_OF_SOURCE_ID +
+ flen + forder + self.END_OF_HEADERS)
+
+ if self.headers_send_cycles_left > 0:
+ self.headers_send_cycles_left -= 1
+
+ self.next_header_send_time = time.time() + self.resend_timeout
else:
- # send data
- # time will be first after source_id
- vals = [data["time"]]
- for name in config.field_order:
- if name in data:
- vals.append(data[name])
- pack_fmt = "!" + ("I" * len(vals))
- packed_data = struct.pack(pack_fmt, vals)
- return "\x01{0}\x00{1}".format(source_id, packed_data)
+ result = ""
+
+ for name in self.field_order:
+ vals.append(int(data[name].value))
+
+ packed_data = self.DATA + source_id
+ packed_data += self.END_OF_SOURCE_ID
+ packed_data += struct.pack(self.pack_fmt, *vals)
+
+ return result + packed_data
+
+
+class StructSerializerRecv(ISensortResultsSerializer):
+ def __init__(self):
+ self.fields = {}
+ self.formats = {}
def unpack(self, data):
code = data[0]
- data = data[1:]
- source_id, _, packed_data = data.partition("\x00")
- config = self.configs.setdefault(source_id,
- StructSerializer.LocalConfig())
- unpacked_data = {"source_id":source_id}
+ source_id, _, packed_data = data[1:].partition(
+ StructSerializerSend.END_OF_SOURCE_ID)
- if code == "\x00":
+ if code == StructSerializerSend.HEADERS:
# fields order provided
- flen = struct.unpack("!H", packed_data[:2])
- forder = packed_data[2:].split("\n")
- if len(forder) != flen:
- return unpacked_data
- config.field_order = forder
- return unpacked_data
+ flen_sz = struct.calcsize("!H")
+ flen = struct.unpack("!H", packed_data[:flen_sz])[0]
+ headers_data, rest = packed_data[flen_sz:].split(
+ StructSerializerSend.END_OF_HEADERS, 1)
+
+ forder = headers_data.split(
+ StructSerializerSend.HEADERS_SEPARATOR)
+
+ assert len(forder) == flen, \
+ "Wrong len {0} != {1}".format(len(forder), flen)
+
+ if 'source_id' in self.fields:
+ assert self.fields[source_id] == ['time'] + forder,\
+ "New field order"
+ else:
+ self.fields[source_id] = ['time'] + forder
+ self.formats[source_id] = "!I" + "I" * flen
+
+ if len(rest) != 0:
+ return self.unpack(rest)
+ return None
else:
- # data provided
- # try to find fields_order
- if config.field_order is None:
- raise ValueError("No fields order provided"
- " for {0}, cannot unpack".format(source_id))
+ assert code == StructSerializerSend.DATA,\
+ "Unknown code {0!r}".format(code)
- val_size = 4
- if len(packed_data) % val_size != 0:
- raise ValueError("Bad packet received"
- " from {0}, cannot unpack".format(source_id))
- datalen = len(packed_data) / val_size
- pack_fmt = "!" + ("I" * datalen)
- vals = struct.unpack(pack_fmt, packed_data)
+ try:
+ fields = self.fields[source_id]
+ except KeyError:
+ raise CantUnpack("No fields order provided"
+ " for {0} yet".format(source_id))
+ s_format = self.formats[source_id]
- unpacked_data['time'] = vals[0]
- i = 1
- for field in config.field_order:
- data[field] = vals[i]
- i += 1
- return data
+ exp_size = struct.calcsize(s_format)
+ assert len(packed_data) == exp_size, \
+ "Wrong data len {0} != {1}".format(len(packed_data), exp_size)
+
+ vals = struct.unpack(s_format, packed_data)
+ res = dict(zip(fields, vals))
+ res['source_id'] = source_id
+ return res
class PickleSerializer(ISensortResultsSerializer):
@@ -215,8 +244,14 @@
return StdoutTransport(receiver)
elif parsed_uri.scheme == 'udp':
ip, port = parsed_uri.netloc.split(":")
+
+ if receiver:
+ packer_cls = StructSerializerRecv
+ else:
+ packer_cls = StructSerializerSend
+
return UDPTransport(receiver, ip=ip, port=int(port),
- packer_cls=StructSerializer)
+ packer_cls=packer_cls)
elif parsed_uri.scheme == 'file':
return FileTransport(receiver, parsed_uri.path)
else:
diff --git a/wally/ssh_utils.py b/wally/ssh_utils.py
index ae0f3e4..81f348c 100644
--- a/wally/ssh_utils.py
+++ b/wally/ssh_utils.py
@@ -18,7 +18,7 @@
"placeholder for local node"
@classmethod
def open_sftp(cls):
- return cls
+ return cls()
@classmethod
def mkdir(cls, remotepath, mode=None):
diff --git a/wally/suits/itest.py b/wally/suits/itest.py
index 259edf4..ed4e8c8 100644
--- a/wally/suits/itest.py
+++ b/wally/suits/itest.py
@@ -195,6 +195,9 @@
except OSError:
need_install.append(package)
+ if len(need_install) == 0:
+ return
+
cmd = "sudo apt-get -y install " + " ".join(need_install)
for i in range(max_retry):
diff --git a/wally/utils.py b/wally/utils.py
index e3fda71..ab825a6 100644
--- a/wally/utils.py
+++ b/wally/utils.py
@@ -1,5 +1,6 @@
import re
import os
+import socket
import logging
import threading
import contextlib
@@ -84,6 +85,12 @@
def get_ip_for_target(target_ip):
+ if not re.match("[0-9]+\.[0-9]+\.[0-9]+\.[0-9]$", target_ip):
+ target_ip = socket.gethostbyname(target_ip)
+
+ if target_ip in ('localhost', '127.0.0.1', '127.0.1.1'):
+ return '127.0.0.1'
+
cmd = 'ip route get to'.split(" ") + [target_ip]
data = subprocess.Popen(cmd, stdout=subprocess.PIPE).stdout.read()