large refactoring of new code
diff --git a/config.py b/config.py
index 6dd1a9b..a33f9b5 100644
--- a/config.py
+++ b/config.py
@@ -1,7 +1,7 @@
-import argparse
-import sys
-import yaml
import os
+# import sys
+import yaml
+# import argparse
def parse_config(file_name):
@@ -11,18 +11,19 @@
return cfg
-parser = argparse.ArgumentParser(description="config file name")
-parser.add_argument("-p", "--path")
-parser.add_argument("-b", "--basedir")
-parser.add_argument("-t", "--testpath")
-parser.add_argument("-d", "--database")
-parser.add_argument("-c", "--chartpath")
+# WTX???
+# parser = argparse.ArgumentParser(description="config file name")
+# parser.add_argument("-p", "--path")
+# parser.add_argument("-b", "--basedir")
+# parser.add_argument("-t", "--testpath")
+# parser.add_argument("-d", "--database")
+# parser.add_argument("-c", "--chartpath")
-config = parser.parse_args(sys.argv[1:])
+# config = parser.parse_args(sys.argv[1:])
path = "config.yaml"
-if not config.path is None:
- path = config.path
+# if config.path is not None:
+# path = config.path
cfg_dict = parse_config(os.path.join(os.path.dirname(__file__), path))
basedir = cfg_dict['paths']['basedir']
@@ -31,16 +32,14 @@
DATABASE_URI = cfg_dict['paths']['DATABASE_URI']
CHARTS_IMG_PATH = cfg_dict['paths']['CHARTS_IMG_PATH']
-if not config.basedir is None:
- basedir = config.basedir
+# if config.basedir is not None:
+# basedir = config.basedir
-if not config.testpath is None:
- TEST_PATH = config.testpath
+# if config.testpath is not None:
+# TEST_PATH = config.testpath
-if not config.database is None:
- DATABASE_URI = config.database
+# if config.database is not None:
+# DATABASE_URI = config.database
-if not config.chartpath is None:
- CHARTS_IMG_PATH = config.chartpath
-
-
+# if config.chartpath is not None:
+# CHARTS_IMG_PATH = config.chartpath
diff --git a/config.yaml b/config.yaml
index 3bab2f8..31b6410 100644
--- a/config.yaml
+++ b/config.yaml
@@ -3,20 +3,14 @@
openstack:
connection:
auth_url: http://172.16.54.130:5000/v2.0
- username: admin
- api_key: admin
- project_id: admin
+ creds: admin:admin@admin
discover:
vm:
name: test1
- auth:
- user: cirros
- password: cubswin:)
+ auth: cirros:cubswin:)
nodes:
service: all
- auth:
- user: cirros
- password: cubswin:)
+ auth: cirros:cubswin:)
start:
count: x1
img_name: TestVM
@@ -32,9 +26,7 @@
fuel:
connection:
url: http://172.16.52.120:8000/
- username: admin
- password: admin
- tenant_name: admin
+ creds: admin:admin@admin
discover: controller
other:
@@ -59,11 +51,11 @@
- mos-linux-http://172.12.33.45
paths:
- basedir: "/home/gstepanov/rally-results-processor"
- TEST_PATH: "/home/gstepanov/rally-results-processor/test_results"
- CHARTS_IMG_PATH: "static/images"
- SQLALCHEMY_MIGRATE_REPO: "/home/gstepanov/rally-results-processor/db_repository"
- DATABASE_URI: 'sqlite:////home/gstepanov/rally-results-processor/app.db?check_same_thread=False'
+ basedir: "/home/gstepanov/rally-results-processor"
+ TEST_PATH: "/home/gstepanov/rally-results-processor/test_results"
+ CHARTS_IMG_PATH: "static/images"
+ SQLALCHEMY_MIGRATE_REPO: "/home/gstepanov/rally-results-processor/db_repository"
+ DATABASE_URI: 'sqlite:////home/gstepanov/rally-results-processor/app.db?check_same_thread=False'
logging:
extra_logs: 1
\ No newline at end of file
diff --git a/data_processing.py b/data_processing.py
index 2905bbe..387ea3f 100644
--- a/data_processing.py
+++ b/data_processing.py
@@ -2,6 +2,8 @@
# to avoid circular imports.
import math
+# fix and update all this. Take statistic code from scripts/data2.py
+
class Measurement(object):
def __init__(self):
@@ -20,9 +22,7 @@
def mean(l):
- n = len(l)
-
- return sum(l) / n
+ return sum(l) / len(l)
def stdev(l):
diff --git a/nodes/sh.py b/ext_libs/sh.py
similarity index 100%
rename from nodes/sh.py
rename to ext_libs/sh.py
diff --git a/sensors/logger.py b/logger.py
similarity index 100%
rename from sensors/logger.py
rename to logger.py
diff --git a/make_report.py b/make_report.py
deleted file mode 100644
index e7bc946..0000000
--- a/make_report.py
+++ /dev/null
@@ -1,67 +0,0 @@
-import sys
-import json
-import argparse
-
-
-import gspread
-
-
-from config import DEFAULT_FILE_PATH, \
- WORK_SHEET, DOCUMENT_ID, OUTPUT_FILE
-from storage_api import DiskStorage, GoogleDocsStorage, \
- get_work_sheet, append_row
-
-
-def load_data(file_name):
- with open(file_name) as f:
- data = f.read()
- return json.loads(data)
-
-
-# getting worksheet from sheet or create it with specified column names.
-
-
-def make_report(email, password, data):
- gc = gspread.login(email, password)
- sh = gc.open_by_key(DOCUMENT_ID)
-
- work_sheet = get_work_sheet(sh, WORK_SHEET, data.keys())
- append_row(work_sheet, data)
-
-
-def parse_args(argv):
- parser = argparse.ArgumentParser()
- parser.add_argument('-n', '--name', help='data file path',
- default=DEFAULT_FILE_PATH)
- parser.add_argument('-e', '--email', help='user email',
- default="aaa@gmail.com")
- parser.add_argument('-p', '--password', help='user password',
- default="1234")
- parser.add_argument('-m', '--mode', help='mode type local or global',
- default='local')
- return parser.parse_args(argv)
-
-
-def process_results(file_name, email, password, mode):
- data = load_data(file_name)
-
- if mode == 'local':
- storage = DiskStorage(OUTPUT_FILE)
- else:
- storage = GoogleDocsStorage(DOCUMENT_ID, WORK_SHEET, email, password)
-
- storage.store(data)
-
-
-def main(argv):
- opts = parse_args(argv)
-
- process_results(opts.name,
- opts.email,
- opts.password,
- opts.mode)
- return 0
-
-
-if __name__ == '__main__':
- exit(main(sys.argv[1:]))
diff --git a/nodes/__init__.py b/nodes/__init__.py
index 8b13789..9198922 100644
--- a/nodes/__init__.py
+++ b/nodes/__init__.py
@@ -1 +1 @@
-
+"this package contains node discovery code"
diff --git a/nodes/ceph.py b/nodes/ceph.py
index c5a9288..216f30d 100644
--- a/nodes/ceph.py
+++ b/nodes/ceph.py
@@ -1,111 +1,71 @@
""" Collect data about ceph nodes"""
import json
-import sh
from node import Node
+from disk_perf_test_tool.ext_libs import sh
def discover_ceph_node():
""" Return list of ceph's nodes ips """
-
ips = {}
- osd_list = get_osds_list()
- osd_ips = get_osds_ips(osd_list)
+
+ osd_ips = get_osds_ips(get_osds_list())
mon_ips = get_mons_or_mds_ips("mon")
mds_ips = get_mons_or_mds_ips("mds")
+
for ip in osd_ips:
url = "ssh://%s" % ip
- if url in ips:
- ips[url].append("ceph-osd")
- else:
- ips[url] = ["ceph-osd"]
+ ips.setdefault(url, []).append("ceph-osd")
+
for ip in mon_ips:
url = "ssh://%s" % ip
- if url in ips:
- ips[url].append("ceph-mon")
- else:
- ips[url] = ["ceph-mon"]
+ ips.setdefault(url, []).append("ceph-mon")
+
for ip in mds_ips:
url = "ssh://%s" % ip
- if url in ips:
- ips[url].append("ceph-mds")
- else:
- ips[url] = ["ceph-mds"]
+ ips.setdefault(url, []).append("ceph-mds")
- res = []
- for url, roles in ips:
- res.append(Node(ip=url, roles=list(roles)))
-
- return res
-
-
-# internal services
-
-
-class CephException(Exception):
- """ Exceptions from ceph call"""
- pass
-
-class ParameterException(Exception):
- """ Bad parameter in function"""
- pass
+ return [Node(ip=url, roles=list(roles)) for url, roles in ips.items()]
def get_osds_list():
""" Get list of osds id"""
- try:
- res = sh.ceph.osd.ls()
- osd_list = [osd_id
- for osd_id in res.split("\n") if osd_id != '']
- return osd_list
- except sh.CommandNotFound:
- raise CephException("Ceph command not found")
- except:
- raise CephException("Ceph command 'osd ls' execution error")
+ return filter(None, sh.ceph.osd.ls().split("\n"))
def get_mons_or_mds_ips(who):
""" Return mon ip list
:param who - "mon" or "mds" """
- try:
- ips = set()
- if who == "mon":
- res = sh.ceph.mon.dump()
- elif who == "mds":
- res = sh.ceph.mds.dump()
- else:
- raise ParameterException("'%s' in get_mons_or_mds_ips instead of mon/mds" % who)
+ if who == "mon":
+ res = sh.ceph.mon.dump()
+ elif who == "mds":
+ res = sh.ceph.mds.dump()
+ else:
+ raise ValueError(("'%s' in get_mons_or_mds_ips instead" +
+ "of mon/mds") % who)
- line_res = res.split("\n")
- for line in line_res:
- fields = line.split()
- if len(fields) > 2 and who in fields[2]:
- ips.add(fields[1].split(":")[0])
+ line_res = res.split("\n")
+ ips = set()
- return ips
+ for line in line_res:
+ fields = line.split()
- except sh.CommandNotFound:
- raise CephException("Ceph command not found")
- except ParameterException as e:
- raise e
- except:
- mes = "Ceph command '%s dump' execution error" % who
- raise CephException(mes)
+ # what does fields[1], fields[2] means?
+ # make this code looks like:
+ # SOME_MENINGFULL_VAR1, SOME_MENINGFULL_VAR2 = line.split()[1:3]
+
+ if len(fields) > 2 and who in fields[2]:
+ ips.add(fields[1].split(":")[0])
+
+ return ips
def get_osds_ips(osd_list):
- """ Get osd's ips
+ """ Get osd's ips
:param osd_list - list of osd names from osd ls command"""
- try:
- ips = set()
- for osd_id in osd_list:
- res = sh.ceph.osd.find(osd_id)
- ip = json.loads(str(res))["ip"]
- ips.add(ip.split(":")[0])
- return ips
-
- except sh.CommandNotFound:
- raise CephException("Ceph command not found")
- except:
- raise CephException("Ceph command 'osd find' execution error")
-
+ ips = set()
+ for osd_id in osd_list:
+ res = sh.ceph.osd.find(osd_id)
+ ip = json.loads(str(res))["ip"]
+ ips.add(ip.split(":")[0])
+ return ips
diff --git a/nodes/discover.py b/nodes/discover.py
index 17aa1e7..d36e517 100644
--- a/nodes/discover.py
+++ b/nodes/discover.py
@@ -1,9 +1,10 @@
import logging
-import openstack
import ceph
import fuel
+import openstack
+from disk_perf_test_tool.utils import parse_creds
logger = logging.getLogger("io-perf-tool")
@@ -15,23 +16,37 @@
nodes_to_run = []
for cluster, cluster_info in cluster_conf.items():
if cluster == "openstack":
- conn = cluster_info.get('connection')
+
+ conn = cluster_info['connection']
+ user, passwd, tenant = parse_creds(conn['creds'])
+
+ auth_data = dict(
+ auth_url=conn['auth_url'],
+ username=user,
+ api_key=passwd,
+ project_id=tenant)
+
if not conn:
logger.error("No connection provided for %s. Skipping"
% cluster)
continue
+
logger.debug("Discovering openstack nodes "
"with connection details: %r" %
conn)
- nodes_to_run.extend(openstack.discover_openstack_nodes(
- conn, cluster_info))
+ os_nodes = openstack.discover_openstack_nodes(auth_data,
+ cluster_info)
+ nodes_to_run.extend(os_nodes)
+
if cluster == "fuel":
url = cluster_info['connection'].pop('url')
creads = cluster_info['connection']
roles = cluster_info['discover']
+
if isinstance(roles, basestring):
roles = [roles]
+
nodes_to_run.extend(fuel.discover_fuel_nodes(url, creads, roles))
if cluster == "ceph":
diff --git a/nodes/fuel.py b/nodes/fuel.py
index c828793..25476dc 100644
--- a/nodes/fuel.py
+++ b/nodes/fuel.py
@@ -1,6 +1,9 @@
+import logging
+
+
import node
import fuel_rest_api
-import logging
+from disk_perf_test_tool.utils import parse_creds
logger = logging.getLogger("io-perf-tool")
@@ -8,10 +11,18 @@
def discover_fuel_nodes(root_url, credentials, roles):
"""Discover Fuel nodes"""
- connection = fuel_rest_api.KeystoneAuth(root_url, credentials)
+ user, passwd, tenant = parse_creds(credentials['creds'])
+
+ creds = dict(
+ username=user,
+ password=passwd,
+ tenant_name=tenant,
+ )
+
+ connection = fuel_rest_api.KeystoneAuth(root_url, creds)
fi = fuel_rest_api.FuelInfo(connection)
nodes = []
for role in roles:
nodes.extend(getattr(fi.nodes, role))
logger.debug("Found %s fuel nodes" % len(fi.nodes))
- return [node.Node(n.ip, n.get_roles()) for n in nodes]
\ No newline at end of file
+ return [node.Node(n.ip, n.get_roles()) for n in nodes]
diff --git a/nodes/node.py b/nodes/node.py
index 03efd34..d7cae40 100644
--- a/nodes/node.py
+++ b/nodes/node.py
@@ -8,9 +8,14 @@
self.password = password
self.port = port
self.key_path = key_path
+ self.connection = None
+
+ def __str__(self):
+ return "<Node: url={0!r} roles={1} >".format(self.ip,
+ ", ".join(self.roles))
def __repr__(self):
- return "<Node: %s %s>" % (self.ip, self.roles)
+ return str(self)
def set_conn_attr(self, name, value):
setattr(self, name, value)
diff --git a/nodes/openstack.py b/nodes/openstack.py
index ed3d8ea..558985d 100644
--- a/nodes/openstack.py
+++ b/nodes/openstack.py
@@ -1,8 +1,10 @@
import logging
-import node
-from starts_vms import create_vms_mt
+
from novaclient.client import Client
+import node
+from disk_perf_test_tool.utils import parse_creds
+
logger = logging.getLogger("io-perf-tool")
@@ -16,10 +18,8 @@
def discover_vms(client, search_opts):
- auth = search_opts.pop('auth', {})
- user = auth.get('user')
- password = auth.get('password')
- key = auth.get('key_file')
+ user, password, key = parse_creds(search_opts.pop('auth'))
+
servers = client.servers.list(search_opts=search_opts)
logger.debug("Found %s openstack vms" % len(servers))
return [node.Node(get_floating_ip(server), ["test_vm"], username=user,
@@ -28,10 +28,8 @@
def discover_services(client, opts):
- auth = opts.pop('auth', {})
- user = auth.get('user')
- password = auth.get('password')
- key = auth.get('key_file')
+ user, password, key = parse_creds(opts.pop('auth'))
+
services = []
if opts['service'] == "all":
services = client.services.list()
@@ -69,36 +67,34 @@
services_to_discover = conf['discover'].get('nodes')
if services_to_discover:
nodes.extend(discover_services(client, services_to_discover))
- if conf.get('start'):
- vms = start_test_vms(client, conf['start'])
- nodes.extend(vms)
return nodes
-def start_test_vms(client, opts):
+# from disk_perf_test_tool.starts_vms import create_vms_mt
+# def start_test_vms(client, opts):
- user = opts.pop("user", None)
- key_file = opts.pop("key_file", None)
- aff_group = opts.pop("aff_group", None)
- raw_count = opts.pop('count')
+# user = opts.pop("user", None)
+# key_file = opts.pop("key_file", None)
+# aff_group = opts.pop("aff_group", None)
+# raw_count = opts.pop('count')
- if raw_count.startswith("x"):
- logger.debug("Getting amount of compute services")
- count = len(client.services.list(binary="nova-compute"))
- count *= int(raw_count[1:])
- else:
- count = int(raw_count)
+# if raw_count.startswith("x"):
+# logger.debug("Getting amount of compute services")
+# count = len(client.services.list(binary="nova-compute"))
+# count *= int(raw_count[1:])
+# else:
+# count = int(raw_count)
- if aff_group is not None:
- scheduler_hints = {'group': aff_group}
- else:
- scheduler_hints = None
+# if aff_group is not None:
+# scheduler_hints = {'group': aff_group}
+# else:
+# scheduler_hints = None
- opts['scheduler_hints'] = scheduler_hints
+# opts['scheduler_hints'] = scheduler_hints
- logger.debug("Will start {0} vms".format(count))
+# logger.debug("Will start {0} vms".format(count))
- nodes = create_vms_mt(client, count, **opts)
- return [node.Node(get_floating_ip(server), ["test_vm"], username=user,
- key_path=key_file) for server in nodes]
+# nodes = create_vms_mt(client, count, **opts)
+# return [node.Node(get_floating_ip(server), ["test_vm"], username=user,
+# key_path=key_file) for server in nodes]
diff --git a/parse_config.py b/parse_config.py
deleted file mode 100644
index fd40910..0000000
--- a/parse_config.py
+++ /dev/null
@@ -1,4 +0,0 @@
-
-
-
-
diff --git a/run_test.py b/run_test.py
index cafbc59..ec77a41 100755
--- a/run_test.py
+++ b/run_test.py
@@ -6,15 +6,15 @@
import logging
import os.path
import argparse
-from nodes import discover
-import ssh_runner
+import ssh_utils
import io_scenario
+from nodes import discover
from config import cfg_dict
from utils import log_error
from rest_api import add_test
-from itest import IOPerfTest,PgBenchTest
from formatters import get_formatter
+from itest import IOPerfTest, PgBenchTest
logger = logging.getLogger("io-perf-tool")
logger.setLevel(logging.DEBUG)
@@ -58,49 +58,49 @@
parser = argparse.ArgumentParser(
description="Run disk io performance test")
- parser.add_argument("tool_type", help="test tool type",
- choices=['iozone', 'fio', 'pgbench', 'two_scripts'])
-
parser.add_argument("-l", dest='extra_logs',
action='store_true', default=False,
help="print some extra log info")
- parser.add_argument("-o", "--test-opts", dest='opts',
- help="cmd line options for test")
+ parser.add_argument('stages', nargs="+",
+ choices=["discover", "connect", "start_new_nodes",
+ "deploy_sensors"])
- parser.add_argument("-f", "--test-opts-file", dest='opts_file',
- type=argparse.FileType('r'), default=None,
- help="file with cmd line options for test")
+ # THIS ALL MOVE TO CONFIG FILE
+ # parser.add_argument("-o", "--test-opts", dest='opts',
+ # help="cmd line options for test")
- parser.add_argument("--max-preparation-time", default=300,
- type=int, dest="max_preparation_time")
+ # parser.add_argument("-f", "--test-opts-file", dest='opts_file',
+ # type=argparse.FileType('r'), default=None,
+ # help="file with cmd line options for test")
- parser.add_argument("-b", "--build-info", default=None,
- dest="build_name")
+ # parser.add_argument("--max-preparation-time", default=300,
+ # type=int, dest="max_preparation_time")
- parser.add_argument("-d", "--data-server-url", default=None,
- dest="data_server_url")
+ # parser.add_argument("-b", "--build-info", default=None,
+ # dest="build_name")
- parser.add_argument("-n", "--lab-name", default=None,
- dest="lab_name")
+ # parser.add_argument("-d", "--data-server-url", default=None,
+ # dest="data_server_url")
- parser.add_argument("--create-vms-opts", default=None,
- help="Creating vm's before run ssh runner",
- dest="create_vms_opts")
+ # parser.add_argument("-n", "--lab-name", default=None,
+ # dest="lab_name")
- parser.add_argument("-k", "--keep", default=False,
- help="keep temporary files",
- dest="keep_temp_files", action='store_true')
+ # parser.add_argument("--create-vms-opts", default=None,
+ # help="Creating vm's before run ssh runner",
+ # dest="create_vms_opts")
- choices = ["local", "ssh"]
+ # parser.add_argument("-k", "--keep", default=False,
+ # help="keep temporary files",
+ # dest="keep_temp_files", action='store_true')
- parser.add_argument("--runner", required=True,
- choices=choices, help="runner type")
+ # parser.add_argument("--runner", required=True,
+ # choices=["local", "ssh"], help="runner type")
- parser.add_argument("--runner-extra-opts", default=None,
- dest="runner_opts", help="runner extra options")
+ # parser.add_argument("--runner-extra-opts", default=None,
+ # dest="runner_opts", help="runner extra options")
- return parser.parse_args(argv)
+ return parser.parse_args(argv[1:])
def format_result(res, formatter):
@@ -111,10 +111,6 @@
return templ.format(data, formatter(res), "=" * 80)
-def deploy_and_start_sensors(sensors_conf, nodes):
- pass
-
-
def main(argv):
logging_conf = cfg_dict.get('logging')
if logging_conf:
@@ -122,13 +118,22 @@
logger.setLevel(logging.DEBUG)
ch.setLevel(logging.DEBUG)
+ opts = parse_args(argv)
+ if 'discover' in opts.stages:
+ current_data = discover.discover(cfg_dict.get('cluster'))
+
+ if 'connect' in opts.stages:
+ for node in current_data:
+ node.connection = ssh_utils.connect(node.connection_url)
+
+ print "\n".join(map(str, current_data))
+ return 0
# Discover nodes
- nodes_to_run = discover.discover(cfg_dict.get('cluster'))
tests = cfg_dict.get("tests", [])
# Deploy and start sensors
- deploy_and_start_sensors(cfg_dict.get('sensors'), nodes_to_run)
+ # deploy_and_start_sensors(cfg_dict.get('sensors'), nodes_to_run)
for test_name, opts in tests.items():
cmd_line = " ".join(opts['opts'])
@@ -144,13 +149,13 @@
opts.get('keep_temp_files'))
logger.debug(format_result(res, get_formatter(test_name)))
- if cfg_dict.get('data_server_url'):
- result = json.loads(get_formatter(opts.tool_type)(res))
- result['name'] = opts.build_name
- add_test(opts.build_name, result, opts.data_server_url)
+ # if cfg_dict.get('data_server_url'):
+ # result = json.loads(get_formatter(opts.tool_type)(res))
+ # result['name'] = opts.build_name
+ # add_test(opts.build_name, result, opts.data_server_url)
return 0
if __name__ == '__main__':
- exit(main(sys.argv[1:]))
+ exit(main(sys.argv))
diff --git a/run_tests.py b/run_tests.py
deleted file mode 100644
index 7ac225c..0000000
--- a/run_tests.py
+++ /dev/null
@@ -1,29 +0,0 @@
-from flask import Flask, render_template, request, url_for, request, redirect
-from flask.ext.sqlalchemy import SQLAlchemy
-import sqlite3
-import os
-
-app = Flask(__name__)
-sqlite3.connect(os.path.abspath("test.db"))
-app.config["SQLALCHEMY_DATABASE_URI"] = "sqlite:///test.db"
-db = SQLAlchemy(app)
-
-
-class User(db.Model):
- id = db.Column(db.Integer, primary_key=True)
- username = db.Column(db.String(80), unique=True)
- email = db.Column(db.String(120), unique=True)
-
- def __init__(self, username, email):
- self.username = username
- self.email = email
- def __repr__(self):
- return "<User %r>" % self.username
-
-
-db.create_all()
-x = User("tes2t", "test2@gmail.com")
-db.session.add(x)
-db.session.commit()
-
-
diff --git a/sensors/config.yaml b/sensors/config.yaml
deleted file mode 100644
index d9ce8bb..0000000
--- a/sensors/config.yaml
+++ /dev/null
@@ -1,23 +0,0 @@
-192.168.0.104:
- block-io:
- allowed_prefixes: [sda1, rbd1]
- net-io:
- allowed_prefixes: [virbr2]
-
-# 192.168.152.39:
-# block-io:
-# allowed_prefixes: [sdb, sdc]
-# net-io:
-# allowed_prefixes: [eth0]
-
-# 192.168.152.40:
-# block-io:
-# allowed_prefixes: [sdb, sdc]
-# net-io:
-# allowed_prefixes: [eth0]
-
-# 192.168.152.41:
-# block-io:
-# allowed_prefixes: [sdb, sdc]
-# net-io:
-# allowed_prefixes: [eth0]
diff --git a/sensors/cp_protocol.py b/sensors/cp_protocol.py
index 79f94af..962bf0a 100644
--- a/sensors/cp_protocol.py
+++ b/sensors/cp_protocol.py
@@ -5,9 +5,15 @@
import zlib
import json
import binascii
-import logging
-from logger import define_logger
+try:
+ from disk_perf_test_tool.logger import define_logger
+ logger = define_logger(__name__)
+except ImportError:
+ class Logger(object):
+ def debug(self, *dt):
+ pass
+ logger = Logger()
# protocol contains 2 type of packet:
# 1 - header, which contains template schema of counters
@@ -52,7 +58,6 @@
self.tmpl_size = 0
self.packer = packer
-
def new_packet(self, part):
""" New packet adding """
# proceed packet
@@ -111,10 +116,8 @@
else:
return None
-
except PacketException as e:
# if something wrong - skip packet
- logger = logging.getLogger(__name__)
logger.warning("Packet skipped: %s", e)
self.is_begin = False
self.is_end = False
@@ -122,7 +125,6 @@
except TypeError:
# if something wrong - skip packet
- logger = logging.getLogger(__name__)
logger.warning("Packet skipped: doesn't match schema")
self.is_begin = False
self.is_end = False
@@ -130,13 +132,11 @@
except:
# if something at all wrong - skip packet
- logger = logging.getLogger(__name__)
logger.warning("Packet skipped: something is wrong")
self.is_begin = False
self.is_end = False
return None
-
@staticmethod
def create_packet(data, part_size):
""" Create packet divided by parts with part_size from data
@@ -160,7 +160,6 @@
return result
-
def create_packet_v2(self, data, part_size):
""" Create packet divided by parts with part_size from data
Compressed """
@@ -179,7 +178,6 @@
result.extend(parts)
return result
-
def get_matching_value_list(self, data):
""" Get values in order server expect"""
vals = range(0, self.tmpl_size)
@@ -188,7 +186,7 @@
for node, groups in self.clt_template.items():
for group, counters in groups.items():
for counter, index in counters.items():
- if not isinstance(index, dict):
+ if not isinstance(index, dict):
vals[index] = data[node][group][counter]
else:
for k, i in index.items():
@@ -201,8 +199,6 @@
logger.error("Data don't match last schema")
raise PacketException("Data don't match last schema")
-
-
def create_answer_template(self, perf_string):
""" Create template for server to insert counter values
Return tuple of server and clien templates + number of replaces"""
@@ -233,7 +229,3 @@
self.tmpl_size = k
self.clt_template = json.loads(clt_template)
-
-
-
-define_logger(__name__)
diff --git a/sensors/cp_transport.py b/sensors/cp_transport.py
index 1b951f2..2e00e80 100644
--- a/sensors/cp_transport.py
+++ b/sensors/cp_transport.py
@@ -5,7 +5,15 @@
import urlparse
from cp_protocol import Packet
-from logger import define_logger
+
+try:
+ from disk_perf_test_tool.logger import define_logger
+ logger = define_logger(__name__)
+except ImportError:
+ class Logger(object):
+ def debug(self, *dt):
+ pass
+ logger = Logger()
class SenderException(Exception):
@@ -62,14 +70,12 @@
self.all_data = {}
self.send_packer = None
-
def bind(self):
""" Prepare for listening """
self.sock.bind(self.bindto)
self.sock.settimeout(0.5)
self.binded = True
-
def send(self, data):
""" Send data to udp socket"""
if self.sock.sendto(data, self.sendto) != len(data):
@@ -77,7 +83,6 @@
logger.error(mes)
raise SenderException("Cannot send data")
-
def send_by_protocol(self, data):
""" Send data by Packet protocol
data = dict"""
@@ -87,7 +92,6 @@
for part in parts:
self.send(part)
-
def recv(self):
""" Receive data from udp socket"""
# check for binding
@@ -100,7 +104,6 @@
except socket.timeout:
raise Timeout()
-
def recv_by_protocol(self):
""" Receive data from udp socket by Packet protocol"""
data, remote_ip = self.recv()
@@ -110,7 +113,6 @@
return self.all_data[remote_ip].new_packet(data)
-
def recv_with_answer(self, stop_event=None):
""" Receive data from udp socket and send 'ok' back
Command port = local port + 1
@@ -132,7 +134,6 @@
# return None if we are interrupted
return None
-
def verified_send(self, send_host, message, max_repeat=20):
""" Send and verify it by answer not more then max_repeat
Send port = local port + 1
@@ -148,12 +149,9 @@
if remote_ip == send_host and data == "ok":
return True
else:
- logger.warning("No answer from %s, try %i", send_host, repeat)
+ logger.warning("No answer from %s, try %i",
+ send_host, repeat)
except Timeout:
logger.warning("No answer from %s, try %i", send_host, repeat)
return False
-
-
-
-logger = define_logger(__name__)
diff --git a/sensors/daemonize.py b/sensors/daemonize.py
index 1c3241b..a4fa157 100644
--- a/sensors/daemonize.py
+++ b/sensors/daemonize.py
@@ -20,17 +20,22 @@
- action: your custom function which will be executed after daemonization.
- keep_fds: optional list of fds which should not be closed.
- auto_close_fds: optional parameter to not close opened fds.
- - privileged_action: action that will be executed before drop privileges if user or
+ - privileged_action: action that will be executed before
+ drop privileges if user or
group parameter is provided.
- If you want to transfer anything from privileged_action to action, such as
- opened privileged file descriptor, you should return it from
- privileged_action function and catch it inside action function.
+ If you want to transfer anything from privileged
+ action to action, such as opened privileged file
+ descriptor, you should return it from
+ privileged_action function and catch it inside action
+ function.
- user: drop privileges to this user if provided.
- group: drop privileges to this group if provided.
- verbose: send debug messages to logger if provided.
- logger: use this logger object instead of creating new one, if provided.
"""
- def __init__(self, app, pid, action, keep_fds=None, auto_close_fds=True, privileged_action=None, user=None, group=None, verbose=False, logger=None):
+ def __init__(self, app, pid, action, keep_fds=None, auto_close_fds=True,
+ privileged_action=None, user=None, group=None, verbose=False,
+ logger=None):
self.app = app
self.pid = pid
self.action = action
@@ -62,19 +67,22 @@
""" start method
Main daemonization process.
"""
- # If pidfile already exists, we should read pid from there; to overwrite it, if locking
+ # If pidfile already exists, we should read pid from there;
+ # to overwrite it, if locking
# will fail, because locking attempt somehow purges the file contents.
if os.path.isfile(self.pid):
with open(self.pid, "r") as old_pidfile:
old_pid = old_pidfile.read()
- # Create a lockfile so that only one instance of this daemon is running at any time.
+ # Create a lockfile so that only one instance of this daemon is
+ # running at any time.
try:
lockfile = open(self.pid, "w")
except IOError:
print("Unable to create the pidfile.")
sys.exit(1)
try:
- # Try to get an exclusive lock on the file. This will fail if another process has the file
+ # Try to get an exclusive lock on the file. This will fail if
+ # another process has the file
# locked.
fcntl.flock(lockfile, fcntl.LOCK_EX | fcntl.LOCK_NB)
except IOError:
@@ -97,7 +105,8 @@
# Stop listening for signals that the parent process receives.
# This is done by getting a new process id.
# setpgrp() is an alternative to setsid().
- # setsid puts the process in a new parent group and detaches its controlling terminal.
+ # setsid puts the process in a new parent group and detaches
+ # its controlling terminal.
process_id = os.setsid()
if process_id == -1:
# Uh oh, there was a problem.
@@ -106,10 +115,12 @@
# Add lockfile to self.keep_fds.
self.keep_fds.append(lockfile.fileno())
- # Close all file descriptors, except the ones mentioned in self.keep_fds.
+ # Close all file descriptors, except the ones mentioned in
+ # self.keep_fds.
devnull = "/dev/null"
if hasattr(os, "devnull"):
- # Python has set os.devnull on this system, use it instead as it might be different
+ # Python has set os.devnull on this system, use it instead as it
+ # might be different
# than /dev/null.
devnull = os.devnull
@@ -140,7 +151,8 @@
else:
syslog_address = "/dev/log"
- # We will continue with syslog initialization only if actually have such capabilities
+ # We will continue with syslog initialization only if
+ # actually have such capabilities
# on the machine we are running this.
if os.path.isfile(syslog_address):
syslog = handlers.SysLogHandler(syslog_address)
@@ -149,17 +161,20 @@
else:
syslog.setLevel(logging.INFO)
# Try to mimic to normal syslog messages.
- formatter = logging.Formatter("%(asctime)s %(name)s: %(message)s",
+ format_t = "%(asctime)s %(name)s: %(message)s"
+ formatter = logging.Formatter(format_t,
"%b %e %H:%M:%S")
syslog.setFormatter(formatter)
self.logger.addHandler(syslog)
- # Set umask to default to safe file permissions when running as a root daemon. 027 is an
+ # Set umask to default to safe file permissions when running
+ # as a root daemon. 027 is an
# octal number which we are typing as 0o27 for Python3 compatibility.
os.umask(0o27)
- # Change to a known directory. If this isn't done, starting a daemon in a subdirectory that
+ # Change to a known directory. If this isn't done, starting a daemon
+ # in a subdirectory that
# needs to be deleted results in "directory busy" errors.
os.chdir("/")
diff --git a/sensors/deploy_sensors.py b/sensors/deploy_sensors.py
index a1091c2..b2dc3f1 100644
--- a/sensors/deploy_sensors.py
+++ b/sensors/deploy_sensors.py
@@ -2,11 +2,10 @@
import json
import os.path
-from ssh_copy_directory import copy_paths
-from ssh_runner import connect
-
from concurrent.futures import ThreadPoolExecutor, wait
+from disk_perf_test_tool.ssh_utils import connect, copy_paths
+
def wait_all_ok(futures):
return all(future.result() for future in futures)
@@ -42,6 +41,7 @@
cmd = cmd_templ.format(main_remote_path,
monitor_uri,
config_remote_path)
+ print "Executing", cmd
conn.exec_command(cmd)
sftp.close()
conn.close()
diff --git a/sensors/main.py b/sensors/main.py
index 449acc1..fea46a3 100644
--- a/sensors/main.py
+++ b/sensors/main.py
@@ -2,25 +2,25 @@
import sys
import time
import json
+import glob
import signal
import os.path
import argparse
-# pylint: disable=W0611
-import io_sensors
-import net_sensors
-import pscpu_sensors
-import psram_sensors
-import syscpu_sensors
-import sysram_sensors
-# pylint: enable=W0611
-
-from utils import SensorInfo
+from sensors.utils import SensorInfo
from daemonize import Daemonize
from discover import all_sensors
from protocol import create_protocol
+# load all sensors
+import sensors
+sensors_dir = os.path.dirname(sensors.__file__)
+for fname in glob.glob(os.path.join(sensors_dir, "*.py")):
+ mod_name = os.path.basename(fname[:-3])
+ __import__("sensors." + mod_name)
+
+
def get_values(required_sensors):
result = {}
for sensor_name, params in required_sensors:
@@ -68,7 +68,7 @@
opts = parse_args(argv)
if opts.list_sensors:
- print " ".join(all_sensors)
+ print "\n".join(sorted(all_sensors))
return 0
if opts.daemon is not None:
diff --git a/sensors/receiver.py b/sensors/receiver.py
index 8903f24..76f8bb7 100644
--- a/sensors/receiver.py
+++ b/sensors/receiver.py
@@ -1,41 +1,19 @@
-import yaml
-
from api import start_monitoring, Empty
-from influx_exporter import connect, add_data
-
-monitor_config = yaml.load(open("config.yaml").read())
+# from influx_exporter import connect, add_data
uri = "udp://192.168.0.104:12001"
-infldb_url = "influxdb://perf:perf@192.168.152.42:8086/perf"
-conn = connect(infldb_url)
+# infldb_url = "influxdb://perf:perf@192.168.152.42:8086/perf"
+# conn = connect(infldb_url)
-# sw_per_ip = {}
-# count = 4
-# expected = sorted(monitor_config.keys())
-
-# if 'sda1.sectors_written' in data:
-# val = data['sda1.sectors_written']
-# elif 'sdb.sectors_written' in data:
-# val = data['sdb.sectors_written']
-# else:
-# val = 0
-
-# sw_per_ip[ip] = sw_per_ip.get(ip, 0) + val
-# count -= 1
-
-# if 0 == count:
-# try:
-# vals = [sw_per_ip[ip] for ip in expected]
-# print ("{:>6}" * len(expected)).format(*vals)
-# sw_per_ip = {}
-# count = 4
-# except KeyError:
-# pass
+monitor_config = {'127.0.0.1':
+ {"block-io": {'allowed_prefixes': ['sda1', 'rbd1']},
+ "net-io": {"allowed_prefixes": ["virbr2"]}}}
with start_monitoring(uri, monitor_config) as queue:
while True:
try:
(ip, port), data = queue.get(True, 1)
- add_data(conn, ip, [data])
+ print (ip, port), data
+ # add_data(conn, ip, [data])
except Empty:
pass
diff --git a/sensors/sensors/__init__.py b/sensors/sensors/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/sensors/sensors/__init__.py
diff --git a/sensors/io_sensors.py b/sensors/sensors/io_sensors.py
similarity index 100%
rename from sensors/io_sensors.py
rename to sensors/sensors/io_sensors.py
diff --git a/sensors/net_sensors.py b/sensors/sensors/net_sensors.py
similarity index 100%
rename from sensors/net_sensors.py
rename to sensors/sensors/net_sensors.py
diff --git a/sensors/ps_mem.py b/sensors/sensors/ps_mem.py
similarity index 100%
rename from sensors/ps_mem.py
rename to sensors/sensors/ps_mem.py
diff --git a/sensors/pscpu_sensors.py b/sensors/sensors/pscpu_sensors.py
similarity index 99%
rename from sensors/pscpu_sensors.py
rename to sensors/sensors/pscpu_sensors.py
index f7c2d20..83b18c6 100644
--- a/sensors/pscpu_sensors.py
+++ b/sensors/sensors/pscpu_sensors.py
@@ -4,7 +4,6 @@
from utils import SensorInfo, get_pid_name, get_pid_list
-
@provides("perprocess-cpu")
def pscpu_stat(disallowed_prefixes=None, allowed_prefixes=None):
results = {}
diff --git a/sensors/psram_sensors.py b/sensors/sensors/psram_sensors.py
similarity index 100%
rename from sensors/psram_sensors.py
rename to sensors/sensors/psram_sensors.py
diff --git a/sensors/syscpu_sensors.py b/sensors/sensors/syscpu_sensors.py
similarity index 100%
rename from sensors/syscpu_sensors.py
rename to sensors/sensors/syscpu_sensors.py
diff --git a/sensors/sysram_sensors.py b/sensors/sensors/sysram_sensors.py
similarity index 100%
rename from sensors/sysram_sensors.py
rename to sensors/sensors/sysram_sensors.py
diff --git a/sensors/utils.py b/sensors/sensors/utils.py
similarity index 100%
rename from sensors/utils.py
rename to sensors/sensors/utils.py
diff --git a/sensors/storage/__init__.py b/sensors/storage/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/sensors/storage/__init__.py
diff --git a/sensors/grafana.py b/sensors/storage/grafana.py
similarity index 100%
rename from sensors/grafana.py
rename to sensors/storage/grafana.py
diff --git a/sensors/grafana_template.js b/sensors/storage/grafana_template.js
similarity index 100%
rename from sensors/grafana_template.js
rename to sensors/storage/grafana_template.js
diff --git a/sensors/influx_exporter.py b/sensors/storage/influx_exporter.py
similarity index 100%
rename from sensors/influx_exporter.py
rename to sensors/storage/influx_exporter.py
diff --git a/ssh_copy_directory.py b/ssh_copy_directory.py
deleted file mode 100644
index 265133c..0000000
--- a/ssh_copy_directory.py
+++ /dev/null
@@ -1,95 +0,0 @@
-import os.path
-
-
-def normalize_dirpath(dirpath):
- while dirpath.endswith("/"):
- dirpath = dirpath[:-1]
- return dirpath
-
-
-def ssh_mkdir(sftp, remotepath, mode=0777, intermediate=False):
- remotepath = normalize_dirpath(remotepath)
- if intermediate:
- try:
- sftp.mkdir(remotepath, mode=mode)
- except IOError:
- ssh_mkdir(sftp, remotepath.rsplit("/", 1)[0], mode=mode,
- intermediate=True)
- return sftp.mkdir(remotepath, mode=mode)
- else:
- sftp.mkdir(remotepath, mode=mode)
-
-
-def ssh_copy_file(sftp, localfile, remfile, preserve_perm=True):
- sftp.put(localfile, remfile)
- if preserve_perm:
- sftp.chmod(remfile, os.stat(localfile).st_mode & 0777)
-
-
-def put_dir_recursively(sftp, localpath, remotepath, preserve_perm=True):
- "upload local directory to remote recursively"
-
- # hack for localhost connection
- if hasattr(sftp, "copytree"):
- sftp.copytree(localpath, remotepath)
- return
-
- assert remotepath.startswith("/"), "%s must be absolute path" % remotepath
-
- # normalize
- localpath = normalize_dirpath(localpath)
- remotepath = normalize_dirpath(remotepath)
-
- try:
- sftp.chdir(remotepath)
- localsuffix = localpath.rsplit("/", 1)[1]
- remotesuffix = remotepath.rsplit("/", 1)[1]
- if localsuffix != remotesuffix:
- remotepath = os.path.join(remotepath, localsuffix)
- except IOError:
- pass
-
- for root, dirs, fls in os.walk(localpath):
- prefix = os.path.commonprefix([localpath, root])
- suffix = root.split(prefix, 1)[1]
- if suffix.startswith("/"):
- suffix = suffix[1:]
-
- remroot = os.path.join(remotepath, suffix)
-
- try:
- sftp.chdir(remroot)
- except IOError:
- if preserve_perm:
- mode = os.stat(root).st_mode & 0777
- else:
- mode = 0777
- ssh_mkdir(sftp, remroot, mode=mode, intermediate=True)
- sftp.chdir(remroot)
-
- for f in fls:
- remfile = os.path.join(remroot, f)
- localfile = os.path.join(root, f)
- ssh_copy_file(sftp, localfile, remfile, preserve_perm)
-
-
-def copy_paths(conn, paths):
- sftp = conn.open_sftp()
- try:
- for src, dst in paths.items():
- try:
- if os.path.isfile(src):
- ssh_copy_file(sftp, src, dst)
- elif os.path.isdir(src):
- put_dir_recursively(sftp, src, dst)
- else:
- templ = "Can't copy {0!r} - " + \
- "it neither a file not a directory"
- msg = templ.format(src)
- raise OSError(msg)
- except Exception as exc:
- tmpl = "Scp {0!r} => {1!r} failed - {2!r}"
- msg = tmpl.format(src, dst, exc)
- raise OSError(msg)
- finally:
- sftp.close()
diff --git a/ssh_runner.py b/ssh_runner.py
deleted file mode 100644
index 28996ab..0000000
--- a/ssh_runner.py
+++ /dev/null
@@ -1,140 +0,0 @@
-import re
-import Queue
-import logging
-import traceback
-import threading
-from concurrent.futures import ThreadPoolExecutor
-
-from utils import ssh_connect
-
-import itest
-from utils import get_barrier, log_error, wait_on_barrier
-
-
-logger = logging.getLogger("io-perf-tool")
-
-conn_uri_attrs = ("user", "passwd", "host", "port", "path")
-
-
-class ConnCreds(object):
- def __init__(self):
- for name in conn_uri_attrs:
- setattr(self, name, None)
-
-
-uri_reg_exprs = []
-
-
-class URIsNamespace(object):
- class ReParts(object):
- user_rr = "[^:]*?"
- host_rr = "[^:]*?"
- port_rr = "\\d+"
- key_file_rr = "[^:@]*"
- passwd_rr = ".*?"
-
- re_dct = ReParts.__dict__
-
- for attr_name, val in re_dct.items():
- if attr_name.endswith('_rr'):
- new_rr = "(?P<{0}>{1})".format(attr_name[:-3], val)
- setattr(ReParts, attr_name, new_rr)
-
- re_dct = ReParts.__dict__
-
- templs = [
- "^{host_rr}$",
- "^{user_rr}@{host_rr}::{key_file_rr}$",
- "^{user_rr}@{host_rr}:{port_rr}:{key_file_rr}$",
- "^{user_rr}:{passwd_rr}@@{host_rr}$",
- "^{user_rr}:{passwd_rr}@@{host_rr}:{port_rr}$",
- ]
-
- for templ in templs:
- uri_reg_exprs.append(templ.format(**re_dct))
-
-
-def parse_ssh_uri(uri):
- # user:passwd@@ip_host:port
- # user:passwd@@ip_host
- # user@ip_host:port
- # user@ip_host
- # ip_host:port
- # ip_host
- # user@ip_host:port:path_to_key_file
- # user@ip_host::path_to_key_file
- # ip_host:port:path_to_key_file
- # ip_host::path_to_key_file
-
- res = ConnCreds()
- res.port = "22"
- res.key_file = None
- res.passwd = None
-
- for rr in uri_reg_exprs:
- rrm = re.match(rr, uri)
- if rrm is not None:
- res.__dict__.update(rrm.groupdict())
- return res
- raise ValueError("Can't parse {0!r} as ssh uri value".format(uri))
-
-
-def connect(uri):
- creds = parse_ssh_uri(uri)
- creds.port = int(creds.port)
- return ssh_connect(creds)
-
-
-def conn_func(obj, barrier, latest_start_time, conn):
- try:
- test_iter = itest.run_test_iter(obj, conn)
- next(test_iter)
-
- wait_on_barrier(barrier, latest_start_time)
-
- with log_error("!Run test"):
- return next(test_iter)
- except:
- print traceback.format_exc()
- raise
-
-
-def get_ssh_runner(uris,
- latest_start_time=None,
- keep_temp_files=False):
- logger.debug("Connecting to servers")
-
- with ThreadPoolExecutor(max_workers=16) as executor:
- connections = list(executor.map(connect, uris))
-
- result_queue = Queue.Queue()
- barrier = get_barrier(len(uris), threaded=True)
-
- def closure(obj):
- ths = []
- obj.set_result_cb(result_queue.put)
-
- params = (obj, barrier, latest_start_time)
-
- logger.debug("Start tests")
- for conn in connections:
- th = threading.Thread(None, conn_func, None,
- params + (conn,))
- th.daemon = True
- th.start()
- ths.append(th)
-
- for th in ths:
- th.join()
-
- test_result = []
- while not result_queue.empty():
- test_result.append(result_queue.get())
-
- logger.debug("Done. Closing connection")
- for conn in connections:
- conn.close()
-
- return test_result
-
- return closure
diff --git a/ssh_utils.py b/ssh_utils.py
new file mode 100644
index 0000000..a7dda3f
--- /dev/null
+++ b/ssh_utils.py
@@ -0,0 +1,233 @@
+import re
+import Queue
+import logging
+import os.path
+import traceback
+import threading
+
+from concurrent.futures import ThreadPoolExecutor
+
+import itest
+from utils import ssh_connect
+from utils import get_barrier, log_error, wait_on_barrier
+
+logger = logging.getLogger("io-perf-tool")
+conn_uri_attrs = ("user", "passwd", "host", "port", "path")
+
+
+def normalize_dirpath(dirpath):
+ while dirpath.endswith("/"):
+ dirpath = dirpath[:-1]
+ return dirpath
+
+
+def ssh_mkdir(sftp, remotepath, mode=0777, intermediate=False):
+ remotepath = normalize_dirpath(remotepath)
+ if intermediate:
+ try:
+ sftp.mkdir(remotepath, mode=mode)
+ except IOError:
+ ssh_mkdir(sftp, remotepath.rsplit("/", 1)[0], mode=mode,
+ intermediate=True)
+ return sftp.mkdir(remotepath, mode=mode)
+ else:
+ sftp.mkdir(remotepath, mode=mode)
+
+
+def ssh_copy_file(sftp, localfile, remfile, preserve_perm=True):
+ sftp.put(localfile, remfile)
+ if preserve_perm:
+ sftp.chmod(remfile, os.stat(localfile).st_mode & 0777)
+
+
+def put_dir_recursively(sftp, localpath, remotepath, preserve_perm=True):
+ "upload local directory to remote recursively"
+
+ # hack for localhost connection
+ if hasattr(sftp, "copytree"):
+ sftp.copytree(localpath, remotepath)
+ return
+
+ assert remotepath.startswith("/"), "%s must be absolute path" % remotepath
+
+ # normalize
+ localpath = normalize_dirpath(localpath)
+ remotepath = normalize_dirpath(remotepath)
+
+ try:
+ sftp.chdir(remotepath)
+ localsuffix = localpath.rsplit("/", 1)[1]
+ remotesuffix = remotepath.rsplit("/", 1)[1]
+ if localsuffix != remotesuffix:
+ remotepath = os.path.join(remotepath, localsuffix)
+ except IOError:
+ pass
+
+ for root, dirs, fls in os.walk(localpath):
+ prefix = os.path.commonprefix([localpath, root])
+ suffix = root.split(prefix, 1)[1]
+ if suffix.startswith("/"):
+ suffix = suffix[1:]
+
+ remroot = os.path.join(remotepath, suffix)
+
+ try:
+ sftp.chdir(remroot)
+ except IOError:
+ if preserve_perm:
+ mode = os.stat(root).st_mode & 0777
+ else:
+ mode = 0777
+ ssh_mkdir(sftp, remroot, mode=mode, intermediate=True)
+ sftp.chdir(remroot)
+
+ for f in fls:
+ remfile = os.path.join(remroot, f)
+ localfile = os.path.join(root, f)
+ ssh_copy_file(sftp, localfile, remfile, preserve_perm)
+
+
+def copy_paths(conn, paths):
+ sftp = conn.open_sftp()
+ try:
+ for src, dst in paths.items():
+ try:
+ if os.path.isfile(src):
+ ssh_copy_file(sftp, src, dst)
+ elif os.path.isdir(src):
+ put_dir_recursively(sftp, src, dst)
+ else:
+ templ = "Can't copy {0!r} - " + \
+ "it neither a file not a directory"
+ msg = templ.format(src)
+ raise OSError(msg)
+ except Exception as exc:
+ tmpl = "Scp {0!r} => {1!r} failed - {2!r}"
+ msg = tmpl.format(src, dst, exc)
+ raise OSError(msg)
+ finally:
+ sftp.close()
+
+
+class ConnCreds(object):
+ def __init__(self):
+ for name in conn_uri_attrs:
+ setattr(self, name, None)
+
+
+uri_reg_exprs = []
+
+
+class URIsNamespace(object):
+ class ReParts(object):
+ user_rr = "[^:]*?"
+ host_rr = "[^:]*?"
+ port_rr = "\\d+"
+ key_file_rr = "[^:@]*"
+ passwd_rr = ".*?"
+
+ re_dct = ReParts.__dict__
+
+ for attr_name, val in re_dct.items():
+ if attr_name.endswith('_rr'):
+ new_rr = "(?P<{0}>{1})".format(attr_name[:-3], val)
+ setattr(ReParts, attr_name, new_rr)
+
+ re_dct = ReParts.__dict__
+
+ templs = [
+ "^{host_rr}$",
+ "^{user_rr}@{host_rr}::{key_file_rr}$",
+ "^{user_rr}@{host_rr}:{port_rr}:{key_file_rr}$",
+ "^{user_rr}:{passwd_rr}@@{host_rr}$",
+ "^{user_rr}:{passwd_rr}@@{host_rr}:{port_rr}$",
+ ]
+
+ for templ in templs:
+ uri_reg_exprs.append(templ.format(**re_dct))
+
+
+def parse_ssh_uri(uri):
+ # user:passwd@@ip_host:port
+ # user:passwd@@ip_host
+ # user@ip_host:port
+ # user@ip_host
+ # ip_host:port
+ # ip_host
+ # user@ip_host:port:path_to_key_file
+ # user@ip_host::path_to_key_file
+ # ip_host:port:path_to_key_file
+ # ip_host::path_to_key_file
+
+ res = ConnCreds()
+ res.port = "22"
+ res.key_file = None
+ res.passwd = None
+
+ for rr in uri_reg_exprs:
+ rrm = re.match(rr, uri)
+ if rrm is not None:
+ res.__dict__.update(rrm.groupdict())
+ return res
+ raise ValueError("Can't parse {0!r} as ssh uri value".format(uri))
+
+
+def connect(uri):
+ creds = parse_ssh_uri(uri)
+ creds.port = int(creds.port)
+ return ssh_connect(creds)
+
+
+def conn_func(obj, barrier, latest_start_time, conn):
+ try:
+ test_iter = itest.run_test_iter(obj, conn)
+ next(test_iter)
+
+ wait_on_barrier(barrier, latest_start_time)
+
+ with log_error("!Run test"):
+ return next(test_iter)
+ except:
+ print traceback.format_exc()
+ raise
+
+
+def get_ssh_runner(uris,
+ latest_start_time=None,
+ keep_temp_files=False):
+ logger.debug("Connecting to servers")
+
+ with ThreadPoolExecutor(max_workers=16) as executor:
+ connections = list(executor.map(connect, uris))
+
+ result_queue = Queue.Queue()
+ barrier = get_barrier(len(uris), threaded=True)
+
+ def closure(obj):
+ ths = []
+ obj.set_result_cb(result_queue.put)
+
+ params = (obj, barrier, latest_start_time)
+
+ logger.debug("Start tests")
+ for conn in connections:
+ th = threading.Thread(None, conn_func, None,
+ params + (conn,))
+ th.daemon = True
+ th.start()
+ ths.append(th)
+
+ for th in ths:
+ th.join()
+
+ test_result = []
+ while not result_queue.empty():
+ test_result.append(result_queue.get())
+
+ logger.debug("Done. Closing connection")
+ for conn in connections:
+ conn.close()
+
+ return test_result
+
+ return closure
diff --git a/test_results/storage.json b/test_results/storage.json
deleted file mode 100644
index 083d7ff..0000000
--- a/test_results/storage.json
+++ /dev/null
@@ -1,95 +0,0 @@
-[
- {
- "randwrite a 256k": [16885, 1869],
- "randwrite s 4k": [79, 2],
- "read a 64k": [74398, 11618],
- "write s 1024k": [7490, 193],
- "randwrite a 64k": [14167, 4665],
- "build_id": "1",
- "randread a 1024k": [68683, 8604],
- "randwrite s 256k": [3277, 146],
- "write a 1024k": [24069, 660],
- "type": "GA",
- "write a 64k": [24555, 1006],
- "write s 64k": [1285, 57],
- "write a 256k": [24928, 503],
- "write s 256k": [4029, 192],
- "randwrite a 1024k": [23980, 1897],
- "randread a 64k": [27257, 17268],
- "randwrite s 1024k": [8504, 238],
- "randread a 256k": [60868, 2637],
- "randread a 4k": [3612, 1355],
- "read a 1024k": [71122, 9217],
- "date": "Thu Feb 12 19:11:56 2015",
- "write s 4k": [87, 3],
- "read a 4k": [88367, 6471],
- "read a 256k": [80904, 8930],
- "name": "GA - 6.0 GA",
- "randwrite s 1k": [20, 0],
- "randwrite s 64k": [1029, 34],
- "write s 1k": [21, 0],
- "iso_md5": "bla bla"
- },
- {
- "randwrite a 256k": [20212, 5690],
- "randwrite s 4k": [83, 6],
- "read a 64k": [89394, 3912],
- "write s 1024k": [8054, 280],
- "randwrite a 64k": [14595, 3245],
- "build_id": "2",
- "randread a 1024k": [83277, 9310],
- "randwrite s 256k": [3628, 433],
- "write a 1024k": [29226, 8624],
- "type": "master",
- "write a 64k": [25089, 790],
- "write s 64k": [1236, 30],
- "write a 256k": [30327, 9799],
- "write s 256k": [4049, 172],
- "randwrite a 1024k": [29000, 9302],
- "randread a 64k": [26775, 16319],
- "randwrite s 1024k": [8665, 1457],
- "randread a 256k": [63608, 16126],
- "randread a 4k": [3212, 1620],
- "read a 1024k": [89676, 4401],
- "date": "Thu Feb 12 19:11:56 2015",
- "write s 4k": [88, 3],
- "read a 4k": [92263, 5186],
- "read a 256k": [94505, 6868],
- "name": "6.1 Dev",
- "randwrite s 1k": [22, 3],
- "randwrite s 64k": [1105, 46],
- "write s 1k": [22, 0],
- "iso_md5": "bla bla"
- },
- {
- "randwrite a 256k": [47237, 16107],
- "randwrite s 4k": [62, 23],
- "read a 64k": [94564, 36030],
- "write s 1024k": [8904, 2935],
- "randwrite a 64k": [27769, 6609],
- "build_id": "3",
- "randread a 1024k": [31379, 25667],
- "randwrite s 256k": [3009, 1028],
- "write a 1024k": [84995, 34596],
- "type": "Dev",
- "write a 64k": [71598, 22032],
- "write s 64k": [624, 164],
- "write a 256k": [75071, 21284],
- "write s 256k": [2724, 938],
- "randwrite a 1024k": [79885, 14716],
- "randread a 64k": [6953, 3886],
- "randwrite s 1024k": [10161, 2086],
- "randread a 256k": [23681, 11116],
- "randread a 4k": [829, 10],
- "read a 1024k": [64053, 37245],
- "date": "Thu Feb 12 19:11:56 2015",
- "write s 4k": [44, 16],
- "read a 4k": [52864, 35589],
- "read a 256k": [75291, 32068],
- "name": "Some dev",
- "randwrite s 1k": [12, 3],
- "randwrite s 64k": [1211, 266],
- "write s 1k": [13, 5],
- "iso_md5": "blsgl"
- }
-]
\ No newline at end of file
diff --git a/utils.py b/utils.py
index 5b9c0a5..8b6e1b0 100644
--- a/utils.py
+++ b/utils.py
@@ -13,6 +13,18 @@
logger = logging.getLogger("io-perf-tool")
+def parse_creds(creds):
+ # parse user:passwd@host
+ user, passwd_host = creds.split(":", 1)
+
+ if '@' not in passwd_host:
+ passwd, host = passwd_host, None
+ else:
+ passwd, host = passwd_host.rsplit('@', 1)
+
+ return user, passwd, host
+
+
def get_barrier(count, threaded=False):
if threaded:
class val(object):