Node discovery: replace CLI-driven setup with config.yaml-driven discovery of openstack/fuel/ceph cluster nodes
diff --git a/config.py b/config.py
index 6dd1a9b..d17c63a 100644
--- a/config.py
+++ b/config.py
@@ -1,5 +1,3 @@
-import argparse
-import sys
import yaml
import os
diff --git a/config.yaml b/config.yaml
index 1d67d3a..3bab2f8 100644
--- a/config.yaml
+++ b/config.yaml
@@ -1,15 +1,43 @@
# nodes to be started/detected
cluster:
- - start_vm_OS:
- # запустить по одной vm на каждую компьюту в опенстеке
- creds: openrc-perf1
- count: x1
- image: ubuntu-test
- flavor: ceph-test-io
+ openstack:
+ connection:
+ auth_url: http://172.16.54.130:5000/v2.0
+ username: admin
+ api_key: admin
+ project_id: admin
+ discover:
+ vm:
+ name: test1
+ auth:
+ user: cirros
+ password: cubswin:)
+ nodes:
+ service: all
+ auth:
+ user: cirros
+ password: cubswin:)
+ start:
+ count: x1
+ img_name: TestVM
+ flavor_name: m1.micro
+ keypair_name: test
+ network_zone_name: net04
+ flt_ip_pool: net04_ext
+ key_file: path/to/
+ user: username
- - find_ceph_nodes
- - find_openstack_nodes
- - add:
+ ceph: all
+
+ fuel:
+ connection:
+ url: http://172.16.52.120:8000/
+ username: admin
+ password: admin
+ tenant_name: admin
+ discover: controller
+
+ other:
- ssh://10.33.14.67, iscsi
# sensors to be installed, accordingli to role
@@ -17,16 +45,14 @@
ceph-osd: ceph-io, ceph-cpu, ceph-ram, ceph-net
ceph-mon: ceph-io, ceph-cpu, ceph-ram, ceph-net
os-compute: io, net
- test-vm: io, net
+ test-vm: system-cpu
# tests to run
tests:
- - io:
- tool: iozone
- opts_file: io_opts_file.txt
- - pgbench
- - rados-bench:
- sizes: 4k, 16k, 1m
+ pgbench:
+ opts:
+ num_clients: [4, 8, 12]
+ transactions: [1, 2, 3]
# where to store results
results:
@@ -37,4 +63,7 @@
TEST_PATH: "/home/gstepanov/rally-results-processor/test_results"
CHARTS_IMG_PATH: "static/images"
SQLALCHEMY_MIGRATE_REPO: "/home/gstepanov/rally-results-processor/db_repository"
- DATABASE_URI: 'sqlite:////home/gstepanov/rally-results-processor/app.db?check_same_thread=False'
\ No newline at end of file
+ DATABASE_URI: 'sqlite:////home/gstepanov/rally-results-processor/app.db?check_same_thread=False'
+
+logging:
+ extra_logs: 1
\ No newline at end of file
diff --git a/fuel_rest_api.py b/fuel_rest_api.py
index 507779b..12b53bb 100644
--- a/fuel_rest_api.py
+++ b/fuel_rest_api.py
@@ -269,13 +269,16 @@
node_info = self.get_info()
return node_info.get('network_data')
- def get_roles(self):
+ def get_roles(self, pending=False):
"""Get node roles
Returns: (roles, pending_roles)
"""
node_info = self.get_info()
- return node_info.get('roles'), node_info.get('pending_roles')
+ if pending:
+ return node_info.get('roles'), node_info.get('pending_roles')
+ else:
+ return node_info.get('roles')
def get_ip(self, network='public'):
"""Get node ip
diff --git a/nodes/ceph.py b/nodes/ceph.py
index 871b834..c5a9288 100644
--- a/nodes/ceph.py
+++ b/nodes/ceph.py
@@ -16,21 +16,21 @@
for ip in osd_ips:
url = "ssh://%s" % ip
if url in ips:
- ips[url].add("ceph-osd")
+ ips[url].append("ceph-osd")
else:
- ips[url] = ("ceph-osd")
+ ips[url] = ["ceph-osd"]
for ip in mon_ips:
url = "ssh://%s" % ip
if url in ips:
- ips[url].add("ceph-mon")
+ ips[url].append("ceph-mon")
else:
- ips[url] = ("ceph-mon")
+ ips[url] = ["ceph-mon"]
for ip in mds_ips:
url = "ssh://%s" % ip
if url in ips:
- ips[url].add("ceph-mds")
+ ips[url].append("ceph-mds")
else:
- ips[url] = ("ceph-mds")
+ ips[url] = ["ceph-mds"]
res = []
for url, roles in ips:
diff --git a/nodes/discover.py b/nodes/discover.py
new file mode 100644
index 0000000..17aa1e7
--- /dev/null
+++ b/nodes/discover.py
@@ -0,0 +1,39 @@
+import logging
+
+import openstack
+import ceph
+import fuel
+
+
+logger = logging.getLogger("io-perf-tool")
+
+
+def discover(cluster_conf):
+ if not cluster_conf:
+ logger.error("No nodes configured")
+
+ nodes_to_run = []
+ for cluster, cluster_info in cluster_conf.items():
+ if cluster == "openstack":
+ conn = cluster_info.get('connection')
+ if not conn:
+ logger.error("No connection provided for %s. Skipping"
+ % cluster)
+ continue
+ logger.debug("Discovering openstack nodes "
+ "with connection details: %r" %
+ conn)
+
+ nodes_to_run.extend(openstack.discover_openstack_nodes(
+ conn, cluster_info))
+ if cluster == "fuel":
+ url = cluster_info['connection'].pop('url')
+ creads = cluster_info['connection']
+ roles = cluster_info['discover']
+ if isinstance(roles, basestring):
+ roles = [roles]
+ nodes_to_run.extend(fuel.discover_fuel_nodes(url, creads, roles))
+
+ if cluster == "ceph":
+ nodes_to_run.extend(ceph.discover_ceph_node())
+ return nodes_to_run
diff --git a/nodes/fuel.py b/nodes/fuel.py
index 76dedcd..c828793 100644
--- a/nodes/fuel.py
+++ b/nodes/fuel.py
@@ -1,10 +1,17 @@
import node
import fuel_rest_api
+import logging
-def discover_fuel_nodes(root_url, credentials):
+logger = logging.getLogger("io-perf-tool")
+
+
+def discover_fuel_nodes(root_url, credentials, roles):
"""Discover Fuel nodes"""
connection = fuel_rest_api.KeystoneAuth(root_url, credentials)
fi = fuel_rest_api.FuelInfo(connection)
-
- return [node.Node(n.ip, n.get_roles()) for n in fi.nodes]
\ No newline at end of file
+ nodes = []
+ for role in roles:
+ nodes.extend(getattr(fi.nodes, role))
+ logger.debug("Found %s fuel nodes" % len(fi.nodes))
+ return [node.Node(n.ip, n.get_roles()) for n in nodes]
\ No newline at end of file
diff --git a/nodes/node.py b/nodes/node.py
index 4ee2109..03efd34 100644
--- a/nodes/node.py
+++ b/nodes/node.py
@@ -9,11 +9,14 @@
self.port = port
self.key_path = key_path
+ def __repr__(self):
+ return "<Node: %s %s>" % (self.ip, self.roles)
+
def set_conn_attr(self, name, value):
setattr(self, name, value)
@property
- def connection(self):
+ def connection_url(self):
connection = []
if self.username:
diff --git a/nodes/openstack.py b/nodes/openstack.py
index 7444e46..ed3d8ea 100644
--- a/nodes/openstack.py
+++ b/nodes/openstack.py
@@ -1,33 +1,104 @@
+import logging
import node
-
+from starts_vms import create_vms_mt
from novaclient.client import Client
+logger = logging.getLogger("io-perf-tool")
+
+
def get_floating_ip(vm):
addrs = vm.addresses
for net_name, ifaces in addrs.items():
for iface in ifaces:
if iface.get('OS-EXT-IPS:type') == "floating":
return iface['addr']
- raise Exception("No floating ip found for VM %s" % repr(vm))
-def discover_openstack_vms(conn_details):
+def discover_vms(client, search_opts):
+ auth = search_opts.pop('auth', {})
+ user = auth.get('user')
+ password = auth.get('password')
+ key = auth.get('key_file')
+ servers = client.servers.list(search_opts=search_opts)
+ logger.debug("Found %s openstack vms" % len(servers))
+ return [node.Node(get_floating_ip(server), ["test_vm"], username=user,
+ password=password, key_path=key)
+ for server in servers if get_floating_ip(server)]
+
+
+def discover_services(client, opts):
+ auth = opts.pop('auth', {})
+ user = auth.get('user')
+ password = auth.get('password')
+ key = auth.get('key_file')
+ services = []
+ if opts['service'] == "all":
+ services = client.services.list()
+ else:
+ if isinstance(opts['service'], basestring):
+ opts['service'] = [opts['service']]
+
+ for s in opts['service']:
+ services.extend(client.services.list(binary=s))
+
+ host_services_mapping = {}
+ for service in services:
+ if host_services_mapping.get(service.host):
+ host_services_mapping[service.host].append(service.binary)
+ else:
+ host_services_mapping[service.host] = [service.binary]
+ logger.debug("Found %s openstack service nodes" %
+ len(host_services_mapping))
+ return [node.Node(host, services, username=user,
+ password=password, key_path=key) for host, services in
+ host_services_mapping.items()]
+
+
+def discover_openstack_nodes(conn_details, conf):
"""Discover vms running in openstack
:param conn_details - dict with openstack connection details -
auth_url, api_key (password), username
"""
client = Client(version='1.1', **conn_details)
- servers = client.servers.list(search_opts={"all_tenant": True})
- return [node.Node(get_floating_ip(server), ["test_vm"])
- for server in servers]
+ nodes = []
+ if conf.get('discover'):
+ vms_to_discover = conf['discover'].get('vm')
+ if vms_to_discover:
+ nodes.extend(discover_vms(client, vms_to_discover))
+ services_to_discover = conf['discover'].get('nodes')
+ if services_to_discover:
+ nodes.extend(discover_services(client, services_to_discover))
+ if conf.get('start'):
+ vms = start_test_vms(client, conf['start'])
+ nodes.extend(vms)
+
+ return nodes
-def discover_openstack_nodes(conn_details):
- """Discover openstack nodes
- :param conn_details - dict with openstack connection details -
- auth_url, api_key (password), username
- """
- client = Client(version='1.1', **conn_details)
- services = client.services.list()
- return [node.Node(server.ip, ["test_vm"]) for server in services]
+def start_test_vms(client, opts):
+
+ user = opts.pop("user", None)
+ key_file = opts.pop("key_file", None)
+ aff_group = opts.pop("aff_group", None)
+ raw_count = opts.pop('count')
+
+ if raw_count.startswith("x"):
+ logger.debug("Getting amount of compute services")
+ count = len(client.services.list(binary="nova-compute"))
+ count *= int(raw_count[1:])
+ else:
+ count = int(raw_count)
+
+ if aff_group is not None:
+ scheduler_hints = {'group': aff_group}
+ else:
+ scheduler_hints = None
+
+ opts['scheduler_hints'] = scheduler_hints
+
+ logger.debug("Will start {0} vms".format(count))
+
+ nodes = create_vms_mt(client, count, **opts)
+ return [node.Node(get_floating_ip(server), ["test_vm"], username=user,
+ key_path=key_file) for server in nodes]
diff --git a/run_test.py b/run_test.py
old mode 100644
new mode 100755
index f1800fd..cafbc59
--- a/run_test.py
+++ b/run_test.py
@@ -2,26 +2,20 @@
import sys
import json
import time
-import shutil
import pprint
-import weakref
import logging
import os.path
import argparse
-import traceback
-import subprocess
-import contextlib
-
+from nodes import discover
import ssh_runner
import io_scenario
+from config import cfg_dict
from utils import log_error
from rest_api import add_test
-from itest import IOPerfTest, run_test_iter, PgBenchTest
-from starts_vms import nova_connect, create_vms_mt, clear_all
+from itest import IOPerfTest,PgBenchTest
from formatters import get_formatter
-
logger = logging.getLogger("io-perf-tool")
logger.setLevel(logging.DEBUG)
ch = logging.StreamHandler()
@@ -60,73 +54,6 @@
return test_runner(obj)
-class FileWrapper(object):
- def __init__(self, fd, conn):
- self.fd = fd
- self.channel_wr = weakref.ref(conn)
-
- def read(self):
- return self.fd.read()
-
- @property
- def channel(self):
- return self.channel_wr()
-
-
-class LocalConnection(object):
- def __init__(self):
- self.proc = None
-
- def exec_command(self, cmd):
- PIPE = subprocess.PIPE
- self.proc = subprocess.Popen(cmd,
- shell=True,
- stdout=PIPE,
- stderr=PIPE,
- stdin=PIPE)
- res = (self.proc.stdin,
- FileWrapper(self.proc.stdout, self),
- self.proc.stderr)
- return res
-
- def recv_exit_status(self):
- return self.proc.wait()
-
- def open_sftp(self):
- return self
-
- def close(self):
- pass
-
- def put(self, localfile, remfile):
- return shutil.copy(localfile, remfile)
-
- def mkdir(self, remotepath, mode):
- os.mkdir(remotepath)
- os.chmod(remotepath, mode)
-
- def chmod(self, remotepath, mode):
- os.chmod(remotepath, mode)
-
- def copytree(self, src, dst):
- shutil.copytree(src, dst)
-
-
-def get_local_runner(clear_tmp_files=True):
- def closure(obj):
- res = []
- obj.set_result_cb(res.append)
- test_iter = run_test_iter(obj,
- LocalConnection())
- next(test_iter)
-
- with log_error("!Run test"):
- next(test_iter)
- return res
-
- return closure
-
-
def parse_args(argv):
parser = argparse.ArgumentParser(
description="Run disk io performance test")
@@ -176,41 +103,6 @@
return parser.parse_args(argv)
-def get_opts(opts_file, test_opts):
- if opts_file is not None and test_opts is not None:
- print "Options --opts-file and --opts can't be " + \
- "provided same time"
- exit(1)
-
- if opts_file is None and test_opts is None:
- print "Either --opts-file or --opts should " + \
- "be provided"
- exit(1)
-
- if opts_file is not None:
- opts = []
-
- opt_lines = opts_file.readlines()
- opt_lines = [i for i in opt_lines if i != "" and not i.startswith("#")]
-
- for opt_line in opt_lines:
- if opt_line.strip() != "":
- opts.append([opt.strip()
- for opt in opt_line.strip().split(" ")
- if opt.strip() != ""])
- else:
- opts = [[opt.strip()
- for opt in test_opts.split(" ")
- if opt.strip() != ""]]
-
- if len(opts) == 0:
- print "Can't found parameters for tests. Check" + \
- "--opts-file or --opts options"
- exit(1)
-
- return opts
-
-
def format_result(res, formatter):
data = "\n{0}\n".format("=" * 80)
data += pprint.pformat(res) + "\n"
@@ -219,117 +111,40 @@
return templ.format(data, formatter(res), "=" * 80)
-@contextlib.contextmanager
-def start_test_vms(opts):
- create_vms_opts = {}
- for opt in opts.split(","):
- name, val = opt.split("=", 1)
- create_vms_opts[name] = val
-
- user = create_vms_opts.pop("user")
- key_file = create_vms_opts.pop("key_file")
- aff_group = create_vms_opts.pop("aff_group", None)
- raw_count = create_vms_opts.pop("count", "x1")
-
- logger.debug("Connection to nova")
- nova = nova_connect()
-
- if raw_count.startswith("x"):
- logger.debug("Getting amount of compute services")
- count = len(nova.services.list(binary="nova-compute"))
- count *= int(raw_count[1:])
- else:
- count = int(raw_count)
-
- if aff_group is not None:
- scheduler_hints = {'group': aff_group}
- else:
- scheduler_hints = None
-
- create_vms_opts['scheduler_hints'] = scheduler_hints
-
- logger.debug("Will start {0} vms".format(count))
-
- try:
- ips = [i[0] for i in create_vms_mt(nova, count, **create_vms_opts)]
-
- uris = ["{0}@{1}::{2}".format(user, ip, key_file) for ip in ips]
-
- yield uris
- except:
- traceback.print_exc()
- finally:
- logger.debug("Clearing")
- clear_all(nova)
+def deploy_and_start_sensors(sensors_conf, nodes):
+ pass
def main(argv):
- opts = parse_args(argv)
+ logging_conf = cfg_dict.get('logging')
+ if logging_conf:
+ if logging_conf.get('extra_logs'):
+ logger.setLevel(logging.DEBUG)
+ ch.setLevel(logging.DEBUG)
- if opts.extra_logs:
- logger.setLevel(logging.DEBUG)
- ch.setLevel(logging.DEBUG)
+ # Discover nodes
+ nodes_to_run = discover.discover(cfg_dict.get('cluster'))
- test_opts = get_opts(opts.opts_file, opts.opts)
+ tests = cfg_dict.get("tests", [])
- if opts.runner == "local":
- logger.debug("Run on local computer")
- try:
- for script_args in test_opts:
- cmd_line = " ".join(script_args)
- logger.debug("Run test with {0!r} params".format(cmd_line))
- runner = get_local_runner(opts.keep_temp_files)
- res = run_io_test(opts.tool_type,
- script_args,
- runner,
- opts.keep_temp_files)
- logger.debug(format_result(res, get_formatter(opts.tool_type)))
- except:
- traceback.print_exc()
- return 1
+ # Deploy and start sensors
+ deploy_and_start_sensors(cfg_dict.get('sensors'), nodes_to_run)
- elif opts.runner == "ssh":
- logger.debug("Use ssh runner")
+ for test_name, opts in tests.items():
+ cmd_line = " ".join(opts['opts'])
+ logger.debug("Run test with {0!r} params".format(cmd_line))
+ latest_start_time = 300 + time.time()
+ uris = [node.connection_url for node in nodes_to_run]
+ runner = ssh_runner.get_ssh_runner(uris,
+ latest_start_time,
+ opts.get('keep_temp_files'))
+ res = run_io_test(test_name,
+ opts['opts'],
+ runner,
+ opts.get('keep_temp_files'))
+ logger.debug(format_result(res, get_formatter(test_name)))
- uris = []
-
- if opts.create_vms_opts is not None:
- vm_context = start_test_vms(opts.create_vms_opts)
- uris += vm_context.__enter__()
- else:
- vm_context = None
-
- if opts.runner_opts is not None:
- uris += opts.runner_opts.split(";")
-
- if len(uris) == 0:
- logger.critical("You need to provide at least" +
- " vm spawn params or ssh params")
- return 1
-
- try:
- for script_args in test_opts:
- cmd_line = " ".join(script_args)
- logger.debug("Run test with {0!r} params".format(cmd_line))
- latest_start_time = opts.max_preparation_time + time.time()
- runner = ssh_runner.get_ssh_runner(uris,
- latest_start_time,
- opts.keep_temp_files)
- res = run_io_test(opts.tool_type,
- script_args,
- runner,
- opts.keep_temp_files)
- logger.debug(format_result(res, get_formatter(opts.tool_type)))
-
- except:
- traceback.print_exc()
- return 1
- finally:
- if vm_context is not None:
- vm_context.__exit__(None, None, None)
- logger.debug("Clearing")
-
- if opts.data_server_url:
+ if cfg_dict.get('data_server_url'):
result = json.loads(get_formatter(opts.tool_type)(res))
result['name'] = opts.build_name
add_test(opts.build_name, result, opts.data_server_url)
diff --git a/sensors/deploy_sensors.py b/sensors/deploy_sensors.py
index f3bb2ba..a1091c2 100644
--- a/sensors/deploy_sensors.py
+++ b/sensors/deploy_sensors.py
@@ -2,8 +2,8 @@
import json
import os.path
-from disk_perf_test_tool.ssh_copy_directory import copy_paths
-from disk_perf_test_tool.ssh_runner import connect
+from ssh_copy_directory import copy_paths
+from ssh_runner import connect
from concurrent.futures import ThreadPoolExecutor, wait
diff --git a/starts_vms.py b/starts_vms.py
index a4720cf..62bff8c 100644
--- a/starts_vms.py
+++ b/starts_vms.py
@@ -169,13 +169,11 @@
if flt_ip is Allocate:
flt_ip = nova.floating_ips.create(pool)
-
if flt_ip is not None:
# print "attaching ip to server"
srv.add_floating_ip(flt_ip)
- return (flt_ip.ip, srv)
- else:
- return (None, srv)
+
+ return nova.servers.get(srv.id)
def clear_all(nova, name_templ="ceph-test-{0}"):