very large refactoring
diff --git a/FIXME b/FIXME
new file mode 100644
index 0000000..7e21a50
--- /dev/null
+++ b/FIXME
@@ -0,0 +1,11 @@
+assumption_check.py
+ почти все криво
+
+charts.py
+ 1) генерировать картинки с фиксированными именами
+
+report.py
+ украсить
+
+rest_api.py
+ переписать на prest
diff --git a/__init__.py b/__init__.py
deleted file mode 100644
index e69de29..0000000
--- a/__init__.py
+++ /dev/null
diff --git a/chart/__init__.py b/chart/__init__.py
deleted file mode 100644
index cd49ed9..0000000
--- a/chart/__init__.py
+++ /dev/null
@@ -1,8 +0,0 @@
-import sys
-
-from GChartWrapper import constants
-
-# Patch MARKER constant
-constants.MARKERS += 'E'
-
-sys.modules['GChartWrapper.GChart'].MARKERS += 'E'
diff --git a/config.py b/config.py
deleted file mode 100644
index 8f656ec..0000000
--- a/config.py
+++ /dev/null
@@ -1,45 +0,0 @@
-import os
-import yaml
-
-from petname import Generate as pet_generate
-
-
-cfg_dict = {}
-
-
-def load_config(file_name, explicit_folder=None):
- global cfg_dict
- first_load = len(cfg_dict) == 0
- cfg_dict.update(yaml.load(open(file_name).read()))
-
- if first_load:
- var_dir = cfg_dict.get('internal', {}).get('var_dir_root', '/tmp')
-
- if explicit_folder is None:
- while True:
- dr = os.path.join(var_dir, pet_generate(2, "_"))
- if not os.path.exists(dr):
- break
- else:
- dr = explicit_folder
-
- cfg_dict['var_dir'] = dr
- if not os.path.exists(cfg_dict['var_dir']):
- os.makedirs(cfg_dict['var_dir'])
-
- def in_var_dir(fname):
- return os.path.join(cfg_dict['var_dir'], fname)
-
- charts_img_path = in_var_dir('charts')
- cfg_dict['charts_img_path'] = charts_img_path
- if not os.path.exists(charts_img_path):
- os.makedirs(charts_img_path)
-
- cfg_dict['vm_ids_fname'] = in_var_dir('os_vm_ids')
- cfg_dict['html_report_file'] = in_var_dir('report.html')
- cfg_dict['text_report_file'] = in_var_dir('report.txt')
- cfg_dict['log_file'] = in_var_dir('log.txt')
- cfg_dict['sensor_storage'] = in_var_dir('sensor_storage.txt')
- cfg_dict['test_log_directory'] = in_var_dir('test_logs')
- if not os.path.exists(cfg_dict['test_log_directory']):
- os.makedirs(cfg_dict['test_log_directory'])
diff --git a/ext_libs/sh.py b/ext_libs/sh.py
deleted file mode 100644
index fb0f5fe..0000000
--- a/ext_libs/sh.py
+++ /dev/null
@@ -1,2352 +0,0 @@
-"""
-http://amoffat.github.io/sh/
-"""
-#===============================================================================
-# Copyright (C) 2011-2015 by Andrew Moffat
-#
-# Permission is hereby granted, free of charge, to any person obtaining a copy
-# of this software and associated documentation files (the "Software"), to deal
-# in the Software without restriction, including without limitation the rights
-# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
-# copies of the Software, and to permit persons to whom the Software is
-# furnished to do so, subject to the following conditions:
-#
-# The above copyright notice and this permission notice shall be included in
-# all copies or substantial portions of the Software.
-#
-# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
-# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
-# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
-# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
-# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
-# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
-# THE SOFTWARE.
-#===============================================================================
-
-
-__version__ = "1.11"
-__project_url__ = "https://github.com/amoffat/sh"
-
-
-
-import platform
-
-if "windows" in platform.system().lower():
- raise ImportError("sh %s is currently only supported on linux and osx. \
-please install pbs 0.110 (http://pypi.python.org/pypi/pbs) for windows \
-support." % __version__)
-
-
-import sys
-IS_PY3 = sys.version_info[0] == 3
-
-import traceback
-import os
-import re
-from glob import glob as original_glob
-import time
-from types import ModuleType
-from functools import partial
-import inspect
-from contextlib import contextmanager
-
-from locale import getpreferredencoding
-DEFAULT_ENCODING = getpreferredencoding() or "UTF-8"
-
-
-if IS_PY3:
- from io import StringIO
- from io import BytesIO as cStringIO
- from queue import Queue, Empty
-
- # for some reason, python 3.1 removed the builtin "callable", wtf
- if not hasattr(__builtins__, "callable"):
- def callable(ob):
- return hasattr(ob, "__call__")
-else:
- from StringIO import StringIO
- from cStringIO import OutputType as cStringIO
- from Queue import Queue, Empty
-
-IS_OSX = platform.system() == "Darwin"
-THIS_DIR = os.path.dirname(os.path.realpath(__file__))
-SH_LOGGER_NAME = "sh"
-
-
-import errno
-import warnings
-
-import pty
-import termios
-import signal
-import gc
-import select
-import threading
-import tty
-import fcntl
-import struct
-import resource
-from collections import deque
-import logging
-import weakref
-
-
-# TODO remove with contexts in next version
-def with_context_warning():
- warnings.warn("""
-with contexts are deprecated because they are not thread safe. they will be \
-removed in the next version. use subcommands instead \
-http://amoffat.github.io/sh/#sub-commands. see \
-https://github.com/amoffat/sh/issues/195
-""".strip(), stacklevel=3)
-
-
-
-if IS_PY3:
- raw_input = input
- unicode = str
- basestring = str
-
-
-_unicode_methods = set(dir(unicode()))
-
-
-def encode_to_py3bytes_or_py2str(s):
- """ takes anything and attempts to return a py2 string or py3 bytes. this
- is typically used when creating command + arguments to be executed via
- os.exec* """
-
- fallback_encoding = "utf8"
-
- if IS_PY3:
- # if we're already bytes, do nothing
- if isinstance(s, bytes):
- pass
- else:
- s = str(s)
- try:
- s = bytes(s, DEFAULT_ENCODING)
- except UnicodeEncodeError:
- s = bytes(s, fallback_encoding)
- else:
- # attempt to convert the thing to unicode from the system's encoding
- try:
- s = unicode(s, DEFAULT_ENCODING)
- # if the thing is already unicode, or it's a number, it can't be
- # coerced to unicode with an encoding argument, but if we leave out
- # the encoding argument, it will convert it to a string, then to unicode
- except TypeError:
- s = unicode(s)
-
- # now that we have guaranteed unicode, encode to our system encoding,
- # but attempt to fall back to something
- try:
- s = s.encode(DEFAULT_ENCODING)
- except:
- s = s.encode(fallback_encoding)
- return s
-
-
-class ErrorReturnCode(Exception):
- """ base class for all exceptions as a result of a command's exit status
- being deemed an error. this base class is dynamically subclassed into
- derived classes with the format: ErrorReturnCode_NNN where NNN is the exit
- code number. the reason for this is it reduces boiler plate code when
- testing error return codes:
-
- try:
- some_cmd()
- except ErrorReturnCode_12:
- print("couldn't do X")
-
- vs:
- try:
- some_cmd()
- except ErrorReturnCode as e:
- if e.exit_code == 12:
- print("couldn't do X")
-
- it's not much of a savings, but i believe it makes the code easier to read """
-
- truncate_cap = 750
-
- def __init__(self, full_cmd, stdout, stderr):
- self.full_cmd = full_cmd
- self.stdout = stdout
- self.stderr = stderr
-
-
- if self.stdout is None:
- exc_stdout = "<redirected>"
- else:
- exc_stdout = self.stdout[:self.truncate_cap]
- out_delta = len(self.stdout) - len(exc_stdout)
- if out_delta:
- exc_stdout += ("... (%d more, please see e.stdout)" % out_delta).encode()
-
- if self.stderr is None:
- exc_stderr = "<redirected>"
- else:
- exc_stderr = self.stderr[:self.truncate_cap]
- err_delta = len(self.stderr) - len(exc_stderr)
- if err_delta:
- exc_stderr += ("... (%d more, please see e.stderr)" % err_delta).encode()
-
- msg = "\n\n RAN: %r\n\n STDOUT:\n%s\n\n STDERR:\n%s" % \
- (full_cmd, exc_stdout.decode(DEFAULT_ENCODING, "replace"),
- exc_stderr.decode(DEFAULT_ENCODING, "replace"))
- super(ErrorReturnCode, self).__init__(msg)
-
-
-class SignalException(ErrorReturnCode): pass
-class TimeoutException(Exception):
- """ the exception thrown when a command is killed because a specified
- timeout (via _timeout) was hit """
- def __init__(self, exit_code):
- self.exit_code = exit_code
- super(Exception, self).__init__()
-
-SIGNALS_THAT_SHOULD_THROW_EXCEPTION = (
- signal.SIGABRT,
- signal.SIGBUS,
- signal.SIGFPE,
- signal.SIGILL,
- signal.SIGINT,
- signal.SIGKILL,
- signal.SIGPIPE,
- signal.SIGQUIT,
- signal.SIGSEGV,
- signal.SIGTERM,
- signal.SIGSYS,
-)
-
-
-# we subclass AttributeError because:
-# https://github.com/ipython/ipython/issues/2577
-# https://github.com/amoffat/sh/issues/97#issuecomment-10610629
-class CommandNotFound(AttributeError): pass
-
-
-
-
-rc_exc_regex = re.compile("(ErrorReturnCode|SignalException)_((\d+)|SIG\w+)")
-rc_exc_cache = {}
-
-
-def get_exc_from_name(name):
- """ takes an exception name, like:
-
- ErrorReturnCode_1
- SignalException_9
- SignalException_SIGHUP
-
- and returns the corresponding exception. this is primarily used for
- importing exceptions from sh into user code, for instance, to capture those
- exceptions """
-
- exc = None
- try:
- return rc_exc_cache[name]
- except KeyError:
- m = rc_exc_regex.match(name)
- if m:
- base = m.group(1)
- rc_or_sig_name = m.group(2)
-
- if base == "SignalException":
- try:
- rc = -int(rc_or_sig_name)
- except ValueError:
- rc = -getattr(signal, rc_or_sig_name)
- else:
- rc = int(rc_or_sig_name)
-
- exc = get_rc_exc(rc)
- return exc
-
-
-def get_rc_exc(rc_or_sig_name):
- """ takes a exit code, signal number, or signal name, and produces an
- exception that corresponds to that return code. positive return codes yield
- ErrorReturnCode exception, negative return codes yield SignalException
-
- we also cache the generated exception so that only one signal of that type
- exists, preserving identity """
-
- try:
- rc = int(rc_or_sig_name)
- except ValueError:
- rc = -getattr(signal, rc_or_sig_name)
-
- try:
- return rc_exc_cache[rc]
- except KeyError:
- pass
-
- if rc > 0:
- name = "ErrorReturnCode_%d" % rc
- base = ErrorReturnCode
- else:
- name = "SignalException_%d" % abs(rc)
- base = SignalException
-
- exc = type(name, (base,), {"exit_code": rc})
- rc_exc_cache[rc] = exc
- return exc
-
-
-
-
-def which(program):
- def is_exe(fpath):
- return (os.path.exists(fpath) and
- os.access(fpath, os.X_OK) and
- os.path.isfile(os.path.realpath(fpath)))
-
- fpath, fname = os.path.split(program)
- if fpath:
- if is_exe(program):
- return program
- else:
- if "PATH" not in os.environ:
- return None
- for path in os.environ["PATH"].split(os.pathsep):
- exe_file = os.path.join(path, program)
- if is_exe(exe_file):
- return exe_file
-
- return None
-
-def resolve_program(program):
- path = which(program)
- if not path:
- # our actual command might have a dash in it, but we can't call
- # that from python (we have to use underscores), so we'll check
- # if a dash version of our underscore command exists and use that
- # if it does
- if "_" in program:
- path = which(program.replace("_", "-"))
- if not path:
- return None
- return path
-
-
-# we add this thin wrapper to glob.glob because of a specific edge case where
-# glob does not expand to anything. for example, if you try to do
-# glob.glob("*.py") and there are no *.py files in the directory, glob.glob
-# returns an empty list. this empty list gets passed to the command, and
-# then the command fails with a misleading error message. this thin wrapper
-# ensures that if there is no expansion, we pass in the original argument,
-# so that when the command fails, the error message is clearer
-def glob(arg):
- return original_glob(arg) or arg
-
-
-
-class Logger(object):
- """ provides a memory-inexpensive logger. a gotcha about python's builtin
- logger is that logger objects are never garbage collected. if you create a
- thousand loggers with unique names, they'll sit there in memory until your
- script is done. with sh, it's easy to create loggers with unique names if
- we want our loggers to include our command arguments. for example, these
- are all unique loggers:
-
- ls -l
- ls -l /tmp
- ls /tmp
-
- so instead of creating unique loggers, and without sacrificing logging
- output, we use this class, which maintains as part of its state, the logging
- "context", which will be the very unique name. this allows us to get a
- logger with a very general name, eg: "command", and have a unique name
- appended to it via the context, eg: "ls -l /tmp" """
- def __init__(self, name, context=None):
- self.name = name
- if context:
- context = context.replace("%", "%%")
- self.context = context
- self.log = logging.getLogger("%s.%s" % (SH_LOGGER_NAME, name))
-
- def _format_msg(self, msg, *args):
- if self.context:
- msg = "%s: %s" % (self.context, msg)
- return msg % args
-
- def get_child(self, name, context):
- new_name = self.name + "." + name
- new_context = self.context + "." + context
- l = Logger(new_name, new_context)
- return l
-
- def info(self, msg, *args):
- self.log.info(self._format_msg(msg, *args))
-
- def debug(self, msg, *args):
- self.log.debug(self._format_msg(msg, *args))
-
- def error(self, msg, *args):
- self.log.error(self._format_msg(msg, *args))
-
- def exception(self, msg, *args):
- self.log.exception(self._format_msg(msg, *args))
-
-
-def friendly_truncate(s, max_len):
- if len(s) > max_len:
- s = "%s...(%d more)" % (s[:max_len], len(s) - max_len)
- return s
-
-
-class RunningCommand(object):
- """ this represents an executing Command object. it is returned as the
- result of __call__() being executed on a Command instance. this creates a
- reference to a OProc instance, which is a low-level wrapper around the
- process that was exec'd
-
- this is the class that gets manipulated the most by user code, and so it
- implements various convenience methods and logical mechanisms for the
- underlying process. for example, if a user tries to access a
- backgrounded-process's stdout/err, the RunningCommand object is smart enough
- to know to wait() on the process to finish first. and when the process
- finishes, RunningCommand is smart enough to translate exit codes to
- exceptions. """
-
- def __init__(self, cmd, call_args, stdin, stdout, stderr):
- # self.ran is used for auditing what actually ran. for example, in
- # exceptions, or if you just want to know what was ran after the
- # command ran
- if IS_PY3:
- self.ran = " ".join([arg.decode(DEFAULT_ENCODING, "ignore") for arg in cmd])
- else:
- self.ran = " ".join(cmd)
-
-
- friendly_cmd = friendly_truncate(self.ran, 20)
- friendly_call_args = friendly_truncate(str(call_args), 20)
-
- # we're setting up the logger string here, instead of __repr__ because
- # we reserve __repr__ to behave as if it was evaluating the child
- # process's output
- logger_str = "<Command %r call_args %s>" % (friendly_cmd,
- friendly_call_args)
-
- self.log = Logger("command", logger_str)
- self.call_args = call_args
- self.cmd = cmd
-
- self.process = None
- self._process_completed = False
- should_wait = True
- spawn_process = True
-
-
- # with contexts shouldn't run at all yet, they prepend
- # to every command in the context
- if call_args["with"]:
- spawn_process = False
- Command._prepend_stack.append(self)
-
-
- if call_args["piped"] or call_args["iter"] or call_args["iter_noblock"]:
- should_wait = False
-
- # we're running in the background, return self and let us lazily
- # evaluate
- if call_args["bg"]:
- should_wait = False
-
- # redirection
- if call_args["err_to_out"]:
- stderr = OProc.STDOUT
-
-
- # set up which stream should write to the pipe
- # TODO, make pipe None by default and limit the size of the Queue
- # in oproc.OProc
- pipe = OProc.STDOUT
- if call_args["iter"] == "out" or call_args["iter"] is True:
- pipe = OProc.STDOUT
- elif call_args["iter"] == "err":
- pipe = OProc.STDERR
-
- if call_args["iter_noblock"] == "out" or call_args["iter_noblock"] is True:
- pipe = OProc.STDOUT
- elif call_args["iter_noblock"] == "err":
- pipe = OProc.STDERR
-
-
- # there's currently only one case where we wouldn't spawn a child
- # process, and that's if we're using a with-context with our command
- if spawn_process:
- self.log.info("starting process")
- self.process = OProc(self.log, cmd, stdin, stdout, stderr,
- self.call_args, pipe)
-
- if should_wait:
- self.wait()
-
-
- def wait(self):
- if not self._process_completed:
- self._process_completed = True
-
- exit_code = self.process.wait()
- if self.process.timed_out:
- # if we timed out, our exit code represents a signal, which is
- # negative, so let's make it positive to store in our
- # TimeoutException
- raise TimeoutException(-exit_code)
- else:
- self.handle_command_exit_code(exit_code)
-
- # https://github.com/amoffat/sh/issues/185
- if self.call_args["done"]:
- self.call_args["done"](self)
-
- return self
-
-
- def handle_command_exit_code(self, code):
- """ here we determine if we had an exception, or an error code that we
- weren't expecting to see. if we did, we create and raise an exception
- """
- if (code not in self.call_args["ok_code"] and (code > 0 or -code in
- SIGNALS_THAT_SHOULD_THROW_EXCEPTION)):
- exc = get_rc_exc(code)
- raise exc(self.ran, self.process.stdout, self.process.stderr)
-
-
-
- @property
- def stdout(self):
- self.wait()
- return self.process.stdout
-
- @property
- def stderr(self):
- self.wait()
- return self.process.stderr
-
- @property
- def exit_code(self):
- self.wait()
- return self.process.exit_code
-
- @property
- def pid(self):
- return self.process.pid
-
- def __len__(self):
- return len(str(self))
-
- def __enter__(self):
- """ we don't actually do anything here because anything that should have
- been done would have been done in the Command.__call__ call.
- essentially all that has to happen is the comand be pushed on the
- prepend stack. """
- with_context_warning()
-
- def __iter__(self):
- return self
-
- def next(self):
- """ allow us to iterate over the output of our command """
-
- # we do this because if get blocks, we can't catch a KeyboardInterrupt
- # so the slight timeout allows for that.
- while True:
- try:
- chunk = self.process._pipe_queue.get(True, 0.001)
- except Empty:
- if self.call_args["iter_noblock"]:
- return errno.EWOULDBLOCK
- else:
- if chunk is None:
- self.wait()
- raise StopIteration()
- try:
- return chunk.decode(self.call_args["encoding"],
- self.call_args["decode_errors"])
- except UnicodeDecodeError:
- return chunk
-
- # python 3
- __next__ = next
-
- def __exit__(self, typ, value, traceback):
- if self.call_args["with"] and Command._prepend_stack:
- Command._prepend_stack.pop()
-
- def __str__(self):
- """ in python3, should return unicode. in python2, should return a
- string of bytes """
- if IS_PY3:
- return self.__unicode__()
- else:
- return unicode(self).encode(self.call_args["encoding"])
-
- def __unicode__(self):
- """ a magic method defined for python2. calling unicode() on a
- RunningCommand object will call this """
- if self.process and self.stdout:
- return self.stdout.decode(self.call_args["encoding"],
- self.call_args["decode_errors"])
- elif IS_PY3:
- return ""
- else:
- return unicode("")
-
- def __eq__(self, other):
- return unicode(self) == unicode(other)
- __hash__ = None # Avoid DeprecationWarning in Python < 3
-
- def __contains__(self, item):
- return item in str(self)
-
- def __getattr__(self, p):
- # let these three attributes pass through to the OProc object
- if p in ("signal", "terminate", "kill"):
- if self.process:
- return getattr(self.process, p)
- else:
- raise AttributeError
-
- # see if strings have what we're looking for. we're looking at the
- # method names explicitly because we don't want to evaluate self unless
- # we absolutely have to, the reason being, in python2, hasattr swallows
- # exceptions, and if we try to run hasattr on a command that failed and
- # is being run with _iter=True, the command will be evaluated, throw an
- # exception, but hasattr will discard it
- if p in _unicode_methods:
- return getattr(unicode(self), p)
-
- raise AttributeError
-
- def __repr__(self):
- """ in python3, should return unicode. in python2, should return a
- string of bytes """
- try:
- return str(self)
- except UnicodeDecodeError:
- if self.process:
- if self.stdout:
- return repr(self.stdout)
- return repr("")
-
- def __long__(self):
- return long(str(self).strip())
-
- def __float__(self):
- return float(str(self).strip())
-
- def __int__(self):
- return int(str(self).strip())
-
-
-
-def output_redirect_is_filename(out):
- return out \
- and not callable(out) \
- and not hasattr(out, "write") \
- and not isinstance(out, (cStringIO, StringIO))
-
-
-
-
-
-
-class Command(object):
- """ represents an un-run system program, like "ls" or "cd". because it
- represents the program itself (and not a running instance of it), it should
- hold very little state. in fact, the only state it does hold is baked
- arguments.
-
- when a Command object is called, the result that is returned is a
- RunningCommand object, which represents the Command put into an execution
- state. """
- _prepend_stack = []
-
- _call_args = {
- # currently unsupported
- #"fg": False, # run command in foreground
-
- # run a command in the background. commands run in the background
- # ignore SIGHUP and do not automatically exit when the parent process
- # ends
- "bg": False,
-
- "with": False, # prepend the command to every command after it
- "in": None,
- "out": None, # redirect STDOUT
- "err": None, # redirect STDERR
- "err_to_out": None, # redirect STDERR to STDOUT
-
- # stdin buffer size
- # 1 for line, 0 for unbuffered, any other number for that amount
- "in_bufsize": 0,
- # stdout buffer size, same values as above
- "out_bufsize": 1,
- "err_bufsize": 1,
-
- # this is how big the output buffers will be for stdout and stderr.
- # this is essentially how much output they will store from the process.
- # we use a deque, so if it overflows past this amount, the first items
- # get pushed off as each new item gets added.
- #
- # NOTICE
- # this is not a *BYTE* size, this is a *CHUNK* size...meaning, that if
- # you're buffering out/err at 1024 bytes, the internal buffer size will
- # be "internal_bufsize" CHUNKS of 1024 bytes
- "internal_bufsize": 3 * 1024 ** 2,
-
- "env": None,
- "piped": None,
- "iter": None,
- "iter_noblock": None,
- "ok_code": 0,
- "cwd": None,
-
- # the separator delimiting between a long-argument's name and its value
- # for example, --arg=derp, '=' is the long_sep
- "long_sep": "=",
-
- # this is for programs that expect their input to be from a terminal.
- # ssh is one of those programs
- "tty_in": False,
- "tty_out": True,
-
- "encoding": DEFAULT_ENCODING,
- "decode_errors": "strict",
-
- # how long the process should run before it is auto-killed
- "timeout": 0,
- "timeout_signal": signal.SIGKILL,
-
- # TODO write some docs on "long-running processes"
- # these control whether or not stdout/err will get aggregated together
- # as the process runs. this has memory usage implications, so sometimes
- # with long-running processes with a lot of data, it makes sense to
- # set these to true
- "no_out": False,
- "no_err": False,
- "no_pipe": False,
-
- # if any redirection is used for stdout or stderr, internal buffering
- # of that data is not stored. this forces it to be stored, as if
- # the output is being T'd to both the redirected destination and our
- # internal buffers
- "tee": None,
-
- # will be called when a process terminates without exception. this
- # option also puts the command in the background, since it doesn't make
- # sense to have an un-backgrounded command with a done callback
- "done": None,
-
- # a tuple (rows, columns) of the desired size of both the stdout and
- # stdin ttys, if ttys are being used
- "tty_size": (20, 80),
- }
-
- # these are arguments that cannot be called together, because they wouldn't
- # make any sense
- _incompatible_call_args = (
- #("fg", "bg", "Command can't be run in the foreground and background"),
- ("err", "err_to_out", "Stderr is already being redirected"),
- ("piped", "iter", "You cannot iterate when this command is being piped"),
- ("piped", "no_pipe", "Using a pipe doesn't make sense if you've \
-disabled the pipe"),
- ("no_out", "iter", "You cannot iterate over output if there is no \
-output"),
- )
-
-
- # this method exists because of the need to have some way of letting
- # manual object instantiation not perform the underscore-to-dash command
- # conversion that resolve_program uses.
- #
- # there are 2 ways to create a Command object. using sh.Command(<program>)
- # or by using sh.<program>. the method fed into sh.Command must be taken
- # literally, and so no underscore-dash conversion is performed. the one
- # for sh.<program> must do the underscore-dash converesion, because we
- # can't type dashes in method names
- @classmethod
- def _create(cls, program, **default_kwargs):
- path = resolve_program(program)
- if not path:
- raise CommandNotFound(program)
-
- cmd = cls(path)
- if default_kwargs:
- cmd = cmd.bake(**default_kwargs)
-
- return cmd
-
-
- def __init__(self, path):
- found = which(path)
- if not found:
- raise CommandNotFound(path)
-
- self._path = encode_to_py3bytes_or_py2str(found)
-
- self._partial = False
- self._partial_baked_args = []
- self._partial_call_args = {}
-
- # bugfix for functools.wraps. issue #121
- self.__name__ = str(self)
-
-
- def __getattribute__(self, name):
- # convenience
- getattr = partial(object.__getattribute__, self)
-
- if name.startswith("_"):
- return getattr(name)
- if name == "bake":
- return getattr("bake")
- if name.endswith("_"):
- name = name[:-1]
-
- return getattr("bake")(name)
-
-
- @staticmethod
- def _extract_call_args(kwargs, to_override={}):
- kwargs = kwargs.copy()
- call_args = {}
- for parg, default in Command._call_args.items():
- key = "_" + parg
-
- if key in kwargs:
- call_args[parg] = kwargs[key]
- del kwargs[key]
- elif parg in to_override:
- call_args[parg] = to_override[parg]
-
- # test for incompatible call args
- s1 = set(call_args.keys())
- for args in Command._incompatible_call_args:
- args = list(args)
- error = args.pop()
-
- if s1.issuperset(args):
- raise TypeError("Invalid special arguments %r: %s" % (args, error))
-
- return call_args, kwargs
-
-
- def _aggregate_keywords(self, keywords, sep, raw=False):
- processed = []
- for k, v in keywords.items():
- # we're passing a short arg as a kwarg, example:
- # cut(d="\t")
- if len(k) == 1:
- if v is not False:
- processed.append(encode_to_py3bytes_or_py2str("-" + k))
- if v is not True:
- processed.append(encode_to_py3bytes_or_py2str(v))
-
- # we're doing a long arg
- else:
- if not raw:
- k = k.replace("_", "-")
-
- if v is True:
- processed.append(encode_to_py3bytes_or_py2str("--" + k))
- elif v is False:
- pass
- else:
- arg = encode_to_py3bytes_or_py2str("--%s%s%s" % (k, sep, v))
- processed.append(arg)
- return processed
-
-
- def _compile_args(self, args, kwargs, sep):
- processed_args = []
-
- # aggregate positional args
- for arg in args:
- if isinstance(arg, (list, tuple)):
- if not arg:
- warnings.warn("Empty list passed as an argument to %r. \
-If you're using glob.glob(), please use sh.glob() instead." % self._path, stacklevel=3)
- for sub_arg in arg:
- processed_args.append(encode_to_py3bytes_or_py2str(sub_arg))
- elif isinstance(arg, dict):
- processed_args += self._aggregate_keywords(arg, sep, raw=True)
- else:
- processed_args.append(encode_to_py3bytes_or_py2str(arg))
-
- # aggregate the keyword arguments
- processed_args += self._aggregate_keywords(kwargs, sep)
-
- return processed_args
-
-
- # TODO needs documentation
- def bake(self, *args, **kwargs):
- fn = Command(self._path)
- fn._partial = True
-
- call_args, kwargs = self._extract_call_args(kwargs)
-
- pruned_call_args = call_args
- for k, v in Command._call_args.items():
- try:
- if pruned_call_args[k] == v:
- del pruned_call_args[k]
- except KeyError:
- continue
-
- fn._partial_call_args.update(self._partial_call_args)
- fn._partial_call_args.update(pruned_call_args)
- fn._partial_baked_args.extend(self._partial_baked_args)
- sep = pruned_call_args.get("long_sep", self._call_args["long_sep"])
- fn._partial_baked_args.extend(self._compile_args(args, kwargs, sep))
- return fn
-
- def __str__(self):
- """ in python3, should return unicode. in python2, should return a
- string of bytes """
- if IS_PY3:
- return self.__unicode__()
- else:
- return self.__unicode__().encode(DEFAULT_ENCODING)
-
-
- def __eq__(self, other):
- try:
- return str(self) == str(other)
- except:
- return False
- __hash__ = None # Avoid DeprecationWarning in Python < 3
-
-
- def __repr__(self):
- """ in python3, should return unicode. in python2, should return a
- string of bytes """
- return "<Command %r>" % str(self)
-
-
- def __unicode__(self):
- """ a magic method defined for python2. calling unicode() on a
- self will call this """
- baked_args = " ".join(item.decode(DEFAULT_ENCODING) for item in self._partial_baked_args)
- if baked_args:
- baked_args = " " + baked_args
- return self._path.decode(DEFAULT_ENCODING) + baked_args
-
- def __enter__(self):
- with_context_warning()
- self(_with=True)
-
- def __exit__(self, typ, value, traceback):
- Command._prepend_stack.pop()
-
-
- def __call__(self, *args, **kwargs):
- kwargs = kwargs.copy()
- args = list(args)
-
- cmd = []
-
- # aggregate any 'with' contexts
- call_args = Command._call_args.copy()
- for prepend in self._prepend_stack:
- # don't pass the 'with' call arg
- pcall_args = prepend.call_args.copy()
- try:
- del pcall_args["with"]
- except:
- pass
-
- call_args.update(pcall_args)
- cmd.extend(prepend.cmd)
-
- cmd.append(self._path)
-
- # here we extract the special kwargs and override any
- # special kwargs from the possibly baked command
- tmp_call_args, kwargs = self._extract_call_args(kwargs, self._partial_call_args)
- call_args.update(tmp_call_args)
-
- if not getattr(call_args["ok_code"], "__iter__", None):
- call_args["ok_code"] = [call_args["ok_code"]]
-
-
- if call_args["done"]:
- call_args["bg"] = True
-
- # check if we're piping via composition
- stdin = call_args["in"]
- if args:
- first_arg = args.pop(0)
- if isinstance(first_arg, RunningCommand):
- # it makes sense that if the input pipe of a command is running
- # in the background, then this command should run in the
- # background as well
- if first_arg.call_args["bg"]:
- call_args["bg"] = True
-
- if first_arg.call_args["piped"] == "direct":
- stdin = first_arg.process
- else:
- stdin = first_arg.process._pipe_queue
-
- else:
- args.insert(0, first_arg)
-
- processed_args = self._compile_args(args, kwargs, call_args["long_sep"])
-
- # makes sure our arguments are broken up correctly
- split_args = self._partial_baked_args + processed_args
-
- final_args = split_args
-
- cmd.extend(final_args)
-
-
- # stdout redirection
- stdout = call_args["out"]
- if output_redirect_is_filename(stdout):
- stdout = open(str(stdout), "wb")
-
- # stderr redirection
- stderr = call_args["err"]
- if output_redirect_is_filename(stderr):
- stderr = open(str(stderr), "wb")
-
-
- return RunningCommand(cmd, call_args, stdin, stdout, stderr)
-
-
-
-
-def _start_daemon_thread(fn, *args):
- thrd = threading.Thread(target=fn, args=args)
- thrd.daemon = True
- thrd.start()
- return thrd
-
-
-def setwinsize(fd, rows_cols):
- """ set the terminal size of a tty file descriptor. borrowed logic
- from pexpect.py """
- rows, cols = rows_cols
- TIOCSWINSZ = getattr(termios, 'TIOCSWINSZ', -2146929561)
-
- s = struct.pack('HHHH', rows, cols, 0, 0)
- fcntl.ioctl(fd, TIOCSWINSZ, s)
-
-def construct_streamreader_callback(process, handler):
- """ here we're constructing a closure for our streamreader callback. this
- is used in the case that we pass a callback into _out or _err, meaning we
- want to our callback to handle each bit of output
-
- we construct the closure based on how many arguments it takes. the reason
- for this is to make it as easy as possible for people to use, without
- limiting them. a new user will assume the callback takes 1 argument (the
- data). as they get more advanced, they may want to terminate the process,
- or pass some stdin back, and will realize that they can pass a callback of
- more args """
-
-
- # implied arg refers to the "self" that methods will pass in. we need to
- # account for this implied arg when figuring out what function the user
- # passed in based on number of args
- implied_arg = 0
-
- partial_args = 0
- handler_to_inspect = handler
-
- if isinstance(handler, partial):
- partial_args = len(handler.args)
- handler_to_inspect = handler.func
-
- if inspect.ismethod(handler_to_inspect):
- implied_arg = 1
- num_args = len(inspect.getargspec(handler_to_inspect).args)
-
- else:
- if inspect.isfunction(handler_to_inspect):
- num_args = len(inspect.getargspec(handler_to_inspect).args)
-
- # is an object instance with __call__ method
- else:
- implied_arg = 1
- num_args = len(inspect.getargspec(handler_to_inspect.__call__).args)
-
-
- net_args = num_args - implied_arg - partial_args
-
- handler_args = ()
-
- # just the chunk
- if net_args == 1:
- handler_args = ()
-
- # chunk, stdin
- if net_args == 2:
- handler_args = (process.stdin,)
-
- # chunk, stdin, process
- elif net_args == 3:
- # notice we're only storing a weakref, to prevent cyclic references
- # (where the process holds a streamreader, and a streamreader holds a
- # handler-closure with a reference to the process
- handler_args = (process.stdin, weakref.ref(process))
-
- def fn(chunk):
- # this is pretty ugly, but we're evaluating the process at call-time,
- # because it's a weakref
- args = handler_args
- if len(args) == 2:
- args = (handler_args[0], handler_args[1]())
- return handler(chunk, *args)
-
- return fn
-
-
-
-def handle_process_exit_code(exit_code):
- """ this should only ever be called once for each child process """
- # if we exited from a signal, let our exit code reflect that
- if os.WIFSIGNALED(exit_code):
- return -os.WTERMSIG(exit_code)
- # otherwise just give us a normal exit code
- elif os.WIFEXITED(exit_code):
- return os.WEXITSTATUS(exit_code)
- else:
- raise RuntimeError("Unknown child exit status!")
-
-
-
-
-class OProc(object):
- """ this class is instantiated by RunningCommand for a command to be exec'd.
- it handles all the nasty business involved with correctly setting up the
- input/output to the child process. it gets its name for subprocess.Popen
- (process open) but we're calling ours OProc (open process) """
-
- _default_window_size = (24, 80)
-
- # used in redirecting
- STDOUT = -1
- STDERR = -2
-
- def __init__(self, parent_log, cmd, stdin, stdout, stderr, call_args, pipe):
- """
- cmd is the full string that will be exec'd. it includes the program
- name and all its arguments
-
- stdin, stdout, stderr are what the child will use for standard
- input/output/err
-
- call_args is a mapping of all the special keyword arguments to apply
- to the child process
- """
-
- self.call_args = call_args
-
- # I had issues with getting 'Input/Output error reading stdin' from dd,
- # until I set _tty_out=False
- if self.call_args["piped"] == "direct":
- self.call_args["tty_out"] = False
-
- self._single_tty = self.call_args["tty_in"] and self.call_args["tty_out"]
-
- # this logic is a little convoluted, but basically this top-level
- # if/else is for consolidating input and output TTYs into a single
- # TTY. this is the only way some secure programs like ssh will
- # output correctly (is if stdout and stdin are both the same TTY)
- if self._single_tty:
- self._stdin_fd, self._slave_stdin_fd = pty.openpty()
-
- self._stdout_fd = self._stdin_fd
- self._slave_stdout_fd = self._slave_stdin_fd
-
- self._stderr_fd = self._stdin_fd
- self._slave_stderr_fd = self._slave_stdin_fd
-
- # do not consolidate stdin and stdout. this is the most common use-
- # case
- else:
- # this check here is because we may be doing "direct" piping
- # (_piped="direct"), and so our stdin might be an instance of
- # OProc
- if isinstance(stdin, OProc):
- self._slave_stdin_fd = stdin._stdout_fd
- self._stdin_fd = None
- elif self.call_args["tty_in"]:
- self._slave_stdin_fd, self._stdin_fd = pty.openpty()
- # tty_in=False is the default
- else:
- self._slave_stdin_fd, self._stdin_fd = os.pipe()
-
-
- # tty_out=True is the default
- if self.call_args["tty_out"]:
- self._stdout_fd, self._slave_stdout_fd = pty.openpty()
- else:
- self._stdout_fd, self._slave_stdout_fd = os.pipe()
-
- # unless STDERR is going to STDOUT, it ALWAYS needs to be a pipe,
- # and never a PTY. the reason for this is not totally clear to me,
- # but it has to do with the fact that if STDERR isn't set as the
- # CTTY (because STDOUT is), the STDERR buffer won't always flush
- # by the time the process exits, and the data will be lost.
- # i've only seen this on OSX.
- if stderr is not OProc.STDOUT:
- self._stderr_fd, self._slave_stderr_fd = os.pipe()
-
-
- # this is a hack, but what we're doing here is intentionally throwing an
- # OSError exception if our child processes's directory doesn't exist,
- # but we're doing it BEFORE we fork. the reason for before the fork is
- # error handling. i'm currently too lazy to implement what
- # subprocess.py did and set up a error pipe to handle exceptions that
- # happen in the child between fork and exec. it has only been seen in
- # the wild for a missing cwd, so we'll handle it here.
- cwd = self.call_args["cwd"]
- if cwd is not None and not os.path.exists(cwd):
- os.chdir(cwd)
-
-
- gc_enabled = gc.isenabled()
- if gc_enabled:
- gc.disable()
- self.pid = os.fork()
-
-
- # child
- if self.pid == 0: # pragma: no cover
- try:
- # ignoring SIGHUP lets us persist even after the parent process
- # exits. only ignore if we're backgrounded
- if self.call_args["bg"] is True:
- signal.signal(signal.SIGHUP, signal.SIG_IGN)
-
- # this piece of ugliness is due to a bug where we can lose output
- # if we do os.close(self._slave_stdout_fd) in the parent after
- # the child starts writing.
- # see http://bugs.python.org/issue15898
- if IS_OSX:
- time.sleep(0.01)
-
- os.setsid()
-
- if self.call_args["tty_out"]:
- # set raw mode, so there isn't any weird translation of
- # newlines to \r\n and other oddities. we're not outputting
- # to a terminal anyways
- #
- # we HAVE to do this here, and not in the parent process,
- # because we have to guarantee that this is set before the
- # child process is run, and we can't do it twice.
- tty.setraw(self._slave_stdout_fd)
-
-
- # if the parent-side fd for stdin exists, close it. the case
- # where it may not exist is if we're using piped="direct"
- if self._stdin_fd:
- os.close(self._stdin_fd)
-
- if not self._single_tty:
- os.close(self._stdout_fd)
- if stderr is not OProc.STDOUT:
- os.close(self._stderr_fd)
-
-
- if cwd:
- os.chdir(cwd)
-
- os.dup2(self._slave_stdin_fd, 0)
- os.dup2(self._slave_stdout_fd, 1)
-
- # we're not directing stderr to stdout? then set self._slave_stderr_fd to
- # fd 2, the common stderr fd
- if stderr is OProc.STDOUT:
- os.dup2(self._slave_stdout_fd, 2)
- else:
- os.dup2(self._slave_stderr_fd, 2)
-
- # don't inherit file descriptors
- max_fd = resource.getrlimit(resource.RLIMIT_NOFILE)[0]
- os.closerange(3, max_fd)
-
-
- # set our controlling terminal. tty_out defaults to true
- if self.call_args["tty_out"]:
- tmp_fd = os.open(os.ttyname(1), os.O_RDWR)
- os.close(tmp_fd)
-
-
- if self.call_args["tty_out"]:
- setwinsize(1, self.call_args["tty_size"])
-
- # actually execute the process
- if self.call_args["env"] is None:
- os.execv(cmd[0], cmd)
- else:
- os.execve(cmd[0], cmd, self.call_args["env"])
-
- # we must ensure that we ALWAYS exit the child process, otherwise
- # the parent process code will be executed twice on exception
- # https://github.com/amoffat/sh/issues/202
- #
- # if your parent process experiences an exit code 255, it is most
- # likely that an exception occurred between the fork of the child
- # and the exec. this should be reported.
- finally:
- os._exit(255)
-
- # parent
- else:
- if gc_enabled:
- gc.enable()
-
- # used to determine what exception to raise. if our process was
- # killed via a timeout counter, we'll raise something different than
- # a SIGKILL exception
- self.timed_out = False
-
- self.started = time.time()
- self.cmd = cmd
-
- # exit code should only be manipulated from within self._wait_lock
- # to prevent race conditions
- self.exit_code = None
-
- self.stdin = stdin or Queue()
-
- # _pipe_queue is used internally to hand off stdout from one process
- # to another. by default, all stdout from a process gets dumped
- # into this pipe queue, to be consumed in real time (hence the
- # thread-safe Queue), or at a potentially later time
- self._pipe_queue = Queue()
-
- # this is used to prevent a race condition when we're waiting for
- # a process to end, and the OProc's internal threads are also checking
- # for the processes's end
- self._wait_lock = threading.Lock()
-
- # these are for aggregating the stdout and stderr. we use a deque
- # because we don't want to overflow
- self._stdout = deque(maxlen=self.call_args["internal_bufsize"])
- self._stderr = deque(maxlen=self.call_args["internal_bufsize"])
-
- if self.call_args["tty_in"]:
- setwinsize(self._stdin_fd, self.call_args["tty_size"])
-
-
- self.log = parent_log.get_child("process", repr(self))
-
- os.close(self._slave_stdin_fd)
- if not self._single_tty:
- os.close(self._slave_stdout_fd)
- if stderr is not OProc.STDOUT:
- os.close(self._slave_stderr_fd)
-
- self.log.debug("started process")
-
-
- if self.call_args["tty_in"]:
- attr = termios.tcgetattr(self._stdin_fd)
- attr[3] &= ~termios.ECHO
- termios.tcsetattr(self._stdin_fd, termios.TCSANOW, attr)
-
- # this represents the connection from a Queue object (or whatever
- # we're using to feed STDIN) to the process's STDIN fd
- self._stdin_stream = None
- if not isinstance(self.stdin, OProc):
- self._stdin_stream = \
- StreamWriter(self.log.get_child("streamwriter",
- "stdin"), self._stdin_fd, self.stdin,
- self.call_args["in_bufsize"],
- self.call_args["encoding"],
- self.call_args["tty_in"])
-
- stdout_pipe = None
- if pipe is OProc.STDOUT and not self.call_args["no_pipe"]:
- stdout_pipe = self._pipe_queue
-
-
- # this represents the connection from a process's STDOUT fd to
- # wherever it has to go, sometimes a pipe Queue (that we will use
- # to pipe data to other processes), and also an internal deque
- # that we use to aggregate all the output
- save_stdout = not self.call_args["no_out"] and \
- (self.call_args["tee"] in (True, "out") or stdout is None)
-
-
- # if we're piping directly into another process's filedescriptor, we
- # bypass reading from the stdout stream altogether, because we've
- # already hooked up this processes's stdout fd to the other
- # processes's stdin fd
- self._stdout_stream = None
- if self.call_args["piped"] != "direct":
- if callable(stdout):
- stdout = construct_streamreader_callback(self, stdout)
- self._stdout_stream = \
- StreamReader(self.log.get_child("streamreader",
- "stdout"), self._stdout_fd, stdout, self._stdout,
- self.call_args["out_bufsize"],
- self.call_args["encoding"],
- self.call_args["decode_errors"], stdout_pipe,
- save_data=save_stdout)
-
- if stderr is OProc.STDOUT or self._single_tty:
- self._stderr_stream = None
- else:
- stderr_pipe = None
- if pipe is OProc.STDERR and not self.call_args["no_pipe"]:
- stderr_pipe = self._pipe_queue
-
- save_stderr = not self.call_args["no_err"] and \
- (self.call_args["tee"] in ("err",) or stderr is None)
-
- if callable(stderr):
- stderr = construct_streamreader_callback(self, stderr)
-
- self._stderr_stream = StreamReader(Logger("streamreader"),
- self._stderr_fd, stderr, self._stderr,
- self.call_args["err_bufsize"], self.call_args["encoding"],
- self.call_args["decode_errors"], stderr_pipe,
- save_data=save_stderr)
-
-
- # start the main io threads
- # stdin thread is not needed if we are connecting from another process's stdout pipe
- self._input_thread = None
- if self._stdin_stream:
- self._input_thread = _start_daemon_thread(self.input_thread,
- self._stdin_stream)
-
- self._output_thread = _start_daemon_thread(self.output_thread,
- self._stdout_stream, self._stderr_stream,
- self.call_args["timeout"], self.started,
- self.call_args["timeout_signal"])
-
-
- def __repr__(self):
- return "<Process %d %r>" % (self.pid, self.cmd[:500])
-
-
- def change_in_bufsize(self, buf):
- self._stdin_stream.stream_bufferer.change_buffering(buf)
-
- def change_out_bufsize(self, buf):
- self._stdout_stream.stream_bufferer.change_buffering(buf)
-
- def change_err_bufsize(self, buf):
- self._stderr_stream.stream_bufferer.change_buffering(buf)
-
-
- def input_thread(self, stdin):
- """ this is run in a separate thread. it writes into our process's
- stdin (a streamwriter) and waits the process to end AND everything that
- can be written to be written """
- done = False
- while not done and self.is_alive():
- self.log.debug("%r ready for more input", stdin)
- done = stdin.write()
-
- stdin.close()
-
-
- def output_thread(self, stdout, stderr, timeout, started, timeout_exc):
- """ this function is run in a separate thread. it reads from the
- process's stdout stream (a streamreader), and waits for it to claim that
- its done """
-
- readers = []
- errors = []
-
- if stdout is not None:
- readers.append(stdout)
- errors.append(stdout)
- if stderr is not None:
- readers.append(stderr)
- errors.append(stderr)
-
- # this is our select loop for polling stdout or stderr that is ready to
- # be read and processed. if one of those streamreaders indicate that it
- # is done altogether being read from, we remove it from our list of
- # things to poll. when no more things are left to poll, we leave this
- # loop and clean up
- while readers:
- outputs, inputs, err = select.select(readers, [], errors, 0.1)
-
- # stdout and stderr
- for stream in outputs:
- self.log.debug("%r ready to be read from", stream)
- done = stream.read()
- if done:
- readers.remove(stream)
-
- for stream in err:
- pass
-
- # test if the process has been running too long
- if timeout:
- now = time.time()
- if now - started > timeout:
- self.log.debug("we've been running too long")
- self.timed_out = True
- self.signal(timeout_exc)
-
-
- # this is here because stdout may be the controlling TTY, and
- # we can't close it until the process has ended, otherwise the
- # child will get SIGHUP. typically, if we've broken out of
- # the above loop, and we're here, the process is just about to
- # end, so it's probably ok to aggressively poll self.is_alive()
- #
- # the other option to this would be to do the CTTY close from
- # the method that does the actual os.waitpid() call, but the
- # problem with that is that the above loop might still be
- # running, and closing the fd will cause some operation to
- # fail. this is less complex than wrapping all the ops
- # in the above loop with out-of-band fd-close exceptions
- while self.is_alive():
- time.sleep(0.001)
-
- if stdout:
- stdout.close()
-
- if stderr:
- stderr.close()
-
-
- @property
- def stdout(self):
- return "".encode(self.call_args["encoding"]).join(self._stdout)
-
- @property
- def stderr(self):
- return "".encode(self.call_args["encoding"]).join(self._stderr)
-
-
- def signal(self, sig):
- self.log.debug("sending signal %d", sig)
- try:
- os.kill(self.pid, sig)
- except OSError:
- pass
-
- def kill(self):
- self.log.debug("killing")
- self.signal(signal.SIGKILL)
-
- def terminate(self):
- self.log.debug("terminating")
- self.signal(signal.SIGTERM)
-
-
- def is_alive(self):
- """ polls if our child process has completed, without blocking. this
- method has side-effects, such as setting our exit_code, if we happen to
- see our child exit while this is running """
-
- if self.exit_code is not None:
- return False
-
- # what we're doing here essentially is making sure that the main thread
- # (or another thread), isn't calling .wait() on the process. because
- # .wait() calls os.waitpid(self.pid, 0), we can't do an os.waitpid
- # here...because if we did, and the process exited while in this
- # thread, the main thread's os.waitpid(self.pid, 0) would raise OSError
- # (because the process ended in another thread).
- #
- # so essentially what we're doing is, using this lock, checking if
- # we're calling .wait(), and if we are, let .wait() get the exit code
- # and handle the status, otherwise let us do it.
- acquired = self._wait_lock.acquire(False)
- if not acquired:
- if self.exit_code is not None:
- return False
- return True
-
- try:
- # WNOHANG is just that...we're calling waitpid without hanging...
- # essentially polling the process. the return result is (0, 0) if
- # there's no process status, so we check that pid == self.pid below
- # in order to determine how to proceed
- pid, exit_code = os.waitpid(self.pid, os.WNOHANG)
- if pid == self.pid:
- self.exit_code = handle_process_exit_code(exit_code)
- return False
-
- # no child process
- except OSError:
- return False
- else:
- return True
- finally:
- self._wait_lock.release()
-
-
- def wait(self):
- """ waits for the process to complete, handles the exit code """
-
- self.log.debug("acquiring wait lock to wait for completion")
- # using the lock in a with-context blocks, which is what we want if
- # we're running wait()
- with self._wait_lock:
- self.log.debug("got wait lock")
-
- if self.exit_code is None:
- self.log.debug("exit code not set, waiting on pid")
- pid, exit_code = os.waitpid(self.pid, 0) # blocks
- self.exit_code = handle_process_exit_code(exit_code)
- else:
- self.log.debug("exit code already set (%d), no need to wait", self.exit_code)
-
- # we may not have a thread for stdin, if the pipe has been connected
- # via _piped="direct"
- if self._input_thread:
- self._input_thread.join()
-
- # wait for our stdout and stderr streamreaders to finish reading and
- # aggregating the process output
- self._output_thread.join()
-
- return self.exit_code
-
-
-
-
-class DoneReadingForever(Exception): pass
-class NotYetReadyToRead(Exception): pass
-
-
-def determine_how_to_read_input(input_obj):
- """ given some kind of input object, return a function that knows how to
- read chunks of that input object.
-
- each reader function should return a chunk and raise a DoneReadingForever
- exception, or return None, when there's no more data to read
-
- NOTE: the function returned does not need to care much about the requested
- buffering type (eg, unbuffered vs newline-buffered). the StreamBufferer
- will take care of that. these functions just need to return a
- reasonably-sized chunk of data. """
-
- get_chunk = None
-
- if isinstance(input_obj, Queue):
- log_msg = "queue"
- get_chunk = get_queue_chunk_reader(input_obj)
-
- elif callable(input_obj):
- log_msg = "callable"
- get_chunk = get_callable_chunk_reader(input_obj)
-
- # also handles stringio
- elif hasattr(input_obj, "read"):
- log_msg = "file descriptor"
- get_chunk = get_file_chunk_reader(input_obj)
-
- elif isinstance(input_obj, basestring):
- log_msg = "string"
- get_chunk = get_iter_string_reader(input_obj)
-
- else:
- log_msg = "general iterable"
- get_chunk = get_iter_chunk_reader(iter(input_obj))
-
- return get_chunk, log_msg
-
-
-
-def get_queue_chunk_reader(stdin):
- def fn():
- try:
- chunk = stdin.get(True, 0.01)
- except Empty:
- raise NotYetReadyToRead
- if chunk is None:
- raise DoneReadingForever
- return chunk
- return fn
-
-
-def get_callable_chunk_reader(stdin):
- def fn():
- try:
- return stdin()
- except:
- raise DoneReadingForever
- return fn
-
-
-def get_iter_string_reader(stdin):
- """ return an iterator that returns a chunk of a string every time it is
- called. notice that even though bufsize_type might be line buffered, we're
- not doing any line buffering here. that's because our StreamBufferer
- handles all buffering. we just need to return a reasonable-sized chunk. """
- bufsize = 1024
- iter_str = (stdin[i:i + bufsize] for i in range(0, len(stdin), bufsize))
- return get_iter_chunk_reader(iter_str)
-
-
-def get_iter_chunk_reader(stdin):
- def fn():
- try:
- if IS_PY3:
- chunk = stdin.__next__()
- else:
- chunk = stdin.next()
- return chunk
- except StopIteration:
- raise DoneReadingForever
- return fn
-
-def get_file_chunk_reader(stdin):
- bufsize = 1024
-
- def fn():
- chunk = stdin.read(bufsize)
- if not chunk:
- raise DoneReadingForever
- else:
- return chunk
- return fn
-
-
-def bufsize_type_to_bufsize(bf_type):
- """ for a given bufsize type, return the actual bufsize we will read.
- notice that although 1 means "newline-buffered", we're reading a chunk size
- of 1024. this is because we have to read something. we let a
- StreamBufferer instance handle splitting our chunk on newlines """
-
- # newlines
- if bf_type == 1:
- bufsize = 1024
- # unbuffered
- elif bf_type == 0:
- bufsize = 1
- # or buffered by specific amount
- else:
- bufsize = bf_type
-
- return bufsize
-
-
-
-class StreamWriter(object):
- """ StreamWriter reads from some input (the stdin param) and writes to a fd
- (the stream param). the stdin may be a Queue, a callable, something with
- the "read" method, a string, or an iterable """
-
- def __init__(self, log, stream, stdin, bufsize_type, encoding, tty_in):
- self.stream = stream
- self.stdin = stdin
-
- self.log = log
- self.encoding = encoding
- self.tty_in = tty_in
-
-
- self.stream_bufferer = StreamBufferer(bufsize_type, self.encoding)
- self.get_chunk, log_msg = determine_how_to_read_input(stdin)
- self.log.debug("parsed stdin as a %s", log_msg)
-
-
- def fileno(self):
- """ defining this allows us to do select.select on an instance of this
- class """
- return self.stream
-
-
-
- def write(self):
- """ attempt to get a chunk of data to write to our child process's
- stdin, then write it. the return value answers the questions "are we
- done writing forever?" """
-
- # get_chunk may sometimes return bytes, and sometimes returns trings
- # because of the nature of the different types of STDIN objects we
- # support
- try:
- chunk = self.get_chunk()
- if chunk is None:
- raise DoneReadingForever
-
- except DoneReadingForever:
- self.log.debug("done reading")
-
- if self.tty_in:
- # EOF time
- try:
- char = termios.tcgetattr(self.stream)[6][termios.VEOF]
- except:
- char = chr(4).encode()
- os.write(self.stream, char)
-
- return True
-
- except NotYetReadyToRead:
- self.log.debug("received no data")
- return False
-
- # if we're not bytes, make us bytes
- if IS_PY3 and hasattr(chunk, "encode"):
- chunk = chunk.encode(self.encoding)
-
- for proc_chunk in self.stream_bufferer.process(chunk):
- self.log.debug("got chunk size %d: %r", len(proc_chunk),
- proc_chunk[:30])
-
- self.log.debug("writing chunk to process")
- try:
- os.write(self.stream, proc_chunk)
- except OSError:
- self.log.debug("OSError writing stdin chunk")
- return True
-
-
- def close(self):
- self.log.debug("closing, but flushing first")
- chunk = self.stream_bufferer.flush()
- self.log.debug("got chunk size %d to flush: %r", len(chunk), chunk[:30])
- try:
- if chunk:
- os.write(self.stream, chunk)
-
- if not self.tty_in:
- self.log.debug("we used a TTY, so closing the stream")
- os.close(self.stream)
-
- except OSError:
- pass
-
-
-
-def determine_how_to_feed_output(handler, encoding, decode_errors):
- if callable(handler):
- process, finish = get_callback_chunk_consumer(handler, encoding,
- decode_errors)
- elif isinstance(handler, cStringIO):
- process, finish = get_cstringio_chunk_consumer(handler)
- elif isinstance(handler, StringIO):
- process, finish = get_stringio_chunk_consumer(handler, encoding,
- decode_errors)
- elif hasattr(handler, "write"):
- process, finish = get_file_chunk_consumer(handler)
- else:
- process = lambda chunk: False
- finish = lambda: None
-
- return process, finish
-
-
-def get_file_chunk_consumer(handler):
- def process(chunk):
- handler.write(chunk)
- # we should flush on an fd. chunk is already the correctly-buffered
- # size, so we don't need the fd buffering as well
- handler.flush()
- return False
-
- def finish():
- if hasattr(handler, "flush"):
- handler.flush()
-
- return process, finish
-
-def get_callback_chunk_consumer(handler, encoding, decode_errors):
- def process(chunk):
- # try to use the encoding first, if that doesn't work, send
- # the bytes, because it might be binary
- try:
- chunk = chunk.decode(encoding, decode_errors)
- except UnicodeDecodeError:
- pass
- return handler(chunk)
-
- def finish():
- pass
-
- return process, finish
-
-def get_cstringio_chunk_consumer(handler):
- def process(chunk):
- handler.write(chunk)
- return False
-
- def finish():
- pass
-
- return process, finish
-
-
-def get_stringio_chunk_consumer(handler, encoding, decode_errors):
- def process(chunk):
- handler.write(chunk.decode(encoding, decode_errors))
- return False
-
- def finish():
- pass
-
- return process, finish
-
-
-class StreamReader(object):
- """ reads from some output (the stream) and sends what it just read to the
- handler. """
- def __init__(self, log, stream, handler, buffer, bufsize_type, encoding,
- decode_errors, pipe_queue=None, save_data=True):
- self.stream = stream
- self.buffer = buffer
- self.save_data = save_data
- self.encoding = encoding
- self.decode_errors = decode_errors
-
- self.pipe_queue = None
- if pipe_queue:
- self.pipe_queue = weakref.ref(pipe_queue)
-
- self.log = log
-
- self.stream_bufferer = StreamBufferer(bufsize_type, self.encoding,
- self.decode_errors)
- self.bufsize = bufsize_type_to_bufsize(bufsize_type)
-
- self.process_chunk, self.finish_chunk_processor = \
- determine_how_to_feed_output(handler, encoding, decode_errors)
-
- self.should_quit = False
-
-
- def fileno(self):
- """ defining this allows us to do select.select on an instance of this
- class """
- return self.stream
-
- def close(self):
- chunk = self.stream_bufferer.flush()
- self.log.debug("got chunk size %d to flush: %r", len(chunk), chunk[:30])
- if chunk:
- self.write_chunk(chunk)
-
- self.finish_chunk_processor()
-
- if self.pipe_queue and self.save_data:
- self.pipe_queue().put(None)
-
- try:
- os.close(self.stream)
- except OSError:
- pass
-
-
- def write_chunk(self, chunk):
- # in PY3, the chunk coming in will be bytes, so keep that in mind
-
- if not self.should_quit:
- self.should_quit = self.process_chunk(chunk)
-
-
- if self.save_data:
- self.buffer.append(chunk)
-
- if self.pipe_queue:
- self.log.debug("putting chunk onto pipe: %r", chunk[:30])
- self.pipe_queue().put(chunk)
-
-
- def read(self):
- # if we're PY3, we're reading bytes, otherwise we're reading
- # str
- try:
- chunk = os.read(self.stream, self.bufsize)
- except OSError as e:
- self.log.debug("got errno %d, done reading", e.errno)
- return True
- if not chunk:
- self.log.debug("got no chunk, done reading")
- return True
-
- self.log.debug("got chunk size %d: %r", len(chunk), chunk[:30])
- for chunk in self.stream_bufferer.process(chunk):
- self.write_chunk(chunk)
-
-
-
-
-class StreamBufferer(object):
- """ this is used for feeding in chunks of stdout/stderr, and breaking it up
- into chunks that will actually be put into the internal buffers. for
- example, if you have two processes, one being piped to the other, and you
- want that, first process to feed lines of data (instead of the chunks
- however they come in), OProc will use an instance of this class to chop up
- the data and feed it as lines to be sent down the pipe """
-
- def __init__(self, buffer_type, encoding=DEFAULT_ENCODING,
- decode_errors="strict"):
- # 0 for unbuffered, 1 for line, everything else for that amount
- self.type = buffer_type
- self.buffer = []
- self.n_buffer_count = 0
- self.encoding = encoding
- self.decode_errors = decode_errors
-
- # this is for if we change buffering types. if we change from line
- # buffered to unbuffered, its very possible that our self.buffer list
- # has data that was being saved up (while we searched for a newline).
- # we need to use that up, so we don't lose it
- self._use_up_buffer_first = False
-
- # the buffering lock is used because we might chance the buffering
- # types from a different thread. for example, if we have a stdout
- # callback, we might use it to change the way stdin buffers. so we
- # lock
- self._buffering_lock = threading.RLock()
- self.log = Logger("stream_bufferer")
-
-
- def change_buffering(self, new_type):
- # TODO, when we stop supporting 2.6, make this a with context
- self.log.debug("acquiring buffering lock for changing buffering")
- self._buffering_lock.acquire()
- self.log.debug("got buffering lock for changing buffering")
- try:
- if new_type == 0:
- self._use_up_buffer_first = True
-
- self.type = new_type
- finally:
- self._buffering_lock.release()
- self.log.debug("released buffering lock for changing buffering")
-
-
- def process(self, chunk):
- # MAKE SURE THAT THE INPUT IS PY3 BYTES
- # THE OUTPUT IS ALWAYS PY3 BYTES
-
- # TODO, when we stop supporting 2.6, make this a with context
- self.log.debug("acquiring buffering lock to process chunk (buffering: %d)", self.type)
- self._buffering_lock.acquire()
- self.log.debug("got buffering lock to process chunk (buffering: %d)", self.type)
- try:
- # we've encountered binary, permanently switch to N size buffering
- # since matching on newline doesn't make sense anymore
- if self.type == 1:
- try:
- chunk.decode(self.encoding, self.decode_errors)
- except:
- self.log.debug("detected binary data, changing buffering")
- self.change_buffering(1024)
-
- # unbuffered
- if self.type == 0:
- if self._use_up_buffer_first:
- self._use_up_buffer_first = False
- to_write = self.buffer
- self.buffer = []
- to_write.append(chunk)
- return to_write
-
- return [chunk]
-
- # line buffered
- # we must decode the bytes before we try to match on newline
- elif self.type == 1:
- total_to_write = []
- chunk = chunk.decode(self.encoding, self.decode_errors)
- while True:
- newline = chunk.find("\n")
- if newline == -1:
- break
-
- chunk_to_write = chunk[:newline + 1]
- if self.buffer:
- # this is ugly, but it's designed to take the existing
- # bytes buffer, join it together, tack on our latest
- # chunk, then convert the whole thing to a string.
- # it's necessary, i'm sure. read the whole block to
- # see why.
- chunk_to_write = "".encode(self.encoding).join(self.buffer) \
- + chunk_to_write.encode(self.encoding)
- chunk_to_write = chunk_to_write.decode(self.encoding)
-
- self.buffer = []
- self.n_buffer_count = 0
-
- chunk = chunk[newline + 1:]
- total_to_write.append(chunk_to_write.encode(self.encoding))
-
- if chunk:
- self.buffer.append(chunk.encode(self.encoding))
- self.n_buffer_count += len(chunk)
- return total_to_write
-
- # N size buffered
- else:
- total_to_write = []
- while True:
- overage = self.n_buffer_count + len(chunk) - self.type
- if overage >= 0:
- ret = "".encode(self.encoding).join(self.buffer) + chunk
- chunk_to_write = ret[:self.type]
- chunk = ret[self.type:]
- total_to_write.append(chunk_to_write)
- self.buffer = []
- self.n_buffer_count = 0
- else:
- self.buffer.append(chunk)
- self.n_buffer_count += len(chunk)
- break
- return total_to_write
- finally:
- self._buffering_lock.release()
- self.log.debug("released buffering lock for processing chunk (buffering: %d)", self.type)
-
-
- def flush(self):
- self.log.debug("acquiring buffering lock for flushing buffer")
- self._buffering_lock.acquire()
- self.log.debug("got buffering lock for flushing buffer")
- try:
- ret = "".encode(self.encoding).join(self.buffer)
- self.buffer = []
- return ret
- finally:
- self._buffering_lock.release()
- self.log.debug("released buffering lock for flushing buffer")
-
-
-
-@contextmanager
-def pushd(path):
- """ pushd is just a specialized form of args, where we're passing in the
- current working directory """
- with args(_cwd=path):
- yield
-
-
-@contextmanager
-def args(*args, **kwargs):
- """ allows us to temporarily override all the special keyword parameters in
- a with context """
- call_args = Command._call_args
- old_args = call_args.copy()
-
- for key,value in kwargs.items():
- key = key.lstrip("_")
- call_args[key] = value
-
- yield
- call_args.update(old_args)
-
-
-
-class Environment(dict):
- """ this allows lookups to names that aren't found in the global scope to be
- searched for as a program name. for example, if "ls" isn't found in this
- module's scope, we consider it a system program and try to find it.
-
- we use a dict instead of just a regular object as the base class because the
- exec() statement used in this file requires the "globals" argument to be a
- dictionary """
-
-
- # this is a list of all of the names that the sh module exports that will
- # not resolve to functions. we don't want to accidentally shadow real
- # commands with functions/imports that we define in sh.py. for example,
- # "import time" may override the time system program
- whitelist = set([
- "Command",
- "CommandNotFound",
- "DEFAULT_ENCODING",
- "DoneReadingForever",
- "ErrorReturnCode",
- "NotYetReadyToRead",
- "SignalException",
- "TimeoutException",
- "__project_url__",
- "__version__",
- "args",
- "glob",
- "pushd",
- ])
-
- def __init__(self, globs, baked_args={}):
- self.globs = globs
- self.baked_args = baked_args
- self.disable_whitelist = False
-
- def __setitem__(self, k, v):
- self.globs[k] = v
-
- def __getitem__(self, k):
- # if we first import "_disable_whitelist" from sh, we can import
- # anything defined in the global scope of sh.py. this is useful for our
- # tests
- if k == "_disable_whitelist":
- self.disable_whitelist = True
- return None
-
- # we're trying to import something real (maybe), see if it's in our
- # global scope
- if k in self.whitelist or self.disable_whitelist:
- try:
- return self.globs[k]
- except KeyError:
- pass
-
- # somebody tried to be funny and do "from sh import *"
- if k == "__all__":
- raise AttributeError("Cannot import * from sh. \
-Please import sh or import programs individually.")
-
-
- # check if we're naming a dynamically generated ReturnCode exception
- exc = get_exc_from_name(k)
- if exc:
- return exc
-
-
- # https://github.com/ipython/ipython/issues/2577
- # https://github.com/amoffat/sh/issues/97#issuecomment-10610629
- if k.startswith("__") and k.endswith("__"):
- raise AttributeError
-
- # how about an environment variable?
- try:
- return os.environ[k]
- except KeyError:
- pass
-
- # is it a custom builtin?
- builtin = getattr(self, "b_" + k, None)
- if builtin:
- return builtin
-
- # it must be a command then
- # we use _create instead of instantiating the class directly because
- # _create uses resolve_program, which will automatically do underscore-
- # to-dash conversions. instantiating directly does not use that
- return Command._create(k, **self.baked_args)
-
-
- # methods that begin with "b_" are custom builtins and will override any
- # program that exists in our path. this is useful for things like
- # common shell builtins that people are used to, but which aren't actually
- # full-fledged system binaries
-
- def b_cd(self, path):
- os.chdir(path)
-
- def b_which(self, program):
- return which(program)
-
-
-
-
-def run_repl(env): # pragma: no cover
- banner = "\n>> sh v{version}\n>> https://github.com/amoffat/sh\n"
-
- print(banner.format(version=__version__))
- while True:
- try:
- line = raw_input("sh> ")
- except (ValueError, EOFError):
- break
-
- try:
- exec(compile(line, "<dummy>", "single"), env, env)
- except SystemExit:
- break
- except:
- print(traceback.format_exc())
-
- # cleans up our last line
- print("")
-
-
-
-
-# this is a thin wrapper around THIS module (we patch sys.modules[__name__]).
-# this is in the case that the user does a "from sh import whatever"
-# in other words, they only want to import certain programs, not the whole
-# system PATH worth of commands. in this case, we just proxy the
-# import lookup to our Environment class
-class SelfWrapper(ModuleType):
- def __init__(self, self_module, baked_args={}):
- # this is super ugly to have to copy attributes like this,
- # but it seems to be the only way to make reload() behave
- # nicely. if i make these attributes dynamic lookups in
- # __getattr__, reload sometimes chokes in weird ways...
- for attr in ["__builtins__", "__doc__", "__name__", "__package__"]:
- setattr(self, attr, getattr(self_module, attr, None))
-
- # python 3.2 (2.7 and 3.3 work fine) breaks on osx (not ubuntu)
- # if we set this to None. and 3.3 needs a value for __path__
- self.__path__ = []
- self.__self_module = self_module
- self.__env = Environment(globals(), baked_args)
-
- def __setattr__(self, name, value):
- if hasattr(self, "__env"):
- self.__env[name] = value
- else:
- ModuleType.__setattr__(self, name, value)
-
- def __getattr__(self, name):
- if name == "__env":
- raise AttributeError
- return self.__env[name]
-
- # accept special keywords argument to define defaults for all operations
- # that will be processed with given by return SelfWrapper
- def __call__(self, **kwargs):
- return SelfWrapper(self.__self_module, kwargs)
-
-
-
-# we're being run as a stand-alone script
-if __name__ == "__main__": # pragma: no cover
- try:
- arg = sys.argv.pop(1)
- except:
- arg = None
-
- if arg == "test":
- import subprocess
-
- def run_test(version, locale):
- py_version = "python%s" % version
- py_bin = which(py_version)
-
- if py_bin:
- print("Testing %s, locale %r" % (py_version.capitalize(),
- locale))
-
- env = os.environ.copy()
- env["LANG"] = locale
- p = subprocess.Popen([py_bin, os.path.join(THIS_DIR, "test.py")]
- + sys.argv[1:], env=env)
- return_code = p.wait()
-
- if return_code != 0:
- exit(1)
- else:
- print("Couldn't find %s, skipping" % py_version.capitalize())
-
- versions = ("2.6", "2.7", "3.1", "3.2", "3.3", "3.4")
- locales = ("en_US.UTF-8", "C")
- for locale in locales:
- for version in versions:
- run_test(version, locale)
-
- else:
- env = Environment(globals())
- run_repl(env)
-
-# we're being imported from somewhere
-else:
- self = sys.modules[__name__]
- sys.modules[__name__] = SelfWrapper(self)
diff --git a/local_ceph.yaml b/local_ceph.yaml
new file mode 100644
index 0000000..e945a06
--- /dev/null
+++ b/local_ceph.yaml
@@ -0,0 +1,28 @@
+clouds:
+ ceph: local
+
+discover: ceph
+
+explicit_nodes:
+ "ssh://koder@192.168.152.43::/home/koder/.ssh/id_rsa": testnode
+
+internal:
+ var_dir_root: /tmp/perf_tests
+
+sensors:
+ receiver_uri: "udp://{ip}:5699"
+ roles_mapping:
+ ceph-osd: block-io
+ cinder: block-io, system-cpu
+ testnode: system-cpu, block-io
+
+tests:
+ - io:
+ # cfg: tests/io_scenario_hdd.cfg
+ cfg: scripts/fio_tests_configs/io_task_test.cfg
+ params:
+ FILENAME: /mnt/ceph/xxx.bin
+ NUM_ROUNDS: 7
+
+logging:
+ extra_logs: 1
diff --git a/nodes/__init__.py b/nodes/__init__.py
deleted file mode 100644
index 9198922..0000000
--- a/nodes/__init__.py
+++ /dev/null
@@ -1 +0,0 @@
-"this package contains node discovery code"
diff --git a/nodes/fuel.py b/nodes/fuel.py
deleted file mode 100644
index 82e7542..0000000
--- a/nodes/fuel.py
+++ /dev/null
@@ -1,37 +0,0 @@
-# import logging
-
-
-# import fuel_rest_api
-# from node import Node
-
-
-# logger = logging.getLogger("io-perf-tool")
-
-
-# def discover_fuel_nodes(root_url, credentials, cluster_name):
-# """Discover Fuel nodes"""
-# assert credentials.count(':') >= 2
-# user, passwd_tenant = credentials.split(":", 1)
-# passwd, tenant = passwd_tenant.rsplit(":", 1)
-# creds = dict(
-# username=user,
-# password=passwd,
-# tenant_name=tenant,
-# )
-
-# connection = fuel_rest_api.KeystoneAuth(root_url, creds)
-# fi = fuel_rest_api.FuelInfo(connection)
-
-# logger.debug("wtf")
-# clusters_id = fuel_rest_api.get_cluster_id(connection, cluster_name)
-# logger.debug("wtf2")
-
-# nodes = []
-
-# for node in fi.nodes:
-# if node.cluster == clusters_id:
-# nodes.append(node)
-
-# res = [Node(n.ip, n.get_roles()) for n in nodes]
-# logger.debug("Found %s fuel nodes for env %r" % (len(res), cluster_name))
-# return res
diff --git a/nodes/openstack.py b/nodes/openstack.py
deleted file mode 100644
index d212283..0000000
--- a/nodes/openstack.py
+++ /dev/null
@@ -1,102 +0,0 @@
-import socket
-import logging
-
-
-from novaclient.client import Client
-
-import node
-from disk_perf_test_tool.utils import parse_creds
-
-
-logger = logging.getLogger("io-perf-tool")
-
-
-def get_floating_ip(vm):
- addrs = vm.addresses
- for net_name, ifaces in addrs.items():
- for iface in ifaces:
- if iface.get('OS-EXT-IPS:type') == "floating":
- return iface['addr']
-
-
-def discover_vms(client, search_opts):
- user, password, key = parse_creds(search_opts.pop('auth'))
-
- servers = client.servers.list(search_opts=search_opts)
- logger.debug("Found %s openstack vms" % len(servers))
- return [node.Node(get_floating_ip(server), ["test_vm"], username=user,
- password=password, key_path=key)
- for server in servers if get_floating_ip(server)]
-
-
-def discover_services(client, opts):
- user, password, key = parse_creds(opts.pop('auth'))
-
- services = []
- if opts['service'] == "all":
- services = client.services.list()
- else:
- if isinstance(opts['service'], basestring):
- opts['service'] = [opts['service']]
-
- for s in opts['service']:
- services.extend(client.services.list(binary=s))
-
- host_services_mapping = {}
-
- for service in services:
- ip = socket.gethostbyname(service.host)
- host_services_mapping[ip].append(service.binary)
-
- logger.debug("Found %s openstack service nodes" %
- len(host_services_mapping))
- return [node.Node(host, services, username=user,
- password=password, key_path=key) for host, services in
- host_services_mapping.items()]
-
-
-def discover_openstack_nodes(conn_details, conf):
- """Discover vms running in openstack
- :param conn_details - dict with openstack connection details -
- auth_url, api_key (password), username
- """
- client = Client(version='1.1', **conn_details)
- nodes = []
- if conf.get('discover'):
- # vms_to_discover = conf['discover'].get('vm')
- # if vms_to_discover:
- # nodes.extend(discover_vms(client, vms_to_discover))
- services_to_discover = conf['discover'].get('nodes')
- if services_to_discover:
- nodes.extend(discover_services(client, services_to_discover))
-
- return nodes
-
-
-# from disk_perf_test_tool.starts_vms import create_vms_mt
-# def start_test_vms(client, opts):
-
-# user = opts.pop("user", None)
-# key_file = opts.pop("key_file", None)
-# aff_group = opts.pop("aff_group", None)
-# raw_count = opts.pop('count')
-
-# if raw_count.startswith("x"):
-# logger.debug("Getting amount of compute services")
-# count = len(client.services.list(binary="nova-compute"))
-# count *= int(raw_count[1:])
-# else:
-# count = int(raw_count)
-
-# if aff_group is not None:
-# scheduler_hints = {'group': aff_group}
-# else:
-# scheduler_hints = None
-
-# opts['scheduler_hints'] = scheduler_hints
-
-# logger.debug("Will start {0} vms".format(count))
-
-# nodes = create_vms_mt(client, count, **opts)
-# return [node.Node(get_floating_ip(server), ["test_vm"], username=user,
-# key_path=key_file) for server in nodes]
diff --git a/perf1.yaml b/perf1.yaml
new file mode 100644
index 0000000..ecfd26b
--- /dev/null
+++ b/perf1.yaml
@@ -0,0 +1,65 @@
+clouds:
+ fuel:
+ url: http://172.6.52.112:8000/
+ creds: admin:admin@admin
+ ssh_creds: root:test37
+ openstack_env: test
+
+discover: fuel
+
+# explicit_nodes:
+# "ssh://root@172.16.52.112:3022:/home/koder/.ssh/id_rsa": testnode
+
+internal:
+ var_dir_root: /tmp/perf_tests
+
+sensors:
+ # receiver_uri: udp://172.18.217.10:5699
+ receiver_uri: "udp://{ip}:5699"
+ roles_mapping:
+ ceph-osd: block-io
+ cinder: block-io, system-cpu
+ testnode: system-cpu, block-io
+
+tests:
+ - start_test_nodes:
+ creds: clouds
+ vm_params:
+ count: x1
+ image:
+ name: disk_io_perf
+ url: https://cloud-images.ubuntu.com/trusty/current/trusty-server-cloudimg-amd64-disk1.img
+
+ flavor:
+ name: disk_io_perf.1024
+ hdd_size: 50
+ ram_size: 1024
+ cpu: 1
+
+ keypair_name: disk_io_perf
+ network_zone_name: novanetwork
+ flt_ip_pool: nova
+ private_key_path: disk_io_perf.pem
+ creds: "ssh://ubuntu@{ip}::{private_key_path}"
+ name_templ: disk_io_perf-{0}
+ scheduler_group_name: disk_io_group_aa-{0}
+ security_group: disk_io_perf
+
+ tests:
+ - io:
+ cfg: tests/io_scenario_hdd.cfg
+ # cfg: scripts/fio_tests_configs/io_task_test.cfg
+ params:
+ FILENAME: /opt/xxx.bin
+ NUM_ROUNDS: 7
+#
+# - io:
+# # cfg: scripts/fio_tests_configs/io_task_test.cfg
+# cfg: tests/io_scenario_hdd.cfg
+# params:
+# # FILENAME: /media/koder/a5230078-4c27-4c3b-99aa-26148e78b2e7/xxx.bin
+# FILENAME: /opt/xxx.bin
+# NUM_ROUNDS: 7
+
+logging:
+ extra_logs: 1
\ No newline at end of file
diff --git a/report.py b/report.py
deleted file mode 100644
index 4eb3cb8..0000000
--- a/report.py
+++ /dev/null
@@ -1,248 +0,0 @@
-import os
-import sys
-from collections import OrderedDict
-
-import formatters
-from chart import charts
-from utils import ssize_to_b, parse_creds
-from statistic import med_dev
-from io_results_loader import filter_data
-from meta_info import total_lab_info, collect_lab_data
-
-
-OPERATIONS = (('async', ('randwrite asynchronous', 'randread asynchronous',
- 'write asynchronous', 'read asynchronous')),
- ('sync', ('randwrite synchronous', 'randread synchronous',
- 'write synchronous', 'read synchronous')),
- ('direct', ('randwrite direct', 'randread direct',
- 'write direct', 'read direct')))
-
-sync_async_view = {'s': 'synchronous',
- 'a': 'asynchronous',
- 'd': 'direct'}
-
-
-# def pgbench_chart_data(results):
-# """
-# Format pgbench results for chart
-# """
-# data = {}
-# charts_url = []
-
-# formatted_res = formatters.format_pgbench_stat(results)
-# for key, value in formatted_res.items():
-# num_cl, num_tr = key.split(' ')
-# data.setdefault(num_cl, {}).setdefault(build, {})
-# data[keys[z]][build][
-# ' '.join(keys)] = value
-
-# for name, value in data.items():
-# title = name
-# legend = []
-# dataset = []
-
-# scale_x = []
-
-# for build_id, build_results in value.items():
-# vals = []
-# OD = OrderedDict
-# ordered_build_results = OD(sorted(build_results.items(),
-# key=lambda t: t[0]))
-# scale_x = ordered_build_results.keys()
-# for key in scale_x:
-# res = build_results.get(key)
-# if res:
-# vals.append(res)
-# if vals:
-# dataset.append(vals)
-# legend.append(build_id)
-
-# if dataset:
-# charts_url.append(str(charts.render_vertical_bar
-# (title, legend, dataset, scale_x=scale_x)))
-# return charts_url
-
-
-def build_vertical_bar(results, z=0):
- data = {}
- charts_url = []
- for build, res in results:
- formatted_res = formatters.get_formatter(build)(res)
- for key, value in formatted_res.items():
- keys = key.split(' ')
- data.setdefault(keys[z], {}).setdefault(build, {})
- data[keys[z]][build][
- ' '.join(keys)] = value
-
- for name, value in data.items():
- title = name
- legend = []
- dataset = []
-
- scale_x = []
-
- for build_id, build_results in value.items():
- vals = []
- OD = OrderedDict
- ordered_build_results = OD(sorted(build_results.items(),
- key=lambda t: t[0]))
- scale_x = ordered_build_results.keys()
- for key in scale_x:
- res = build_results.get(key)
- if res:
- vals.append(res)
- if vals:
- dataset.append(vals)
- legend.append(build_id)
-
- if dataset:
- charts_url.append(str(charts.render_vertical_bar
- (title, legend, dataset, scale_x=scale_x)))
- return charts_url
-
-
-def build_lines_chart(results, z=0):
- data = {}
- charts_url = []
-
- for build, res in results:
- formatted_res = formatters.get_formatter(build)(res)
- for key, value in formatted_res.items():
- keys = key.split(' ')
- data.setdefault(key[z], {})
- data[key[z]].setdefault(build, {})[keys[1]] = value
-
- for name, value in data.items():
- title = name
- legend = []
- dataset = []
- scale_x = []
- for build_id, build_results in value.items():
- legend.append(build_id)
-
- OD = OrderedDict
- ordered_build_results = OD(sorted(build_results.items(),
- key=lambda t: ssize_to_b(t[0])))
-
- if not scale_x:
- scale_x = ordered_build_results.keys()
- dataset.append(zip(*ordered_build_results.values())[0])
-
- chart = charts.render_lines(title, legend, dataset, scale_x)
- charts_url.append(str(chart))
-
- return charts_url
-
-
-def render_html(charts_urls, dest, lab_description, info):
- templ = open("report.html", 'r').read()
- body = "<a href='#lab_desc'>Lab description</a>" \
- "<ol>{0}</ol>" \
- "<div>{1}</div>" \
- '<a name="lab_desc"></a>' \
- "<div><ul>{2}</ul></div>"
- table = "<table><tr><td>{0}</td><td>{1}</td></tr>" \
- "<tr><td>{2}</td><td>{3}</td></tr></table>"
- ul = []
- ol = []
- li = '<li>{0} : {1}</li>'
-
- for elem in info:
- ol.append(li.format(elem.keys(), elem.values()))
-
- for key in lab_description:
- value = lab_description[key]
- ul.append("<li>{0} : {1}</li>".
- format(key, value))
-
- charts_urls = ['<img src="{0}">'.format(url) for url in charts_urls]
- html = templ % {'body': body.format('\n'.join(ol), table.format(*charts_urls),
- '\n'.join(ul))
- }
- open(dest, 'w').write(html)
-
-
-def io_chart(title, concurence, latv, iops_or_bw, iops_or_bw_dev,
- legend, fname):
- bar_data, bar_dev = iops_or_bw, iops_or_bw_dev
- legend = [legend]
-
- iops_or_bw_per_vm = []
- for i in range(len(concurence)):
- iops_or_bw_per_vm.append(iops_or_bw[i] / concurence[i])
-
- bar_dev_bottom = []
- bar_dev_top = []
- for i in range(len(bar_data)):
- bar_dev_top.append(bar_data[i] + bar_dev[i])
- bar_dev_bottom.append(bar_data[i] - bar_dev[i])
-
- latv = [lat / 1000 for lat in latv]
- ch = charts.render_vertical_bar(title, legend, [bar_data], [bar_dev_top],
- [bar_dev_bottom], file_name=fname,
- scale_x=concurence,
- lines=[
- (latv, "msec", "rr", "lat"),
- (iops_or_bw_per_vm, None, None,
- "bw_per_vm")
- ])
- return str(ch)
-
-
-def make_io_report(results, path, lab_url=None, creds=None):
- if lab_url is not None:
- username, password, tenant_name = parse_creds(creds)
- creds = {'username': username, 'password': password,
- "tenant_name": tenant_name}
- data = collect_lab_data(lab_url, creds)
- lab_info = total_lab_info(data)
- else:
- lab_info = ""
-
- for suite_type, test_suite_data in results:
- if suite_type != 'io' or test_suite_data is None:
- continue
-
- io_test_suite_res = test_suite_data['res']
-
- charts_url = []
- info = []
-
- name_filters = [
- ('hdd_test_rrd4k', ('concurence', 'lat', 'iops'), 'rand_read_4k'),
- ('hdd_test_swd1m', ('concurence', 'lat', 'bw'), 'seq_write_1m'),
- ('hdd_test_srd1m', ('concurence', 'lat', 'bw'), 'seq_read_1m'),
- ('hdd_test_rws4k', ('concurence', 'lat', 'bw'), 'rand_write_1m')
- ]
-
- for name_filter, fields, fname in name_filters:
- th_filter = filter_data(name_filter, fields)
-
- data = sorted(th_filter(io_test_suite_res.values()))
- if len(data) == 0:
- continue
-
- concurence, latv, iops_or_bw_v = zip(*data)
- iops_or_bw_v, iops_or_bw_dev_v = zip(*map(med_dev, iops_or_bw_v))
- latv, _ = zip(*map(med_dev, latv))
-
- url = io_chart(name_filter, concurence, latv, iops_or_bw_v,
- iops_or_bw_dev_v,
- fields[2], fname)
- info.append(dict(zip(fields, (concurence, latv, iops_or_bw_v))))
- charts_url.append(url)
-
- if len(charts_url) != 0:
- render_html(charts_url, path, lab_info, info)
-
-
-def main(args):
- make_io_report(results=[('a', 'b')],
- path=os.path.dirname(args[0]),
- lab_url='http://172.16.52.112:8000',
- creds='admin:admin@admin')
- return 0
-
-
-if __name__ == '__main__':
- exit(main(sys.argv))
diff --git a/requirements.txt b/requirements.txt
index 4412532..825a5bc 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,3 +1,4 @@
+priest
GChartWrapper==0.8
decorator==3.4.0
futures==2.2.0
diff --git a/scripts/agent.py b/scripts/agent.py
deleted file mode 100644
index f1fc3da..0000000
--- a/scripts/agent.py
+++ /dev/null
@@ -1,114 +0,0 @@
-import argparse
-import subprocess
-import sys
-import socket
-import fcntl
-import struct
-import array
-
-
-def all_interfaces():
- max_possible = 128 # arbitrary. raise if needed.
- bytes = max_possible * 32
- s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
- names = array.array('B', '\0' * bytes)
- outbytes = struct.unpack('iL', fcntl.ioctl(
- s.fileno(),
- 0x8912, # SIOCGIFCONF
- struct.pack('iL', bytes, names.buffer_info()[0])
- ))[0]
- namestr = names.tostring()
- lst = []
- for i in range(0, outbytes, 40):
- name = namestr[i:i+16].split('\0', 1)[0]
- ip = namestr[i+20:i+24]
- lst.append((name, ip))
- return lst
-
-
-def format_ip(addr):
- return str(ord(addr[0])) + '.' + \
- str(ord(addr[1])) + '.' + \
- str(ord(addr[2])) + '.' + \
- str(ord(addr[3]))
-
-
-def find_interface_by_ip(ext_ip):
- ifs = all_interfaces()
- for i in ifs:
- ip = format_ip(i[1])
-
- if ip == ext_ip:
- return str(i[0])
-
- print "External ip doesnt corresponds to any of available interfaces"
- return None
-
-
-def make_tunnels(ips, ext_ip, base_port=12345, delete=False):
- node_port = {}
-
- if delete is True:
- mode = "-D"
- else:
- mode = "-A"
-
- iface = find_interface_by_ip(ext_ip)
-
- for ip in ips:
- p = subprocess.Popen(["iptables -t nat " + mode + " PREROUTING " +
- "-p tcp -i " + iface + " --dport "
- + str(base_port) +
- " -j DNAT --to " + str(ip) + ":22"],
- stdin=subprocess.PIPE,
- stdout=subprocess.PIPE,
- stderr=subprocess.STDOUT,
- shell=True)
-
- out, err = p.communicate()
-
- if out is not None:
- print out
-
- if err is not None:
- print err
-
- node_port[ip] = base_port
- base_port += 1
-
- return node_port
-
-
-def parse_command_line(argv):
- parser = argparse.ArgumentParser(description=
- "Connect to fuel master "
- "and setup ssh agent")
- parser.add_argument(
- "--base_port", type=int, required=True)
-
- parser.add_argument(
- "--ext_ip", type=str, required=True)
-
- parser.add_argument(
- "--clean", type=bool, default=False)
-
- parser.add_argument(
- "--ports", type=str, nargs='+')
-
- return parser.parse_args(argv)
-
-
-def main(argv):
- arg_object = parse_command_line(argv)
- mapping = make_tunnels(arg_object.ports,
- ext_ip=arg_object.ext_ip,
- base_port=arg_object.base_port,
- delete=arg_object.clean)
-
- if arg_object.clean is False:
- for k in mapping:
- print k + " " + str(mapping[k])
-
-
-if __name__ == "__main__":
- main(sys.argv[1:])
diff --git a/scripts/prepare.sh b/scripts/prepare.sh
index 587f577..c114874 100644
--- a/scripts/prepare.sh
+++ b/scripts/prepare.sh
@@ -6,7 +6,7 @@
# settings
FL_RAM=1024
-FL_HDD=50
+FL_HDD=20
FL_CPU=1
diff --git a/persistance/__init__.py b/scripts/storage/__init__.py
similarity index 100%
rename from persistance/__init__.py
rename to scripts/storage/__init__.py
diff --git a/persistance/data_processing.py b/scripts/storage/data_processing.py
similarity index 100%
rename from persistance/data_processing.py
rename to scripts/storage/data_processing.py
diff --git a/persistance/db_manage.py b/scripts/storage/db_manage.py
similarity index 100%
rename from persistance/db_manage.py
rename to scripts/storage/db_manage.py
diff --git a/persistance/models.py b/scripts/storage/models.py
similarity index 100%
rename from persistance/models.py
rename to scripts/storage/models.py
diff --git a/persistance/storage_api.py b/scripts/storage/storage_api.py
similarity index 100%
rename from persistance/storage_api.py
rename to scripts/storage/storage_api.py
diff --git a/sensors/api.py b/sensors/api.py
deleted file mode 100644
index dc34af0..0000000
--- a/sensors/api.py
+++ /dev/null
@@ -1,50 +0,0 @@
-import Queue
-import threading
-
-from contextlib import contextmanager
-
-from deploy_sensors import (deploy_and_start_sensors,
- stop_and_remove_sensors)
-from protocol import create_protocol, Timeout
-
-
-Empty = Queue.Empty
-
-
-def recv_main(proto, data_q, cmd_q):
- while True:
- try:
- data_q.put(proto.recv(0.1))
- except Timeout:
- pass
-
- try:
- val = cmd_q.get(False)
-
- if val is None:
- return
-
- except Queue.Empty:
- pass
-
-
-@contextmanager
-def start_monitoring(uri, config=None, connected_config=None):
- deploy_and_start_sensors(uri, config=config,
- connected_config=connected_config)
- try:
- data_q = Queue.Queue()
- cmd_q = Queue.Queue()
- proto = create_protocol(uri, receiver=True)
- th = threading.Thread(None, recv_main, None, (proto, data_q, cmd_q))
- th.daemon = True
- th.start()
-
- try:
- yield data_q
- finally:
- cmd_q.put(None)
- th.join()
- finally:
- stop_and_remove_sensors(config,
- connected_config=connected_config)
diff --git a/sensors/deploy_sensors.py b/sensors/deploy_sensors.py
deleted file mode 100644
index feb8bcb..0000000
--- a/sensors/deploy_sensors.py
+++ /dev/null
@@ -1,110 +0,0 @@
-import time
-import json
-import os.path
-import logging
-
-from concurrent.futures import ThreadPoolExecutor, wait
-
-from disk_perf_test_tool.ssh_utils import connect, copy_paths
-
-logger = logging.getLogger('io-perf-tool')
-
-
-def wait_all_ok(futures):
- return all(future.result() for future in futures)
-
-
-def deploy_and_start_sensors(monitor_uri, config,
- remote_path='/tmp/sensors',
- connected_config=None):
-
- paths = {os.path.dirname(__file__): remote_path}
- with ThreadPoolExecutor(max_workers=32) as executor:
- futures = []
-
- if connected_config is not None:
- assert config is None
- node_iter = connected_config
- else:
- node_iter = config.items()
-
- for uri_or_conn, config in node_iter:
- futures.append(executor.submit(deploy_and_start_sensor,
- paths, uri_or_conn,
- monitor_uri,
- config, remote_path))
-
- if not wait_all_ok(futures):
- raise RuntimeError("Sensor deployment fails on some nodes")
-
-
-def deploy_and_start_sensor(paths, uri_or_conn, monitor_uri, config,
- remote_path):
- try:
- if isinstance(uri_or_conn, basestring):
- conn = connect(uri_or_conn)
- else:
- conn = uri_or_conn
-
- copy_paths(conn, paths)
- sftp = conn.open_sftp()
-
- config_remote_path = os.path.join(remote_path, "conf.json")
- main_remote_path = os.path.join(remote_path, "main.py")
-
- with sftp.open(config_remote_path, "w") as fd:
- fd.write(json.dumps(config))
-
- cmd_templ = "python {0} -d start -u {1} {2}"
- cmd = cmd_templ.format(main_remote_path,
- monitor_uri,
- config_remote_path)
- conn.exec_command(cmd)
- sftp.close()
-
- if isinstance(uri_or_conn, basestring):
- conn.close()
- except:
- logger.exception("During deploing sensors in {0}".format(uri_or_conn))
- return False
- return True
-
-
-def stop_and_remove_sensor(uri_or_conn, remote_path):
- if isinstance(uri_or_conn, basestring):
- conn = connect(uri_or_conn)
- else:
- conn = uri_or_conn
-
- main_remote_path = os.path.join(remote_path, "main.py")
-
- cmd_templ = "python {0} -d stop"
- conn.exec_command(cmd_templ.format(main_remote_path))
-
- # some magic
- time.sleep(0.3)
-
- conn.exec_command("rm -rf {0}".format(remote_path))
-
- if isinstance(uri_or_conn, basestring):
- conn.close()
-
- logger.debug("Sensors stopped and removed")
-
-
-def stop_and_remove_sensors(config, remote_path='/tmp/sensors',
- connected_config=None):
- with ThreadPoolExecutor(max_workers=32) as executor:
- futures = []
-
- if connected_config is not None:
- assert config is None
- conf_iter = connected_config
- else:
- conf_iter = config.items()
-
- for uri_or_conn, config in conf_iter:
- futures.append(executor.submit(stop_and_remove_sensor,
- uri_or_conn, remote_path))
-
- wait(futures)
diff --git a/sensors/sensors/ps_mem.py b/sensors/sensors/ps_mem.py
deleted file mode 100644
index dc5088b..0000000
--- a/sensors/sensors/ps_mem.py
+++ /dev/null
@@ -1,40 +0,0 @@
-#!/usr/bin/env python
-
-# Based on ps_mem.py:
-# Licence: LGPLv2
-# Author: P@draigBrady.com
-# Source: http://www.pixelbeat.org/scripts/ps_mem.py
-# http://github.com/pixelb/scripts/commits/master/scripts/ps_mem.py
-
-
-# Note shared is always a subset of rss (trs is not always)
-def get_mem_stats(pid):
- """ Return memory data of pid in format (private, shared) """
-
- fname = '/proc/{0}/{1}'.format(pid, "smaps")
- lines = open(fname).readlines()
-
- shared = 0
- private = 0
- pss = 0
-
- # add 0.5KiB as this avg error due to trunctation
- pss_adjust = 0.5
-
- for line in lines:
- if line.startswith("Shared"):
- shared += int(line.split()[1])
-
- if line.startswith("Private"):
- private += int(line.split()[1])
-
- if line.startswith("Pss"):
- pss += float(line.split()[1]) + pss_adjust
-
- # Note Shared + Private = Rss above
- # The Rss in smaps includes video card mem etc.
-
- if pss != 0:
- shared = int(pss - private)
-
- return (private, shared)
diff --git a/sensors/sensors/psram_sensors.py b/sensors/sensors/psram_sensors.py
deleted file mode 100644
index c82819a..0000000
--- a/sensors/sensors/psram_sensors.py
+++ /dev/null
@@ -1,37 +0,0 @@
-from ps_mem import get_mem_stats
-
-from discover import provides
-from utils import SensorInfo, get_pid_name, get_pid_list
-
-@provides("perprocess-ram")
-def psram_stat(disallowed_prefixes=None, allowed_prefixes=None):
- results = {}
- pid_list = get_pid_list(disallowed_prefixes, allowed_prefixes)
- print pid_list
- for pid in pid_list:
- try:
- dev_name = get_pid_name(pid)
-
- private, shared = get_mem_stats(pid)
- total = private + shared
- sys_total = get_ram_size()
- usage = float(total) / float(sys_total)
-
- sensor_name = "{0}.{1}".format(dev_name, pid)
-
- results[sensor_name + ".private_mem"] = SensorInfo(private, False)
- results[sensor_name + ".shared_mem"] = SensorInfo(shared, False)
- results[sensor_name + ".used_mem"] = SensorInfo(total, False)
- name = sensor_name + ".mem_usage_percent"
- results[name] = SensorInfo(usage * 100, False)
- except IOError:
- # permission denied or proc die
- continue
- return results
-
-
-def get_ram_size():
- """ Return RAM size in Kb"""
- with open("/proc/meminfo") as proc:
- mem_total = proc.readline().split()
- return mem_total[1]
diff --git a/storage_api.py b/storage_api.py
deleted file mode 100644
index d02d0cd..0000000
--- a/storage_api.py
+++ /dev/null
@@ -1,116 +0,0 @@
-import json
-import math
-from config import TEST_PATH
-from flask import url_for
-import os
-
-
-class Measurement(object):
- def __init__(self):
- self.build = ""
- self.build_type = 0 # GA/Master/Other
- self.md5 = ""
- self.results = {
- "": (float, float)
- }
-
- def __str__(self):
- return self.build + " " + self.build_type + " " + \
- self.md5 + " " + str(self.results)
-
-
-def prepare_build_data(build):
- for item in build.items():
- if type(item[1]) is list:
- m = mean(item[1])
- s = stdev(item[1])
- build[item[0]] = [m, s]
-
-
-def mean(l):
- n = len(l)
-
- return sum(l) / n
-
-
-def stdev(l):
- m = mean(l)
- return math.sqrt(sum(map(lambda x: (x - m) ** 2, l)))
-
-
-def load_test(test_name):
- test_name += '.json'
-
- with open(TEST_PATH + "/" + test_name, 'rt') as f:
- raw = f.read()
-
- if raw != "":
- test = json.loads(raw)
- else:
- test = []
- import time
- creation_time = os.path.getmtime(TEST_PATH + "/" + test_name)
-
- for t in test:
- t['date'] = time.ctime(creation_time)
-
- return test
-
-
-def collect_tests():
- result = []
-
- for file in os.listdir(TEST_PATH):
- if file.endswith(".json"):
- result.append(file.split('.')[0])
-
- return result
-
-
-def collect_builds():
- builds = []
- build_set = set()
- tests = collect_tests()
-
- for t in tests:
- test = load_test(t)
-
- for build in test:
- if build["type"] not in build_set:
- build_set.add(build["type"])
- builds.append(build)
-
- for build in builds:
- prepare_build_data(build)
-
- return builds
-
-
-def builds_list():
- data = []
-
- for build in collect_builds():
- d = {}
- d["type"] = build['type']
- d["url"] = url_for("render_test", test_name=build['name'])
- d["date"] = build['date']
- d["name"] = build['name']
- data.append(d)
-
- return data
-
-
-def create_measurement(build):
- m = Measurement()
- m.build = build.pop("build_id")
- m.build_type = build.pop("type")
- m.md5 = build.pop("iso_md5")
- m.date = build.pop("date")
- m.date = build.pop("name")
- m.results = {k: v for k, v in build.items()}
-
- return m
-
-
-
-collect_builds()
\ No newline at end of file
diff --git "a/tasks/io_task_randread_4kb_1\321\201.cfg" "b/tasks/io_task_randread_4kb_1\321\201.cfg"
deleted file mode 100644
index a1d3745..0000000
--- "a/tasks/io_task_randread_4kb_1\321\201.cfg"
+++ /dev/null
@@ -1,12 +0,0 @@
-[writetest]
-blocksize=4k
-filename=file.bin
-rw=randread
-direct=1
-buffered=0
-ioengine=libaio
-iodepth=1
-size=10Gb
-runtime=5
-timeout=5
-numjobs=1
diff --git "a/tasks/io_task_randread_4kb_4\321\201.cfg" "b/tasks/io_task_randread_4kb_4\321\201.cfg"
deleted file mode 100644
index f983d16..0000000
--- "a/tasks/io_task_randread_4kb_4\321\201.cfg"
+++ /dev/null
@@ -1,12 +0,0 @@
-[writetest]
-blocksize=4k
-filename=file.bin
-rw=randread
-direct=1
-buffered=0
-ioengine=libaio
-iodepth=1
-size=10Gb
-runtime=5
-timeout=5
-numjobs=4
diff --git "a/tasks/io_task_randread_4kb_8\321\201.cfg" "b/tasks/io_task_randread_4kb_8\321\201.cfg"
deleted file mode 100644
index a1d3745..0000000
--- "a/tasks/io_task_randread_4kb_8\321\201.cfg"
+++ /dev/null
@@ -1,12 +0,0 @@
-[writetest]
-blocksize=4k
-filename=file.bin
-rw=randread
-direct=1
-buffered=0
-ioengine=libaio
-iodepth=1
-size=10Gb
-runtime=5
-timeout=5
-numjobs=1
diff --git "a/tasks/io_task_randwrite_4kb_1\321\201.cfg" "b/tasks/io_task_randwrite_4kb_1\321\201.cfg"
deleted file mode 100644
index 771ec90..0000000
--- "a/tasks/io_task_randwrite_4kb_1\321\201.cfg"
+++ /dev/null
@@ -1,12 +0,0 @@
-[writetest]
-blocksize=4k
-filename=file.bin
-rw=randwrite
-direct=1
-buffered=0
-ioengine=libaio
-iodepth=1
-size=1Gb
-runtime=5
-timeout=5
-numjobs=1
diff --git "a/tasks/io_task_randwrite_4kb_4\321\201.cfg" "b/tasks/io_task_randwrite_4kb_4\321\201.cfg"
deleted file mode 100644
index 4779954..0000000
--- "a/tasks/io_task_randwrite_4kb_4\321\201.cfg"
+++ /dev/null
@@ -1,12 +0,0 @@
-[writetest]
-blocksize=4k
-filename=file.bin
-rw=randwrite
-direct=1
-buffered=0
-ioengine=libaio
-iodepth=1
-size=10Gb
-runtime=5
-timeout=5
-numjobs=4
diff --git "a/tasks/io_task_randwrite_4kb_8\321\201.cfg" "b/tasks/io_task_randwrite_4kb_8\321\201.cfg"
deleted file mode 100644
index c019ef4..0000000
--- "a/tasks/io_task_randwrite_4kb_8\321\201.cfg"
+++ /dev/null
@@ -1,12 +0,0 @@
-[writetest]
-blocksize=4k
-filename=file.bin
-rw=randwrite
-direct=1
-buffered=0
-ioengine=libaio
-iodepth=1
-size=10Gb
-runtime=5
-timeout=5
-numjobs=8
diff --git a/tasks/io_task_read_2mb.cfg b/tasks/io_task_read_2mb.cfg
deleted file mode 100644
index 9a5d2d8..0000000
--- a/tasks/io_task_read_2mb.cfg
+++ /dev/null
@@ -1,13 +0,0 @@
-[writetest]
-blocksize=2m
-filename=file.bin
-rw=read
-direct=1
-buffered=0
-ioengine=libaio
-iodepth=1
-size=10Gb
-runtime=5
-timeout=5
-numjobs=1
-
diff --git a/tasks/io_task_write_2mb.cfg b/tasks/io_task_write_2mb.cfg
deleted file mode 100644
index 40c9bef..0000000
--- a/tasks/io_task_write_2mb.cfg
+++ /dev/null
@@ -1,13 +0,0 @@
-[writetest]
-blocksize=2m
-filename=file.bin
-rw=write
-direct=1
-buffered=0
-ioengine=libaio
-iodepth=1
-size=10Gb
-runtime=5
-timeout=5
-numjobs=1
-
diff --git a/fake_run_test.py b/tests/fake_run_test.py
similarity index 100%
rename from fake_run_test.py
rename to tests/fake_run_test.py
diff --git a/usb_hdd.yaml b/usb_hdd.yaml
new file mode 100644
index 0000000..59e1259
--- /dev/null
+++ b/usb_hdd.yaml
@@ -0,0 +1,24 @@
+explicit_nodes:
+ "ssh://koder@192.168.0.108::/home/koder/.ssh/id_rsa": testnode
+ # local: testnode
+
+internal:
+ var_dir_root: /tmp/perf_tests
+
+sensors:
+ # receiver_uri: udp://172.18.217.10:5699
+ receiver_uri: "udp://192.168.0.108:5699"
+ # receiver_uri: "udp://{ip}:5699"
+ roles_mapping:
+ ceph-osd: block-io
+ cinder: block-io, system-cpu
+ testnode: system-cpu, block-io
+
+tests:
+ - io:
+ cfg: wally/suits/io/io_scenario_hdd.cfg
+ params:
+ FILENAME: /media/koder/a5230078-4c27-4c3b-99aa-26148e78b2e7/xxx.bin
+ NUM_ROUNDS: 7
+logging:
+ extra_logs: 1
\ No newline at end of file
diff --git a/vEnv-3-2.yaml b/vEnv-3-2.yaml
new file mode 100644
index 0000000..40ba677
--- /dev/null
+++ b/vEnv-3-2.yaml
@@ -0,0 +1,57 @@
+clouds:
+ fuel:
+ url: http://172.16.52.108:8000/
+ creds: admin:admin@admin
+ ssh_creds: root:test37
+ openstack_env: fuel
+
+discover: fuel
+
+# explicit_nodes:
+# "ssh://root@172.16.52.112:3022:/home/koder/.ssh/id_rsa": testnode
+
+internal:
+ var_dir_root: /tmp/perf_tests
+
+sensors:
+ # receiver_uri: udp://172.18.217.10:5699
+ receiver_uri: "udp://{ip}:5699"
+ roles_mapping:
+ ceph-osd: block-io
+ cinder: block-io, system-cpu
+ testnode: system-cpu, block-io
+
+tests:
+ - start_test_nodes:
+ creds: clouds
+ vm_params:
+ count: x1
+ image:
+ name: disk_io_perf
+ url: https://cloud-images.ubuntu.com/trusty/current/trusty-server-cloudimg-amd64-disk1.img
+
+ flavor:
+ name: disk_io_perf.1024
+ hdd_size: 50
+ ram_size: 1024
+ cpu: 1
+
+ keypair_name: disk_io_perf
+ network_zone_name: net04
+ flt_ip_pool: net04_ext
+ private_key_path: disk_io_perf.pem
+ creds: "ssh://ubuntu@{ip}::{private_key_path}"
+ name_templ: wally-{group}-{id}
+ scheduler_group_name: wally-aa-{group}-{id}
+ security_group: disk_io_perf
+
+ tests:
+ - io:
+ # cfg: tests/io_scenario_hdd.cfg
+ cfg: scripts/fio_tests_configs/io_task_test.cfg
+ params:
+ FILENAME: /opt/xxx.bin
+ NUM_ROUNDS: 7
+
+logging:
+ extra_logs: 1
diff --git a/tests/__init__.py b/wally/__init__.py
similarity index 100%
rename from tests/__init__.py
rename to wally/__init__.py
diff --git a/wally/__main__.py b/wally/__main__.py
new file mode 100644
index 0000000..97011d9
--- /dev/null
+++ b/wally/__main__.py
@@ -0,0 +1,6 @@
+import sys
+from .run_test import main
+
+
+if __name__ == '__main__':
+ exit(main(sys.argv))
diff --git a/assumptions_check.py b/wally/assumptions_check.py
similarity index 96%
rename from assumptions_check.py
rename to wally/assumptions_check.py
index fdea7e1..41ae2e0 100644
--- a/assumptions_check.py
+++ b/wally/assumptions_check.py
@@ -1,12 +1,13 @@
import sys
-import numpy as np
import texttable as TT
+
+import numpy as np
import matplotlib.pyplot as plt
from numpy.polynomial.chebyshev import chebfit, chebval
-from io_results_loader import load_data, filter_data
-from statistic import approximate_line, difference, round_deviation
+from .io_results_loader import load_data, filter_data
+from .statistic import approximate_line, difference
def linearity_plot(data, types, vals=None):
@@ -57,7 +58,6 @@
plt.title("Linearity test by %i dots" % (len(vals)))
-
def linearity_table(data, types, vals):
""" create table by pyplot with diferences
between original and approximated
@@ -80,7 +80,6 @@
ax.append(sz / 1024.0)
ay.append(iotime_ms)
-
ynew = approximate_line(ax, ay, x, True)
dif, _, _ = difference(y, ynew)
@@ -99,9 +98,6 @@
for row in table_data:
tab.add_row(row)
- print tp
- print tab.draw()
-
# uncomment to get table in pretty pictures :)
# colLabels = ("BlockSize, kB", "Absolute difference (ms)", "Relative difference (%)")
# fig = plt.figure()
diff --git a/chart/charts.py b/wally/charts.py
similarity index 96%
rename from chart/charts.py
rename to wally/charts.py
index d1c7142..a168f9d 100644
--- a/chart/charts.py
+++ b/wally/charts.py
@@ -1,14 +1,19 @@
import os
+import sys
import hashlib
-import threading
-from GChartWrapper import VerticalBarGroup
from GChartWrapper import Line
from GChartWrapper import constants
+from GChartWrapper import VerticalBarGroup
from config import cfg_dict
+# Patch MARKER constant
+constants.MARKERS += 'E'
+sys.modules['GChartWrapper.GChart'].MARKERS += 'E'
+
+
COLORS = ["1569C7", "81D8D0", "307D7E", "5CB3FF", "0040FF", "81DAF5"]
constants.MARKERS += 'E' # append E marker to available markers
@@ -59,12 +64,12 @@
bar.title(title)
dataset = bars_data + bars_dev_top + bars_dev_bottom + \
- [l[0] for l in lines]
+ [l[0] for l in lines]
bar.dataset(dataset, series=len(bars_data))
bar.axes.type('xyy')
bar.axes.label(2, None, label_x)
-
+
if scale_x:
bar.axes.label(0, *scale_x)
@@ -73,7 +78,6 @@
bar.axes.style(1, 'N*s*')
bar.axes.style(2, '000000', '13')
-
bar.scale(*[0, max_value] * 3)
bar.bar('r', '.1', '1')
diff --git a/wally/config.py b/wally/config.py
new file mode 100644
index 0000000..03b7ac9
--- /dev/null
+++ b/wally/config.py
@@ -0,0 +1,129 @@
+import os
+import uuid
+import logging
+import functools
+
+import yaml
+
+try:
+ from petname import Generate as pet_generate
+except ImportError:
+ def pet_generate(x, y):
+ return str(uuid.uuid4())
+
+
+cfg_dict = {}
+
+
+def mkdirs_if_unxists(path):
+ if not os.path.exists(path):
+ os.makedirs(path)
+
+
+def load_config(file_name, explicit_folder=None):
+ first_load = len(cfg_dict) == 0
+ cfg_dict.update(yaml.load(open(file_name).read()))
+
+ if first_load:
+ var_dir = cfg_dict.get('internal', {}).get('var_dir_root', '/tmp')
+
+ run_uuid = None
+ if explicit_folder is None:
+ for i in range(10):
+ run_uuid = pet_generate(2, "_")
+ results_dir = os.path.join(var_dir, run_uuid)
+ if not os.path.exists(results_dir):
+ break
+ else:
+ run_uuid = str(uuid.uuid4())
+ results_dir = os.path.join(var_dir, run_uuid)
+ else:
+ results_dir = explicit_folder
+
+ cfg_dict['var_dir'] = results_dir
+ cfg_dict['run_uuid'] = run_uuid.replace('_', '-')
+ mkdirs_if_unxists(cfg_dict['var_dir'])
+
+ in_var_dir = functools.partial(os.path.join, cfg_dict['var_dir'])
+
+ cfg_dict['charts_img_path'] = in_var_dir('charts')
+ mkdirs_if_unxists(cfg_dict['charts_img_path'])
+
+ cfg_dict['vm_ids_fname'] = in_var_dir('os_vm_ids')
+ cfg_dict['html_report_file'] = in_var_dir('report.html')
+ cfg_dict['text_report_file'] = in_var_dir('report.txt')
+ cfg_dict['log_file'] = in_var_dir('log.txt')
+ cfg_dict['sensor_storage'] = in_var_dir('sensor_storage.txt')
+
+ cfg_dict['test_log_directory'] = in_var_dir('test_logs')
+ mkdirs_if_unxists(cfg_dict['test_log_directory'])
+
+
+def color_me(color):
+ RESET_SEQ = "\033[0m"
+ COLOR_SEQ = "\033[1;%dm"
+
+ color_seq = COLOR_SEQ % (30 + color)
+
+ def closure(msg):
+ return color_seq + msg + RESET_SEQ
+ return closure
+
+
+class ColoredFormatter(logging.Formatter):
+ BLACK, RED, GREEN, YELLOW, BLUE, MAGENTA, CYAN, WHITE = range(8)
+
+ colors = {
+ 'WARNING': color_me(YELLOW),
+ 'DEBUG': color_me(BLUE),
+ 'CRITICAL': color_me(YELLOW),
+ 'ERROR': color_me(RED)
+ }
+
+ def __init__(self, msg, use_color=True, datefmt=None):
+ logging.Formatter.__init__(self, msg, datefmt=datefmt)
+ self.use_color = use_color
+
+ def format(self, record):
+ orig = record.__dict__
+ record.__dict__ = record.__dict__.copy()
+ levelname = record.levelname
+
+ prn_name = levelname + ' ' * (8 - len(levelname))
+ if levelname in self.colors:
+ record.levelname = self.colors[levelname](prn_name)
+ else:
+ record.levelname = prn_name
+
+ res = super(ColoredFormatter, self).format(record)
+
+ # restore record, as it will be used by other formatters
+ record.__dict__ = orig
+ return res
+
+
+def setup_loggers(def_level=logging.DEBUG, log_fname=None):
+ logger = logging.getLogger('wally')
+ logger.setLevel(logging.DEBUG)
+ sh = logging.StreamHandler()
+ sh.setLevel(def_level)
+
+ log_format = '%(asctime)s - %(levelname)s - %(name)s - %(message)s'
+ colored_formatter = ColoredFormatter(log_format, datefmt="%H:%M:%S")
+
+ sh.setFormatter(colored_formatter)
+ logger.addHandler(sh)
+
+ logger_api = logging.getLogger("wally.fuel_api")
+
+ if log_fname is not None:
+ fh = logging.FileHandler(log_fname)
+ log_format = '%(asctime)s - %(levelname)8s - %(name)s - %(message)s'
+ formatter = logging.Formatter(log_format, datefmt="%H:%M:%S")
+ fh.setFormatter(formatter)
+ fh.setLevel(logging.DEBUG)
+ logger.addHandler(fh)
+ logger_api.addHandler(fh)
+
+ logger_api.addHandler(sh)
+ logger_api.setLevel(logging.WARNING)
diff --git a/wally/discover/__init__.py b/wally/discover/__init__.py
new file mode 100644
index 0000000..b02809c
--- /dev/null
+++ b/wally/discover/__init__.py
@@ -0,0 +1,5 @@
+"this package contains node discovery code"
+from .discover import discover, undiscover
+from .node import Node
+
+__all__ = ["discover", "Node", "undiscover"]
diff --git a/nodes/ceph.py b/wally/discover/ceph.py
similarity index 96%
rename from nodes/ceph.py
rename to wally/discover/ceph.py
index 793c021..70a6edf 100644
--- a/nodes/ceph.py
+++ b/wally/discover/ceph.py
@@ -4,8 +4,8 @@
import subprocess
-from node import Node
-from disk_perf_test_tool.ssh_utils import connect
+from .node import Node
+from wally.ssh_utils import connect
logger = logging.getLogger("io-perf-tool")
diff --git a/nodes/discover.py b/wally/discover/discover.py
similarity index 63%
rename from nodes/discover.py
rename to wally/discover/discover.py
index 6c02fe6..8c78387 100644
--- a/nodes/discover.py
+++ b/wally/discover/discover.py
@@ -1,16 +1,17 @@
import logging
-import urlparse
-import ceph
-import openstack
-from utils import parse_creds
-from scripts import connector
-
-logger = logging.getLogger("io-perf-tool")
+from . import ceph
+from . import fuel
+from . import openstack
+from wally.utils import parse_creds
-def discover(ctx, discover, clusters_info):
+logger = logging.getLogger("wally.discover")
+
+
+def discover(ctx, discover, clusters_info, var_dir):
nodes_to_run = []
+ clean_data = None
for cluster in discover:
if cluster == "openstack":
cluster_info = clusters_info["openstack"]
@@ -37,25 +38,9 @@
nodes_to_run.extend(os_nodes)
elif cluster == "fuel":
- cluster_info = clusters_info['fuel']
- cluster_name = cluster_info['openstack_env']
- url = cluster_info['url']
- creds = cluster_info['creds']
- ssh_creds = cluster_info['ssh_creds']
- # if user:password format us used
- if not ssh_creds.startswith("ssh://"):
- ip_port = urlparse.urlparse(url).netloc
-
- if ':' in ip_port:
- ip = ip_port.split(":")[0]
- else:
- ip = ip_port
-
- ssh_creds = "ssh://{0}@{1}".format(ssh_creds, ip)
-
- dfunc = connector.discover_fuel_nodes
- nodes, clean_data, openrc_dict = dfunc(url, creds, cluster_name)
+ res = fuel.discover_fuel_nodes(clusters_info['fuel'], var_dir)
+ nodes, clean_data, openrc_dict = res
ctx.fuel_openstack_creds = {'name': openrc_dict['username'],
'passwd': openrc_dict['password'],
@@ -71,4 +56,9 @@
msg_templ = "Unknown cluster type in 'discover' parameter: {0!r}"
raise ValueError(msg_templ.format(cluster))
- return nodes_to_run
+ return nodes_to_run, clean_data
+
+
+def undiscover(clean_data):
+ if clean_data is not None:
+ fuel.clean_fuel_port_forwarding(clean_data)
diff --git a/wally/discover/fuel.py b/wally/discover/fuel.py
new file mode 100644
index 0000000..6e76188
--- /dev/null
+++ b/wally/discover/fuel.py
@@ -0,0 +1,136 @@
+import os
+import re
+import sys
+import socket
+import logging
+from urlparse import urlparse
+
+import yaml
+from wally.fuel_rest_api import (KeystoneAuth, get_cluster_id,
+ reflect_cluster, FuelInfo)
+from wally.utils import parse_creds
+from wally.ssh_utils import run_over_ssh, connect
+
+from .node import Node
+
+
+logger = logging.getLogger("wally.discover")
+BASE_PF_PORT = 33467
+
+
+def discover_fuel_nodes(fuel_data, var_dir):
+ username, tenant_name, password = parse_creds(fuel_data['creds'])
+ creds = {"username": username,
+ "tenant_name": tenant_name,
+ "password": password}
+
+ conn = KeystoneAuth(fuel_data['url'], creds, headers=None)
+
+ cluster_id = get_cluster_id(conn, fuel_data['openstack_env'])
+ cluster = reflect_cluster(conn, cluster_id)
+ version = FuelInfo(conn).get_version()
+
+ fuel_nodes = list(cluster.get_nodes())
+
+ logger.debug("Found FUEL {0}".format("".join(map(str, version))))
+
+ network = 'fuelweb_admin' if version >= [6, 0] else 'admin'
+
+ ssh_creds = fuel_data['ssh_creds']
+
+ fuel_host = urlparse(fuel_data['url']).hostname
+ fuel_ip = socket.gethostbyname(fuel_host)
+ ssh_conn = connect("{0}@@{1}".format(ssh_creds, fuel_host))
+
+ fuel_ext_iface = get_external_interface(ssh_conn, fuel_ip)
+
+ # TODO: keep ssh key in memory
+ # http://stackoverflow.com/questions/11994139/how-to-include-the-private-key-in-paramiko-after-fetching-from-string
+ fuel_key_file = os.path.join(var_dir, "fuel_master_node_id_rsa")
+ download_master_key(ssh_conn, fuel_key_file)
+
+ nodes = []
+ ports = range(BASE_PF_PORT, BASE_PF_PORT + len(fuel_nodes))
+ ips_ports = []
+
+ for fuel_node, port in zip(fuel_nodes, ports):
+ ip = fuel_node.get_ip(network)
+ forward_ssh_port(ssh_conn, fuel_ext_iface, port, ip)
+
+ conn_url = "ssh://root@{0}:{1}:{2}".format(fuel_host,
+ port,
+ fuel_key_file)
+ nodes.append(Node(conn_url, fuel_node['roles']))
+ ips_ports.append((ip, port))
+
+ logger.debug("Found %s fuel nodes for env %r" %
+ (len(nodes), fuel_data['openstack_env']))
+
+ return ([],
+ (ssh_conn, fuel_ext_iface, ips_ports),
+ cluster.get_openrc())
+
+ return (nodes,
+ (ssh_conn, fuel_ext_iface, ips_ports),
+ cluster.get_openrc())
+
+
+def download_master_key(conn, dest):
+ # download master key
+ sftp = conn.open_sftp()
+ sftp.get('/root/.ssh/id_rsa', dest)
+ os.chmod(dest, 0o400)
+ sftp.close()
+
+ logger.debug("Fuel master key stored in {0}".format(dest))
+
+
+def get_external_interface(conn, ip):
+ data = run_over_ssh(conn, "ip a", node='fuel-master')
+ curr_iface = None
+ for line in data.split("\n"):
+
+ match1 = re.match(r"\d+:\s+(?P<name>.*?):\s\<", line)
+ if match1 is not None:
+ curr_iface = match1.group('name')
+
+ match2 = re.match(r"\s+inet\s+(?P<ip>[0-9.]+)/", line)
+ if match2 is not None:
+ if match2.group('ip') == ip:
+ assert curr_iface is not None
+ return curr_iface
+ raise KeyError("Can't found interface for ip {0}".format(ip))
+
+
+def forward_ssh_port(conn, iface, new_port, ip, clean=False):
+ mode = "-D" if clean is True else "-A"
+ cmd = "iptables -t nat {mode} PREROUTING -p tcp " + \
+ "-i {iface} --dport {port} -j DNAT --to {ip}:22"
+ run_over_ssh(conn,
+ cmd.format(iface=iface, port=new_port, ip=ip, mode=mode),
+ node='fuel-master')
+
+
+def clean_fuel_port_forwarding(clean_data):
+ conn, iface, ips_ports = clean_data
+ for ip, port in ips_ports:
+ forward_ssh_port(conn, iface, port, ip, clean=True)
+
+
+def main(argv):
+ fuel_data = yaml.load(open(sys.argv[1]).read())['clouds']['fuel']
+ nodes, to_clean, openrc = discover_fuel_nodes(fuel_data, '/tmp')
+
+ print nodes
+ print openrc
+ print "Ready to test"
+
+ sys.stdin.readline()
+
+ clean_fuel_port_forwarding(to_clean)
+
+ return 0
+
+
+if __name__ == "__main__":
+ main(sys.argv[1:])
diff --git a/nodes/node.py b/wally/discover/node.py
similarity index 100%
rename from nodes/node.py
rename to wally/discover/node.py
diff --git a/wally/discover/openstack.py b/wally/discover/openstack.py
new file mode 100644
index 0000000..32b4629
--- /dev/null
+++ b/wally/discover/openstack.py
@@ -0,0 +1,70 @@
+import socket
+import logging
+
+
+from novaclient.client import Client
+
+from .node import Node
+from wally.utils import parse_creds
+
+
+logger = logging.getLogger("io-perf-tool.discover")
+
+
+def get_floating_ip(vm):
+ addrs = vm.addresses
+ for net_name, ifaces in addrs.items():
+ for iface in ifaces:
+ if iface.get('OS-EXT-IPS:type') == "floating":
+ return iface['addr']
+
+
+def discover_vms(client, search_opts):
+ user, password, key = parse_creds(search_opts.pop('auth'))
+
+ servers = client.servers.list(search_opts=search_opts)
+ logger.debug("Found %s openstack vms" % len(servers))
+ return [Node(get_floating_ip(server), ["test_vm"], username=user,
+ password=password, key_path=key)
+ for server in servers if get_floating_ip(server)]
+
+
+def discover_services(client, opts):
+ user, password, key = parse_creds(opts.pop('auth'))
+
+ services = []
+ if opts['service'] == "all":
+ services = client.services.list()
+ else:
+ if isinstance(opts['service'], basestring):
+ opts['service'] = [opts['service']]
+
+ for s in opts['service']:
+ services.extend(client.services.list(binary=s))
+
+ host_services_mapping = {}
+
+ for service in services:
+ ip = socket.gethostbyname(service.host)
+ host_services_mapping[ip].append(service.binary)
+
+ logger.debug("Found %s openstack service nodes" %
+ len(host_services_mapping))
+ return [Node(host, services, username=user,
+ password=password, key_path=key) for host, services in
+ host_services_mapping.items()]
+
+
+def discover_openstack_nodes(conn_details, conf):
+ """Discover vms running in openstack
+ :param conn_details - dict with openstack connection details -
+ auth_url, api_key (password), username
+ """
+ client = Client(version='1.1', **conn_details)
+ nodes = []
+ if conf.get('discover'):
+ services_to_discover = conf['discover'].get('nodes')
+ if services_to_discover:
+ nodes.extend(discover_services(client, services_to_discover))
+
+ return nodes
diff --git a/fuel_rest_api.py b/wally/fuel_rest_api.py
similarity index 97%
rename from fuel_rest_api.py
rename to wally/fuel_rest_api.py
index 23f7289..499510d 100644
--- a/fuel_rest_api.py
+++ b/wally/fuel_rest_api.py
@@ -3,9 +3,8 @@
import time
import logging
import urllib2
-
-from functools import partial, wraps
import urlparse
+from functools import partial, wraps
import netaddr
@@ -13,7 +12,7 @@
from keystoneclient import exceptions
-logger = logging.getLogger("io-perf-tool.fuel_api")
+logger = logging.getLogger("wally.fuel_api")
class Urllib2HTTP(object):
@@ -195,6 +194,7 @@
get_nodes = GET('api/nodes')
get_clusters = GET('api/clusters')
get_cluster = GET('api/clusters/{id}')
+ get_info = GET('api/releases')
@property
def nodes(self):
@@ -214,6 +214,12 @@
return [Cluster(self.__connection__, **cluster) for cluster
in self.get_clusters()]
+ def get_version(self):
+ for info in self.get_info():
+ vers = info['version'].split("-")[1].split('.')
+ return map(int, vers)
+ raise ValueError("No version found")
+
class Node(RestObj):
"""Represents node in Fuel"""
@@ -346,7 +352,7 @@
if self.nodes.controller:
contr = self.nodes.controller[0]
creds['os_auth_url'] = "http://%s:5000/v2.0" \
- % contr.get_ip(network="public")
+ % contr.get_ip(network="public")
else:
creds['os_auth_url'] = ""
return creds
diff --git a/keystone.py b/wally/keystone.py
similarity index 100%
rename from keystone.py
rename to wally/keystone.py
diff --git a/meta_info.py b/wally/meta_info.py
similarity index 96%
rename from meta_info.py
rename to wally/meta_info.py
index 1f95b34..127612b 100644
--- a/meta_info.py
+++ b/wally/meta_info.py
@@ -16,7 +16,9 @@
for disk in node['disks']:
lab_data['total_disk'] += disk['size']
- to_gb = lambda x: x / (1024 ** 3)
+ def to_gb(x):
+ return x / (1024 ** 3)
+
lab_data['total_memory'] = format(to_gb(lab_data['total_memory']), ',d')
lab_data['total_disk'] = format(to_gb(lab_data['total_disk']), ',d')
return lab_data
diff --git a/pretty_yaml.py b/wally/pretty_yaml.py
similarity index 96%
rename from pretty_yaml.py
rename to wally/pretty_yaml.py
index 44d4e49..4fc4a49 100644
--- a/pretty_yaml.py
+++ b/wally/pretty_yaml.py
@@ -1,3 +1,7 @@
+__doc__ = "functions for make pretty yaml files"
+__all__ = ['dumps']
+
+
def dumps_simple(val):
bad_symbols = set(" \r\t\n,':")
diff --git a/wally/report.py b/wally/report.py
new file mode 100644
index 0000000..d2f2d96
--- /dev/null
+++ b/wally/report.py
@@ -0,0 +1,236 @@
+import os
+import sys
+
+from wally import charts
+from wally.statistic import med_dev
+from wally.utils import parse_creds
+from wally.suits.io.results_loader import filter_data
+from wally.meta_info import total_lab_info, collect_lab_data
+
+
+# from collections import OrderedDict
+# from wally.suits.io import formatter
+# def pgbench_chart_data(results):
+# """
+# Format pgbench results for chart
+# """
+# data = {}
+# charts_url = []
+
+# formatted_res = formatters.format_pgbench_stat(results)
+# for key, value in formatted_res.items():
+# num_cl, num_tr = key.split(' ')
+# data.setdefault(num_cl, {}).setdefault(build, {})
+# data[keys[z]][build][
+# ' '.join(keys)] = value
+
+# for name, value in data.items():
+# title = name
+# legend = []
+# dataset = []
+
+# scale_x = []
+
+# for build_id, build_results in value.items():
+# vals = []
+# OD = OrderedDict
+# ordered_build_results = OD(sorted(build_results.items(),
+# key=lambda t: t[0]))
+# scale_x = ordered_build_results.keys()
+# for key in scale_x:
+# res = build_results.get(key)
+# if res:
+# vals.append(res)
+# if vals:
+# dataset.append(vals)
+# legend.append(build_id)
+
+# if dataset:
+# charts_url.append(str(charts.render_vertical_bar
+# (title, legend, dataset, scale_x=scale_x)))
+# return charts_url
+
+# def build_lines_chart(results, z=0):
+# data = {}
+# charts_url = []
+
+# for build, res in results:
+# formatted_res = formatters.get_formatter(build)(res)
+# for key, value in formatted_res.items():
+# keys = key.split(' ')
+# data.setdefault(key[z], {})
+# data[key[z]].setdefault(build, {})[keys[1]] = value
+
+# for name, value in data.items():
+# title = name
+# legend = []
+# dataset = []
+# scale_x = []
+# for build_id, build_results in value.items():
+# legend.append(build_id)
+
+# OD = OrderedDict
+# ordered_build_results = OD(sorted(build_results.items(),
+# key=lambda t: ssize_to_b(t[0])))
+
+# if not scale_x:
+# scale_x = ordered_build_results.keys()
+# dataset.append(zip(*ordered_build_results.values())[0])
+
+# chart = charts.render_lines(title, legend, dataset, scale_x)
+# charts_url.append(str(chart))
+
+# return charts_url
+
+# def build_vertical_bar(results, z=0):
+# data = {}
+# charts_url = []
+# for build, res in results:
+# formatted_res = formatter.get_formatter(build)(res)
+# for key, value in formatted_res.items():
+# keys = key.split(' ')
+# data.setdefault(keys[z], {}).setdefault(build, {})
+# data[keys[z]][build][
+# ' '.join(keys)] = value
+
+# for name, value in data.items():
+# title = name
+# legend = []
+# dataset = []
+
+# scale_x = []
+
+# for build_id, build_results in value.items():
+# vals = []
+# OD = OrderedDict
+# ordered_build_results = OD(sorted(build_results.items(),
+# key=lambda t: t[0]))
+# scale_x = ordered_build_results.keys()
+# for key in scale_x:
+# res = build_results.get(key)
+# if res:
+# vals.append(res)
+# if vals:
+# dataset.append(vals)
+# legend.append(build_id)
+
+# if dataset:
+# charts_url.append(str(charts.render_vertical_bar
+# (title, legend, dataset, scale_x=scale_x)))
+# return charts_url
+
+
+def render_html(charts_urls, dest, lab_description, info):
+ templ = open("report.html", 'r').read()
+ body = "<a href='#lab_desc'>Lab description</a>" \
+ "<ol>{0}</ol>" \
+ "<div>{1}</div>" \
+ '<a name="lab_desc"></a>' \
+ "<div><ul>{2}</ul></div>"
+ table = "<table><tr><td>{0}</td><td>{1}</td></tr>" \
+ "<tr><td>{2}</td><td>{3}</td></tr></table>"
+ ul = []
+ ol = []
+ li = '<li>{0} : {1}</li>'
+
+ for elem in info:
+ ol.append(li.format(elem.keys(), elem.values()))
+
+ for key in lab_description:
+ value = lab_description[key]
+ ul.append("<li>{0} : {1}</li>".format(key, value))
+
+ charts_urls = ['<img src="{0}">'.format(url) for url in charts_urls]
+
+ body = body.format('\n'.join(ol),
+ table.format(*charts_urls),
+ '\n'.join(ul))
+
+ open(dest, 'w').write(templ % {'body': body})
+
+
+def io_chart(title, concurence, latv, iops_or_bw, iops_or_bw_dev,
+ legend, fname):
+ bar_data, bar_dev = iops_or_bw, iops_or_bw_dev
+ legend = [legend]
+
+ iops_or_bw_per_vm = []
+ for i in range(len(concurence)):
+ iops_or_bw_per_vm.append(iops_or_bw[i] / concurence[i])
+
+ bar_dev_bottom = []
+ bar_dev_top = []
+ for i in range(len(bar_data)):
+ bar_dev_top.append(bar_data[i] + bar_dev[i])
+ bar_dev_bottom.append(bar_data[i] - bar_dev[i])
+
+ latv = [lat / 1000 for lat in latv]
+ ch = charts.render_vertical_bar(title, legend, [bar_data], [bar_dev_top],
+ [bar_dev_bottom], file_name=fname,
+ scale_x=concurence,
+ lines=[
+ (latv, "msec", "rr", "lat"),
+ (iops_or_bw_per_vm, None, None,
+ "bw_per_vm")
+ ])
+ return str(ch)
+
+
+def make_io_report(results, path, lab_url=None, creds=None):
+ if lab_url is not None:
+ username, password, tenant_name = parse_creds(creds)
+ creds = {'username': username,
+ 'password': password,
+ "tenant_name": tenant_name}
+ data = collect_lab_data(lab_url, creds)
+ lab_info = total_lab_info(data)
+ else:
+ lab_info = ""
+
+ for suite_type, test_suite_data in results:
+ if suite_type != 'io' or test_suite_data is None:
+ continue
+
+ io_test_suite_res = test_suite_data['res']
+
+ charts_url = []
+ info = []
+
+ name_filters = [
+ ('hdd_test_rrd4k', ('concurence', 'lat', 'iops'), 'rand_read_4k'),
+ ('hdd_test_swd1m', ('concurence', 'lat', 'bw'), 'seq_write_1m'),
+ ('hdd_test_srd1m', ('concurence', 'lat', 'bw'), 'seq_read_1m'),
+ ('hdd_test_rws4k', ('concurence', 'lat', 'bw'), 'rand_write_1m')
+ ]
+
+ for name_filter, fields, fname in name_filters:
+ th_filter = filter_data(name_filter, fields)
+
+ data = sorted(th_filter(io_test_suite_res.values()))
+ if len(data) == 0:
+ continue
+
+ concurence, latv, iops_or_bw_v = zip(*data)
+ iops_or_bw_v, iops_or_bw_dev_v = zip(*map(med_dev, iops_or_bw_v))
+ latv, _ = zip(*map(med_dev, latv))
+
+ url = io_chart(name_filter, concurence, latv, iops_or_bw_v,
+ iops_or_bw_dev_v,
+ fields[2], fname)
+ info.append(dict(zip(fields, (concurence, latv, iops_or_bw_v))))
+ charts_url.append(url)
+
+ if len(charts_url) != 0:
+ render_html(charts_url, path, lab_info, info)
+
+
+def main(args):
+ make_io_report(results=[('a', 'b')],
+ path=os.path.dirname(args[0]),
+ lab_url='http://172.16.52.112:8000',
+ creds='admin:admin@admin')
+ return 0
+
+
+if __name__ == '__main__':
+ exit(main(sys.argv))
diff --git a/rest_api.py b/wally/rest_api.py
similarity index 60%
rename from rest_api.py
rename to wally/rest_api.py
index 619c13b..fc9bd05 100644
--- a/rest_api.py
+++ b/wally/rest_api.py
@@ -5,9 +5,6 @@
def add_test(test_name, test_data, url):
if not url.endswith("/"):
url += '/api/tests/' + test_name
-
- import pdb
- pdb.set_trace()
requests.post(url=url, data=json.dumps(test_data))
@@ -26,18 +23,3 @@
result = requests.get(url=url)
return json.loads(result.content)
-
-
-if __name__ == '__main__':
- url = "http://0.0.0.0:5000/api/tests"
- test = get_test("GA", url=url)
- print test
-
- tests = get_all_tests(url=url)
- print tests
-
- # test["type"] = "some build name"
- # add_test("bla_bla", [test], url=url)
-
- tests = get_all_tests(url=url)
- print len(tests)
diff --git a/run_test.py b/wally/run_test.py
similarity index 75%
rename from run_test.py
rename to wally/run_test.py
index a3a1941..7899bae 100755
--- a/run_test.py
+++ b/wally/run_test.py
@@ -1,3 +1,5 @@
+from __future__ import print_function
+
import os
import sys
import time
@@ -11,89 +13,17 @@
import yaml
from concurrent.futures import ThreadPoolExecutor
-import utils
-import report
-import ssh_utils
-import start_vms
-import pretty_yaml
-from nodes import discover
-from nodes.node import Node
-from config import cfg_dict, load_config
-from tests.itest import IOPerfTest, PgBenchTest
-from formatters import format_results_for_console
-from sensors.api import start_monitoring, deploy_and_start_sensors
+from wally import pretty_yaml
+from wally.discover import discover, Node, undiscover
+from wally import utils, report, ssh_utils, start_vms
+from wally.suits.itest import IOPerfTest, PgBenchTest
+from wally.config import cfg_dict, load_config, setup_loggers
+from wally.sensors.api import (start_monitoring,
+ deploy_and_start_sensors,
+ SensorConfig)
-logger = logging.getLogger("io-perf-tool")
-
-
-def color_me(color):
- RESET_SEQ = "\033[0m"
- COLOR_SEQ = "\033[1;%dm"
-
- color_seq = COLOR_SEQ % (30 + color)
-
- def closure(msg):
- return color_seq + msg + RESET_SEQ
- return closure
-
-
-class ColoredFormatter(logging.Formatter):
- BLACK, RED, GREEN, YELLOW, BLUE, MAGENTA, CYAN, WHITE = range(8)
-
- colors = {
- 'WARNING': color_me(YELLOW),
- 'DEBUG': color_me(BLUE),
- 'CRITICAL': color_me(YELLOW),
- 'ERROR': color_me(RED)
- }
-
- def __init__(self, msg, use_color=True):
- logging.Formatter.__init__(self, msg)
- self.use_color = use_color
-
- def format(self, record):
- orig = record.__dict__
- record.__dict__ = record.__dict__.copy()
- levelname = record.levelname
-
- prn_name = ' ' * (6 - len(levelname)) + levelname
- if levelname in self.colors:
- record.levelname = self.colors[levelname](prn_name)
- else:
- record.levelname = prn_name
-
- res = super(ColoredFormatter, self).format(record)
- record.__dict__ = orig
- return res
-
-
-def setup_logger(logger, level=logging.DEBUG, log_fname=None):
- logger.setLevel(logging.DEBUG)
- sh = logging.StreamHandler()
- sh.setLevel(level)
-
- log_format = '%(asctime)s - %(levelname)s - %(name)s - %(message)s'
- colored_formatter = ColoredFormatter(log_format,
- "%H:%M:%S")
-
- sh.setFormatter(colored_formatter)
- logger.addHandler(sh)
-
- logger_api = logging.getLogger("io-perf-tool.fuel_api")
-
- if log_fname is not None:
- fh = logging.FileHandler(log_fname)
- log_format = '%(asctime)s - %(levelname)6s - %(name)s - %(message)s'
- formatter = logging.Formatter(log_format,
- "%H:%M:%S")
- fh.setFormatter(formatter)
- fh.setLevel(logging.DEBUG)
- logger.addHandler(fh)
- logger_api.addHandler(fh)
-
- logger_api.addHandler(sh)
- logger_api.setLevel(logging.WARNING)
+logger = logging.getLogger("wally")
def format_result(res, formatter):
@@ -112,6 +42,8 @@
self.openstack_nodes_ids = []
self.sensor_cm = None
self.keep_vm = False
+ self.sensors_control_queue = None
+ self.sensor_listen_thread = None
def connect_one(node):
@@ -119,6 +51,7 @@
ssh_pref = "ssh://"
if node.conn_url.startswith(ssh_pref):
url = node.conn_url[len(ssh_pref):]
+ logger.debug("Try connect to " + url)
node.connection = ssh_utils.connect(url)
else:
raise ValueError("Unknown url type {0}".format(node.conn_url))
@@ -134,15 +67,26 @@
logger.info("All nodes connected successfully")
-def save_sensors_data(q, fd):
+def save_sensors_data(q, mon_q, fd):
logger.info("Start receiving sensors data")
- while True:
- val = q.get()
- if val is None:
- q.put([])
- break
- fd.write("\n" + str(time.time()) + " : ")
- fd.write(repr(val))
+ fd.write("\n")
+
+ observed_nodes = set()
+
+ try:
+ while True:
+ val = q.get()
+ if val is None:
+ break
+
+ addr, data = val
+ if addr not in observed_nodes:
+ mon_q.put(addr)
+ observed_nodes.add(addr)
+
+ fd.write("{0!s} : {1!r}\n".format(time.time(), repr(val)))
+ except Exception:
+ logger.exception("Error in sensors thread")
logger.info("Sensors thread exits")
@@ -242,10 +186,20 @@
connect_all(ctx.nodes)
+def make_undiscover_stage(clean_data):
+ def undiscover_stage(cfg, ctx):
+ undiscover(clean_data)
+ return undiscover_stage
+
+
def discover_stage(cfg, ctx):
if cfg.get('discover') is not None:
discover_objs = [i.strip() for i in cfg['discover'].strip().split(",")]
- ctx.nodes.extend(discover.discover(ctx, discover_objs, cfg['clouds']))
+
+ nodes, clean_data = discover(ctx, discover_objs,
+ cfg['clouds'], cfg['var_dir'])
+ ctx.clear_calls_stack.append(make_undiscover_stage(clean_data))
+ ctx.nodes.extend(nodes)
for url, roles in cfg.get('explicit_nodes', {}).items():
ctx.nodes.append(Node(url, roles.split(",")))
@@ -255,9 +209,9 @@
if 'sensors' not in cfg_dict:
return
- ctx.clear_calls_stack.append(remove_sensors_stage)
cfg = cfg_dict.get('sensors')
- sens_cfg = []
+
+ sensors_configs = []
monitored_nodes = []
for role, sensors_str in cfg["roles_mapping"].items():
@@ -268,12 +222,19 @@
for node in ctx.nodes:
if role in node.roles:
monitored_nodes.append(node)
- sens_cfg.append((node.connection, collect_cfg))
+ sens_cfg = SensorConfig(node.connection,
+ node.get_ip(),
+ collect_cfg)
+ sensors_configs.append(sens_cfg)
+
+ if len(monitored_nodes) == 0:
+ logger.info("Nothing to monitor, no sensors would be installed")
+ return
ctx.receiver_uri = cfg["receiver_uri"]
+ nodes_ips = [node.get_ip() for node in monitored_nodes]
if '{ip}' in ctx.receiver_uri:
- ips = set(utils.get_ip_for_target(node.get_ip())
- for node in monitored_nodes)
+ ips = set(map(utils.get_ip_for_target, nodes_ips))
if len(ips) > 1:
raise ValueError("Can't select external ip for sensors server")
@@ -284,25 +245,53 @@
ext_ip = list(ips)[0]
ctx.receiver_uri = ctx.receiver_uri.format(ip=ext_ip)
- ctx.sensor_cm = start_monitoring(ctx.receiver_uri, None,
- connected_config=sens_cfg)
+ ctx.clear_calls_stack.append(remove_sensors_stage)
+ ctx.sensor_cm = start_monitoring(ctx.receiver_uri, sensors_configs)
ctx.sensors_control_queue = ctx.sensor_cm.__enter__()
+ mon_q = Queue.Queue()
+
fd = open(cfg_dict['sensor_storage'], "w")
th = threading.Thread(None, save_sensors_data, None,
- (ctx.sensors_control_queue, fd))
+ (ctx.sensors_control_queue, mon_q, fd))
th.daemon = True
th.start()
ctx.sensor_listen_thread = th
+ nodes_ips_set = set(nodes_ips)
+ MAX_WAIT_FOR_SENSORS = 10
+ etime = time.time() + MAX_WAIT_FOR_SENSORS
+
+ msg = "Waiting at most {0}s till all {1} nodes starts report sensors data"
+ logger.debug(msg.format(MAX_WAIT_FOR_SENSORS, len(nodes_ips_set)))
+
+ # wait till all nodes start sending data
+ while len(nodes_ips_set) != 0:
+ tleft = etime - time.time()
+ try:
+ data = mon_q.get(True, tleft)
+ ip, port = data
+ except Queue.Empty:
+ msg = "Node {0} not sending any sensor data in {1}s"
+ msg = msg.format(", ".join(nodes_ips_set), MAX_WAIT_FOR_SENSORS)
+ raise RuntimeError(msg)
+
+ if ip not in nodes_ips_set:
+ logger.warning("Receive sensors from extra node: {0}".format(ip))
+
+ nodes_ips_set.remove(ip)
+
def remove_sensors_stage(cfg, ctx):
if ctx.sensor_cm is not None:
ctx.sensor_cm.__exit__(None, None, None)
- ctx.sensors_control_queue.put(None)
- ctx.sensor_listen_thread.join()
- ctx.sensor_data = ctx.sensors_control_queue.get()
+
+ if ctx.sensors_control_queue is not None:
+ ctx.sensors_control_queue.put(None)
+
+ if ctx.sensor_listen_thread is not None:
+ ctx.sensor_listen_thread.join()
def get_os_credentials(cfg, ctx, creds_type):
@@ -351,10 +340,10 @@
key, config = group.items()[0]
if 'start_test_nodes' == key:
- params = config['openstack']['vm_params']
+ params = config['vm_params'].copy()
os_nodes_ids = []
- os_creds_type = config['openstack']['creds']
+ os_creds_type = config['creds']
os_creds = get_os_credentials(cfg, ctx, os_creds_type)
start_vms.nova_connect(**os_creds)
@@ -364,6 +353,7 @@
new_nodes = []
try:
+ params['group_name'] = cfg_dict['run_uuid']
for new_node, node_id in start_vms.launch_vms(params):
new_node.roles.append('testnode')
ctx.nodes.append(new_node)
@@ -471,13 +461,22 @@
def console_report_stage(cfg, ctx):
for tp, data in ctx.results:
if 'io' == tp and data is not None:
- print format_results_for_console(data)
+ print(IOPerfTest.format_for_console(data))
def report_stage(cfg, ctx):
html_rep_fname = cfg['html_report_file']
- fuel_url = cfg['clouds']['fuel']['url']
- creds = cfg['clouds']['fuel']['creds']
+
+ try:
+ fuel_url = cfg['clouds']['fuel']['url']
+ except KeyError:
+ fuel_url = None
+
+ try:
+ creds = cfg['clouds']['fuel']['creds']
+ except KeyError:
+ creds = None
+
report.make_io_report(ctx.results, html_rep_fname, fuel_url, creds=creds)
logger.info("Html report were stored in " + html_rep_fname)
@@ -486,7 +485,7 @@
with open(text_rep_fname, "w") as fd:
for tp, data in ctx.results:
if 'io' == tp and data is not None:
- fd.write(format_results_for_console(data))
+ fd.write(IOPerfTest.format_for_console(data))
fd.write("\n")
fd.flush()
@@ -507,22 +506,22 @@
def parse_args(argv):
- parser = argparse.ArgumentParser(
- description="Run disk io performance test")
+ descr = "Disk io performance test suite"
+ parser = argparse.ArgumentParser(prog='wally', description=descr)
parser.add_argument("-l", dest='extra_logs',
action='store_true', default=False,
help="print some extra log info")
-
parser.add_argument("-b", '--build_description',
type=str, default="Build info")
parser.add_argument("-i", '--build_id', type=str, default="id")
parser.add_argument("-t", '--build_type', type=str, default="GA")
parser.add_argument("-u", '--username', type=str, default="admin")
- parser.add_argument("-p", '--post-process-only', default=None)
- parser.add_argument("-o", '--output-dest', nargs="*")
- parser.add_argument("-k", '--keep-vm', action='store_true', default=False)
- parser.add_argument("config_file", nargs="?", default="config.yaml")
+ parser.add_argument("-p", '--post-process-only', metavar="VAR_DIR",
+ help="Only process data from previour run")
+ parser.add_argument("-k", '--keep-vm', action='store_true',
+ help="Don't remove test vm's", default=False)
+ parser.add_argument("config_file")
return parser.parse_args(argv[1:])
@@ -540,7 +539,7 @@
stages = [
discover_stage,
log_nodes_statistic,
- # connect_stage,
+ connect_stage,
deploy_sensors_stage,
run_tests_stage,
store_raw_results_stage,
@@ -550,8 +549,12 @@
load_config(opts.config_file, opts.post_process_only)
- level = logging.DEBUG if opts.extra_logs else logging.WARNING
- setup_logger(logger, level, cfg_dict['log_file'])
+ if cfg_dict.get('logging', {}).get("extra_logs", False) or opts.extra_logs:
+ level = logging.DEBUG
+ else:
+ level = logging.WARNING
+
+ setup_loggers(level, cfg_dict['log_file'])
logger.info("All info would be stored into {0}".format(
cfg_dict['var_dir']))
@@ -567,6 +570,9 @@
for stage in stages:
logger.info("Start {0.__name__} stage".format(stage))
stage(cfg_dict, ctx)
+ except Exception as exc:
+ msg = "Exception during current stage: {0}".format(exc.message)
+ logger.error(msg)
finally:
exc, cls, tb = sys.exc_info()
for stage in ctx.clear_calls_stack[::-1]:
@@ -581,7 +587,3 @@
logger.info("All info stored into {0}".format(cfg_dict['var_dir']))
return 0
-
-
-if __name__ == '__main__':
- exit(main(sys.argv))
diff --git a/sensors/__init__.py b/wally/sensors/__init__.py
similarity index 100%
rename from sensors/__init__.py
rename to wally/sensors/__init__.py
diff --git a/wally/sensors/api.py b/wally/sensors/api.py
new file mode 100644
index 0000000..f66bb36
--- /dev/null
+++ b/wally/sensors/api.py
@@ -0,0 +1,58 @@
+import Queue
+import threading
+from contextlib import contextmanager
+
+from .deploy_sensors import (deploy_and_start_sensors,
+ stop_and_remove_sensors)
+from .protocol import create_protocol, Timeout
+
+
+__all__ = ['Empty', 'recv_main', 'start_monitoring',
+ 'deploy_and_start_sensors', 'SensorConfig']
+
+
+Empty = Queue.Empty
+
+
+class SensorConfig(object):
+ def __init__(self, conn, url, sensors):
+ self.conn = conn
+ self.url = url
+ self.sensors = sensors
+
+
+def recv_main(proto, data_q, cmd_q):
+ while True:
+ try:
+ data_q.put(proto.recv(0.1))
+ except Timeout:
+ pass
+
+ try:
+ val = cmd_q.get(False)
+
+ if val is None:
+ return
+
+ except Queue.Empty:
+ pass
+
+
+@contextmanager
+def start_monitoring(uri, configs):
+ deploy_and_start_sensors(uri, configs)
+ try:
+ data_q = Queue.Queue()
+ cmd_q = Queue.Queue()
+ proto = create_protocol(uri, receiver=True)
+ th = threading.Thread(None, recv_main, None, (proto, data_q, cmd_q))
+ th.daemon = True
+ th.start()
+
+ try:
+ yield data_q
+ finally:
+ cmd_q.put(None)
+ th.join()
+ finally:
+ stop_and_remove_sensors(configs)
diff --git a/sensors/cp_protocol.py b/wally/sensors/cp_protocol.py
similarity index 96%
rename from sensors/cp_protocol.py
rename to wally/sensors/cp_protocol.py
index 962bf0a..4e96afe 100644
--- a/sensors/cp_protocol.py
+++ b/wally/sensors/cp_protocol.py
@@ -4,16 +4,12 @@
import re
import zlib
import json
+import logging
import binascii
-try:
- from disk_perf_test_tool.logger import define_logger
- logger = define_logger(__name__)
-except ImportError:
- class Logger(object):
- def debug(self, *dt):
- pass
- logger = Logger()
+
+logger = logging.getLogger("wally")
+
# protocol contains 2 type of packet:
# 1 - header, which contains template schema of counters
diff --git a/sensors/cp_transport.py b/wally/sensors/cp_transport.py
similarity index 100%
rename from sensors/cp_transport.py
rename to wally/sensors/cp_transport.py
diff --git a/sensors/daemonize.py b/wally/sensors/daemonize.py
similarity index 100%
rename from sensors/daemonize.py
rename to wally/sensors/daemonize.py
diff --git a/wally/sensors/deploy_sensors.py b/wally/sensors/deploy_sensors.py
new file mode 100644
index 0000000..249adfb
--- /dev/null
+++ b/wally/sensors/deploy_sensors.py
@@ -0,0 +1,87 @@
+import time
+import json
+import os.path
+import logging
+
+from concurrent.futures import ThreadPoolExecutor, wait
+
+from wally.ssh_utils import copy_paths, run_over_ssh
+
+logger = logging.getLogger('wally')
+
+
+def wait_all_ok(futures):
+ return all(future.result() for future in futures)
+
+
+def deploy_and_start_sensors(monitor_uri, sensor_configs,
+ remote_path='/tmp/sensors/sensors'):
+
+ paths = {os.path.dirname(__file__): remote_path}
+ with ThreadPoolExecutor(max_workers=32) as executor:
+ futures = []
+
+ for node_sensor_config in sensor_configs:
+ futures.append(executor.submit(deploy_and_start_sensor,
+ paths,
+ node_sensor_config,
+ monitor_uri,
+ remote_path))
+
+ if not wait_all_ok(futures):
+ raise RuntimeError("Sensor deployment fails on some nodes")
+
+
+def deploy_and_start_sensor(paths, node_sensor_config,
+ monitor_uri, remote_path):
+ try:
+ copy_paths(node_sensor_config.conn, paths)
+ sftp = node_sensor_config.conn.open_sftp()
+
+ config_remote_path = os.path.join(remote_path, "conf.json")
+
+ with sftp.open(config_remote_path, "w") as fd:
+ fd.write(json.dumps(node_sensor_config.sensors))
+
+ cmd_templ = 'env PYTHONPATH="{0}" python -m ' + \
+ "sensors.main -d start -u {1} {2}"
+
+ cmd = cmd_templ.format(os.path.dirname(remote_path),
+ monitor_uri,
+ config_remote_path)
+
+ run_over_ssh(node_sensor_config.conn, cmd,
+ node=node_sensor_config.url)
+ sftp.close()
+
+ except:
+ msg = "During deploing sensors in {0}".format(node_sensor_config.url)
+ logger.exception(msg)
+ return False
+ return True
+
+
+def stop_and_remove_sensor(conn, url, remote_path):
+ cmd = 'env PYTHONPATH="{0}" python -m sensors.main -d stop'
+
+ run_over_ssh(conn, cmd.format(remote_path), node=url)
+
+ # some magic
+ time.sleep(0.3)
+
+ conn.exec_command("rm -rf {0}".format(remote_path))
+
+ logger.debug("Sensors stopped and removed")
+
+
+def stop_and_remove_sensors(configs, remote_path='/tmp/sensors'):
+ with ThreadPoolExecutor(max_workers=32) as executor:
+ futures = []
+
+ for node_sensor_config in configs:
+ futures.append(executor.submit(stop_and_remove_sensor,
+ node_sensor_config.conn,
+ node_sensor_config.url,
+ remote_path))
+
+ wait(futures)
diff --git a/sensors/discover.py b/wally/sensors/discover.py
similarity index 100%
rename from sensors/discover.py
rename to wally/sensors/discover.py
diff --git a/sensors/main.py b/wally/sensors/main.py
similarity index 93%
rename from sensors/main.py
rename to wally/sensors/main.py
index 3c953fa..3753e7c 100644
--- a/sensors/main.py
+++ b/wally/sensors/main.py
@@ -7,18 +7,18 @@
import os.path
import argparse
-from sensors.utils import SensorInfo
-from daemonize import Daemonize
-from discover import all_sensors
-from protocol import create_protocol
+from .sensors.utils import SensorInfo
+from .daemonize import Daemonize
+from .discover import all_sensors
+from .protocol import create_protocol
# load all sensors
-import sensors
+from . import sensors
sensors_dir = os.path.dirname(sensors.__file__)
for fname in glob.glob(os.path.join(sensors_dir, "*.py")):
mod_name = os.path.basename(fname[:-3])
- __import__("sensors." + mod_name)
+ __import__("sensors.sensors." + mod_name)
def get_values(required_sensors):
diff --git a/sensors/protocol.py b/wally/sensors/protocol.py
similarity index 99%
rename from sensors/protocol.py
rename to wally/sensors/protocol.py
index 02b661a..c2ace01 100644
--- a/sensors/protocol.py
+++ b/wally/sensors/protocol.py
@@ -5,7 +5,7 @@
import cPickle as pickle
from urlparse import urlparse
-import cp_transport
+from . import cp_transport
class Timeout(Exception):
diff --git a/sensors/receiver.py b/wally/sensors/receiver.py
similarity index 93%
rename from sensors/receiver.py
rename to wally/sensors/receiver.py
index 76f8bb7..ff0f223 100644
--- a/sensors/receiver.py
+++ b/wally/sensors/receiver.py
@@ -1,4 +1,4 @@
-from api import start_monitoring, Empty
+from .api import start_monitoring, Empty
# from influx_exporter import connect, add_data
uri = "udp://192.168.0.104:12001"
diff --git a/sensors/sensors/__init__.py b/wally/sensors/sensors/__init__.py
similarity index 100%
rename from sensors/sensors/__init__.py
rename to wally/sensors/sensors/__init__.py
diff --git a/sensors/sensors/io_sensors.py b/wally/sensors/sensors/io_sensors.py
similarity index 95%
rename from sensors/sensors/io_sensors.py
rename to wally/sensors/sensors/io_sensors.py
index 615d381..c9ff340 100644
--- a/sensors/sensors/io_sensors.py
+++ b/wally/sensors/sensors/io_sensors.py
@@ -1,5 +1,5 @@
-from discover import provides
-from utils import SensorInfo, is_dev_accepted
+from ..discover import provides
+from .utils import SensorInfo, is_dev_accepted
# 1 - major number
# 2 - minor mumber
diff --git a/sensors/sensors/net_sensors.py b/wally/sensors/sensors/net_sensors.py
similarity index 93%
rename from sensors/sensors/net_sensors.py
rename to wally/sensors/sensors/net_sensors.py
index 3a2d926..4a4e477 100644
--- a/sensors/sensors/net_sensors.py
+++ b/wally/sensors/sensors/net_sensors.py
@@ -1,5 +1,5 @@
-from discover import provides
-from utils import SensorInfo, is_dev_accepted
+from ..discover import provides
+from .utils import SensorInfo, is_dev_accepted
# 1 - major number
# 2 - minor mumber
diff --git a/sensors/sensors/pscpu_sensors.py b/wally/sensors/sensors/pscpu_sensors.py
similarity index 91%
rename from sensors/sensors/pscpu_sensors.py
rename to wally/sensors/sensors/pscpu_sensors.py
index 83b18c6..cffdb71 100644
--- a/sensors/sensors/pscpu_sensors.py
+++ b/wally/sensors/sensors/pscpu_sensors.py
@@ -1,7 +1,7 @@
import os
-from discover import provides
-from utils import SensorInfo, get_pid_name, get_pid_list
+from ..discover import provides
+from .utils import SensorInfo, get_pid_name, get_pid_list
@provides("perprocess-cpu")
diff --git a/wally/sensors/sensors/psram_sensors.py b/wally/sensors/sensors/psram_sensors.py
new file mode 100644
index 0000000..cbd85e6
--- /dev/null
+++ b/wally/sensors/sensors/psram_sensors.py
@@ -0,0 +1,76 @@
+from ..discover import provides
+from .utils import SensorInfo, get_pid_name, get_pid_list
+
+
+# Based on ps_mem.py:
+# Licence: LGPLv2
+# Author: P@draigBrady.com
+# Source: http://www.pixelbeat.org/scripts/ps_mem.py
+# http://github.com/pixelb/scripts/commits/master/scripts/ps_mem.py
+
+
+# Note shared is always a subset of rss (trs is not always)
+def get_mem_stats(pid):
+ """ Return memory data of pid in format (private, shared) """
+
+ fname = '/proc/{0}/{1}'.format(pid, "smaps")
+ lines = open(fname).readlines()
+
+ shared = 0
+ private = 0
+ pss = 0
+
+ # add 0.5KiB as this avg error due to trunctation
+ pss_adjust = 0.5
+
+ for line in lines:
+ if line.startswith("Shared"):
+ shared += int(line.split()[1])
+
+ if line.startswith("Private"):
+ private += int(line.split()[1])
+
+ if line.startswith("Pss"):
+ pss += float(line.split()[1]) + pss_adjust
+
+ # Note Shared + Private = Rss above
+ # The Rss in smaps includes video card mem etc.
+
+ if pss != 0:
+ shared = int(pss - private)
+
+ return (private, shared)
+
+
+@provides("perprocess-ram")
+def psram_stat(disallowed_prefixes=None, allowed_prefixes=None):
+ results = {}
+ pid_list = get_pid_list(disallowed_prefixes, allowed_prefixes)
+ print pid_list
+ for pid in pid_list:
+ try:
+ dev_name = get_pid_name(pid)
+
+ private, shared = get_mem_stats(pid)
+ total = private + shared
+ sys_total = get_ram_size()
+ usage = float(total) / float(sys_total)
+
+ sensor_name = "{0}.{1}".format(dev_name, pid)
+
+ results[sensor_name + ".private_mem"] = SensorInfo(private, False)
+ results[sensor_name + ".shared_mem"] = SensorInfo(shared, False)
+ results[sensor_name + ".used_mem"] = SensorInfo(total, False)
+ name = sensor_name + ".mem_usage_percent"
+ results[name] = SensorInfo(usage * 100, False)
+ except IOError:
+ # permission denied or proc die
+ continue
+ return results
+
+
+def get_ram_size():
+ """ Return RAM size in Kb"""
+ with open("/proc/meminfo") as proc:
+ mem_total = proc.readline().split()
+ return mem_total[1]
diff --git a/sensors/sensors/syscpu_sensors.py b/wally/sensors/sensors/syscpu_sensors.py
similarity index 93%
rename from sensors/sensors/syscpu_sensors.py
rename to wally/sensors/sensors/syscpu_sensors.py
index 4e49495..d3da02b 100644
--- a/sensors/sensors/syscpu_sensors.py
+++ b/wally/sensors/sensors/syscpu_sensors.py
@@ -1,5 +1,5 @@
-from discover import provides
-from utils import SensorInfo, is_dev_accepted
+from ..discover import provides
+from .utils import SensorInfo, is_dev_accepted
# 0 - cpu name
# 1 - user: normal processes executing in user mode
diff --git a/sensors/sensors/sysram_sensors.py b/wally/sensors/sensors/sysram_sensors.py
similarity index 89%
rename from sensors/sensors/sysram_sensors.py
rename to wally/sensors/sensors/sysram_sensors.py
index fa0c409..c78eddd 100644
--- a/sensors/sensors/sysram_sensors.py
+++ b/wally/sensors/sensors/sysram_sensors.py
@@ -1,5 +1,5 @@
-from discover import provides
-from utils import SensorInfo, is_dev_accepted
+from ..discover import provides
+from .utils import SensorInfo, is_dev_accepted
# return this values or setted in allowed
diff --git a/sensors/sensors/utils.py b/wally/sensors/sensors/utils.py
similarity index 100%
rename from sensors/sensors/utils.py
rename to wally/sensors/sensors/utils.py
diff --git a/sensors/storage/__init__.py b/wally/sensors/storage/__init__.py
similarity index 100%
rename from sensors/storage/__init__.py
rename to wally/sensors/storage/__init__.py
diff --git a/sensors/storage/grafana.py b/wally/sensors/storage/grafana.py
similarity index 100%
rename from sensors/storage/grafana.py
rename to wally/sensors/storage/grafana.py
diff --git a/sensors/storage/grafana_template.js b/wally/sensors/storage/grafana_template.js
similarity index 100%
rename from sensors/storage/grafana_template.js
rename to wally/sensors/storage/grafana_template.js
diff --git a/sensors/storage/influx_exporter.py b/wally/sensors/storage/influx_exporter.py
similarity index 100%
rename from sensors/storage/influx_exporter.py
rename to wally/sensors/storage/influx_exporter.py
diff --git a/sensors/storage/koder.js b/wally/sensors/storage/koder.js
similarity index 100%
rename from sensors/storage/koder.js
rename to wally/sensors/storage/koder.js
diff --git a/sensors/umsgpack.py b/wally/sensors/umsgpack.py
similarity index 100%
rename from sensors/umsgpack.py
rename to wally/sensors/umsgpack.py
diff --git a/ssh_utils.py b/wally/ssh_utils.py
similarity index 93%
rename from ssh_utils.py
rename to wally/ssh_utils.py
index aea3111..68d4017 100644
--- a/ssh_utils.py
+++ b/wally/ssh_utils.py
@@ -10,10 +10,10 @@
import paramiko
-logger = logging.getLogger("io-perf-tool")
+logger = logging.getLogger("wally")
-def ssh_connect(creds, retry_count=60, timeout=1):
+def ssh_connect(creds, retry_count=6, timeout=10):
ssh = paramiko.SSHClient()
ssh.load_host_keys('/dev/null')
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
@@ -27,6 +27,7 @@
if creds.passwd is not None:
ssh.connect(creds.host,
+ timeout=timeout, # tcp connect timeout
username=user,
password=creds.passwd,
port=creds.port,
@@ -37,6 +38,7 @@
if creds.key_file is not None:
ssh.connect(creds.host,
username=user,
+ timeout=timeout, # tcp connect timeout
key_filename=creds.key_file,
look_for_keys=False,
port=creds.port)
@@ -45,6 +47,7 @@
key_file = os.path.expanduser('~/.ssh/id_rsa')
ssh.connect(creds.host,
username=user,
+ timeout=timeout, # tcp connect timeout
key_filename=key_file,
look_for_keys=False,
port=creds.port)
@@ -52,10 +55,11 @@
# raise ValueError("Wrong credentials {0}".format(creds.__dict__))
except paramiko.PasswordRequiredException:
raise
- except socket.error:
+ except socket.error as err:
+ print err
if i == retry_count - 1:
raise
- time.sleep(timeout)
+ time.sleep(1)
def normalize_dirpath(dirpath):
@@ -168,6 +172,9 @@
for name in self.conn_uri_attrs:
setattr(self, name, None)
+ def __str__(self):
+ return str(self.__dict__)
+
uri_reg_exprs = []
@@ -237,7 +244,8 @@
all_sessions = []
-def run_over_ssh(conn, cmd, stdin_data=None, timeout=60, nolog=False, node=None):
+def run_over_ssh(conn, cmd, stdin_data=None, timeout=60,
+ nolog=False, node=None):
"should be replaces by normal implementation, with select"
transport = conn.get_transport()
session = transport.open_session()
diff --git a/start_vms.py b/wally/start_vms.py
similarity index 91%
rename from start_vms.py
rename to wally/start_vms.py
index 62e4ca5..4fd35ae 100644
--- a/start_vms.py
+++ b/wally/start_vms.py
@@ -1,6 +1,7 @@
import re
import os
import time
+import os.path
import logging
import subprocess
@@ -10,10 +11,11 @@
from novaclient.client import Client as n_client
from cinderclient.v1.client import Client as c_client
-from nodes.node import Node
+import wally
+from wally.discover import Node
-logger = logging.getLogger("io-perf-tool")
+logger = logging.getLogger("wally.vms")
def ostack_get_creds():
@@ -58,8 +60,12 @@
params_s = " ".join("{}={}".format(k, v) for k, v in params.items())
- cmd_templ = "env {params} bash scripts/prepare.sh >/dev/null"
- cmd = cmd_templ.format(params=params_s)
+ spath = os.path.dirname(wally.__file__)
+ spath = os.path.dirname(spath)
+ spath = os.path.join(spath, 'scripts/prepare.sh')
+
+ cmd_templ = "env {params} bash {spath} >/dev/null"
+ cmd = cmd_templ.format(params=params_s, spath=spath)
subprocess.call(cmd, shell=True)
@@ -141,8 +147,9 @@
cinder = c_client(*ostack_get_creds())
vol = cinder.volumes.create(size=size, display_name=name)
err_count = 0
- while vol.status != 'available':
- if vol.status == 'error':
+
+ while vol['status'] != 'available':
+ if vol['status'] == 'error':
if err_count == 3:
logger.critical("Fail to create volume")
raise RuntimeError("Fail to create volume")
@@ -153,7 +160,7 @@
vol = cinder.volumes.create(size=size, display_name=name)
continue
time.sleep(1)
- vol = cinder.volumes.get(vol.id)
+ vol = cinder.volumes.get(vol['id'])
return vol
@@ -219,9 +226,9 @@
yield Node(conn_uri, []), os_node.id
-def create_vms_mt(nova, amount, keypair_name, img_name,
+def create_vms_mt(nova, amount, group_name, keypair_name, img_name,
flavor_name, vol_sz=None, network_zone_name=None,
- flt_ip_pool=None, name_templ='ceph-test-{0}',
+ flt_ip_pool=None, name_templ='wally-{id}',
scheduler_hints=None, security_group=None):
with ThreadPoolExecutor(max_workers=16) as executor:
@@ -254,7 +261,9 @@
else:
nics = None
- names = map(name_templ.format, range(amount))
+ names = []
+ for i in range(amount):
+ names.append(name_templ.format(group=group_name, id=i))
futures = []
logger.debug("Requesting new vms")
@@ -290,20 +299,24 @@
logger.debug(msg.format(srv))
nova.servers.delete(srv)
- while True:
+ for j in range(120):
# print "wait till server deleted"
all_id = set(alive_srv.id for alive_srv in nova.servers.list())
if srv.id not in all_id:
break
time.sleep(1)
+ else:
+ raise RuntimeError("Server {0} delete timeout".format(srv.id))
else:
break
+ else:
+ raise RuntimeError("Failed to start server".format(srv.id))
if vol_sz is not None:
# print "creating volume"
vol = create_volume(vol_sz, name)
# print "attach volume to server"
- nova.volumes.create_server_volume(srv.id, vol.id, None)
+ nova.volumes.create_server_volume(srv.id, vol['id'], None)
if flt_ip is Allocate:
flt_ip = nova.floating_ips.create(pool)
diff --git a/statistic.py b/wally/statistic.py
similarity index 92%
rename from statistic.py
rename to wally/statistic.py
index 01b0cec..0729283 100644
--- a/statistic.py
+++ b/wally/statistic.py
@@ -1,8 +1,12 @@
import math
import itertools
-from numpy import array, linalg
-from numpy.polynomial.chebyshev import chebfit, chebval
-from scipy.optimize import leastsq
+
+try:
+ from numpy import array, linalg
+ from scipy.optimize import leastsq
+ from numpy.polynomial.chebyshev import chebfit, chebval
+except ImportError:
+ no_numpy = True
def med_dev(vals):
@@ -37,6 +41,9 @@
def approximate_curve(x, y, xnew, curved_coef):
"""returns ynew - y values of some curve approximation"""
+ if no_numpy:
+ return None
+
return chebval(xnew, chebfit(x, y, curved_coef))
@@ -45,6 +52,9 @@
if not relative_dist distance = y - newy
returns ynew - y values of linear approximation"""
+ if no_numpy:
+ return None
+
# convert to numpy.array (don't work without it)
ox = array(x)
oy = array(y)
diff --git a/wally/suits/__init__.py b/wally/suits/__init__.py
new file mode 100644
index 0000000..7b6610e
--- /dev/null
+++ b/wally/suits/__init__.py
@@ -0,0 +1,3 @@
+from .itest import TwoScriptTest, PgBenchTest, IOPerfTest
+
+__all__ = ["TwoScriptTest", "PgBenchTest", "IOPerfTest"]
diff --git a/ext_libs/__init__.py b/wally/suits/io/__init__.py
similarity index 100%
rename from ext_libs/__init__.py
rename to wally/suits/io/__init__.py
diff --git a/tests/disk_test_agent.py b/wally/suits/io/agent.py
similarity index 92%
rename from tests/disk_test_agent.py
rename to wally/suits/io/agent.py
index 938ec3f..7589346 100644
--- a/tests/disk_test_agent.py
+++ b/wally/suits/io/agent.py
@@ -15,14 +15,14 @@
SETTING = 1
-def get_test_sync_mode(jconfig):
+def get_test_sync_mode(config):
try:
- return jconfig['sync_mode']
+ return config['sync_mode']
except KeyError:
pass
- is_sync = jconfig.get("sync", "0") == "1"
- is_direct = jconfig.get("direct", "0") == "1"
+ is_sync = config.get("sync", "0") == "1"
+ is_direct = config.get("direct", "0") == "1"
if is_sync and is_direct:
return 'x'
@@ -357,9 +357,15 @@
try:
parsed_out = json.loads(raw_out)["jobs"]
- except Exception:
+ except KeyError:
+ msg = "Can't parse fio output {0!r}: no 'jobs' found"
+ raw_out = raw_out[:100]
+ raise ValueError(msg.format(raw_out))
+
+ except Exception as exc:
msg = "Can't parse fio output: {0!r}\nError: {1}"
- raise ValueError(msg.format(raw_out, traceback.format_exc()))
+ raw_out = raw_out[:100]
+ raise ValueError(msg.format(raw_out, exc.message))
return zip(parsed_out, bconf)
@@ -474,6 +480,7 @@
res = {}
curr_test_num = skip_tests
executed_tests = 0
+ ok = True
try:
for bconf in next_test_portion(whole_conf, runcycle):
@@ -509,9 +516,12 @@
raise
except Exception:
+ print "=========== ERROR ============="
traceback.print_exc()
+ print "======== END OF ERROR ========="
+ ok = False
- return res, executed_tests
+ return res, executed_tests, ok
def run_benchmark(binary_tp, *argv, **kwargs):
@@ -637,13 +647,13 @@
rrfunc = raw_res_func if argv_obj.show_raw_results else None
stime = time.time()
- job_res, num_tests = run_benchmark(argv_obj.type,
- job_cfg,
- params,
- argv_obj.runcycle,
- rrfunc,
- argv_obj.skip_tests,
- argv_obj.faked_fio)
+ job_res, num_tests, ok = run_benchmark(argv_obj.type,
+ job_cfg,
+ params,
+ argv_obj.runcycle,
+ rrfunc,
+ argv_obj.skip_tests,
+ argv_obj.faked_fio)
etime = time.time()
res = {'__meta__': {'raw_cfg': job_cfg, 'params': params}, 'res': job_res}
@@ -656,10 +666,23 @@
out_fd.write(json.dumps(res))
else:
out_fd.write(pprint.pformat(res) + "\n")
- out_fd.write("\n========= END OF RESULTS =========\n".format(oformat))
+ out_fd.write("\n========= END OF RESULTS =========\n")
+ return 0 if ok else 1
+
+
+def fake_main(x):
+ import yaml
+ time.sleep(60)
+ out_fd = sys.stdout
+ fname = "/tmp/perf_tests/metempirical_alisha/raw_results.yaml"
+ res = yaml.load(open(fname).read())[0][1]
+ out_fd.write("========= RESULTS(format=json) =========\n")
+ out_fd.write(json.dumps(res))
+ out_fd.write("\n========= END OF RESULTS =========\n")
return 0
if __name__ == '__main__':
+ # exit(fake_main(sys.argv[1:]))
exit(main(sys.argv[1:]))
diff --git a/formatters.py b/wally/suits/io/formatter.py
similarity index 63%
rename from formatters.py
rename to wally/suits/io/formatter.py
index 185cae5..529b78a 100644
--- a/formatters.py
+++ b/wally/suits/io/formatter.py
@@ -1,33 +1,37 @@
import texttable
-from utils import ssize_to_b
-from statistic import med_dev
-from disk_perf_test_tool.tests.disk_test_agent import get_test_summary
+from wally.utils import ssize_to_b
+from wally.statistic import med_dev
+from wally.suits.io.agent import get_test_summary
def key_func(k_data):
_, data = k_data
- bsz = ssize_to_b(data['blocksize'])
- tp = data['rw']
- return tp, data['sync_mode'], bsz, data['concurence']
+ return (data['rw'],
+ data['sync_mode'],
+ ssize_to_b(data['blocksize']),
+ data['concurence'])
def format_results_for_console(test_set):
- data_for_print = []
+ """
+ create a table with io performance report
+ for console
+ """
tab = texttable.Texttable()
tab.set_deco(tab.HEADER | tab.VLINES | tab.BORDER)
tab.set_cols_align(["l", "r", "r", "r", "r"])
- items = sorted(test_set['res'].items(), key=key_func)
prev_k = None
+ items = sorted(test_set['res'].items(), key=key_func)
for test_name, data in items:
curr_k = key_func((test_name, data))[:3]
if prev_k is not None:
if prev_k != curr_k:
- data_for_print.append(["---"] * 5)
+ tab.add_row(["---"] * 5)
prev_k = curr_k
@@ -41,11 +45,9 @@
params = (descr, int(iops), int(bw), dev_perc,
int(med_dev(data['lat'])[0]) // 1000)
- data_for_print.append(params)
+ tab.add_row(params)
- header = ["Description", "IOPS", "BW KBps", "Dev * 3 %", "LAT ms"]
+ header = ["Description", "IOPS", "BW KiBps", "Dev * 3 %", "clat ms"]
tab.header(header)
- map(tab.add_row, data_for_print)
-
return tab.draw()
diff --git a/tests/io_scenario_check_distribution.cfg b/wally/suits/io/io_scenario_check_distribution.cfg
similarity index 100%
rename from tests/io_scenario_check_distribution.cfg
rename to wally/suits/io/io_scenario_check_distribution.cfg
diff --git a/tests/io_scenario_check_linearity.cfg b/wally/suits/io/io_scenario_check_linearity.cfg
similarity index 100%
rename from tests/io_scenario_check_linearity.cfg
rename to wally/suits/io/io_scenario_check_linearity.cfg
diff --git a/tests/io_scenario_check_th_count.cfg b/wally/suits/io/io_scenario_check_th_count.cfg
similarity index 100%
rename from tests/io_scenario_check_th_count.cfg
rename to wally/suits/io/io_scenario_check_th_count.cfg
diff --git a/tests/io_scenario_check_vm_count_ec2.cfg b/wally/suits/io/io_scenario_check_vm_count_ec2.cfg
similarity index 100%
rename from tests/io_scenario_check_vm_count_ec2.cfg
rename to wally/suits/io/io_scenario_check_vm_count_ec2.cfg
diff --git a/tests/io_scenario_check_warmup.cfg b/wally/suits/io/io_scenario_check_warmup.cfg
similarity index 100%
rename from tests/io_scenario_check_warmup.cfg
rename to wally/suits/io/io_scenario_check_warmup.cfg
diff --git a/tests/io_scenario_hdd.cfg b/wally/suits/io/io_scenario_hdd.cfg
similarity index 100%
rename from tests/io_scenario_hdd.cfg
rename to wally/suits/io/io_scenario_hdd.cfg
diff --git a/tests/io_scenario_long_test.cfg b/wally/suits/io/io_scenario_long_test.cfg
similarity index 100%
rename from tests/io_scenario_long_test.cfg
rename to wally/suits/io/io_scenario_long_test.cfg
diff --git a/io_results_loader.py b/wally/suits/io/results_loader.py
similarity index 94%
rename from io_results_loader.py
rename to wally/suits/io/results_loader.py
index 6316a6d..25721eb 100644
--- a/io_results_loader.py
+++ b/wally/suits/io/results_loader.py
@@ -2,8 +2,8 @@
import json
-from utils import ssize_to_b
-from statistic import med_dev
+from wally.utils import ssize_to_b
+from wally.statistic import med_dev
def parse_output(out_err):
diff --git a/tests/itest.py b/wally/suits/itest.py
similarity index 78%
rename from tests/itest.py
rename to wally/suits/itest.py
index d1e7c00..91e9dd5 100644
--- a/tests/itest.py
+++ b/wally/suits/itest.py
@@ -3,15 +3,16 @@
import os.path
import logging
-from disk_perf_test_tool.tests import disk_test_agent
-from disk_perf_test_tool.tests.disk_test_agent import parse_fio_config_full
-from disk_perf_test_tool.tests.disk_test_agent import estimate_cfg, sec_to_str
-from disk_perf_test_tool.tests.io_results_loader import parse_output
-from disk_perf_test_tool.ssh_utils import copy_paths, run_over_ssh, delete_file
-from disk_perf_test_tool.utils import ssize_to_b
+from wally.ssh_utils import copy_paths, run_over_ssh, delete_file
+from wally.utils import ssize_to_b, open_for_append_or_create, sec_to_str
+
+from . import postgres
+from .io import agent as io_agent
+from .io import formatter as io_formatter
+from .io.results_loader import parse_output
-logger = logging.getLogger("io-perf-tool")
+logger = logging.getLogger("wally")
class IPerfTest(object):
@@ -30,23 +31,22 @@
def run(self, conn, barrier):
pass
+ @classmethod
+ def format_for_console(cls, data):
+ msg = "{0}.format_for_console".format(cls.__name__)
+ raise NotImplementedError(msg)
+
class TwoScriptTest(IPerfTest):
+ remote_tmp_dir = '/tmp'
+
def __init__(self, opts, on_result_cb, log_directory=None, node=None):
- super(TwoScriptTest, self).__init__(on_result_cb, log_directory,
- node=node)
+ IPerfTest.__init__(self, on_result_cb, log_directory, node=node)
self.opts = opts
- self.pre_run_script = None
- self.run_script = None
- self.tmp_dir = "/tmp/"
- self.set_run_script()
- self.set_pre_run_script()
- def set_run_script(self):
- self.pre_run_script = self.opts.pre_run_script
-
- def set_pre_run_script(self):
- self.run_script = self.opts.run_script
+ if 'run_script' in self.opts:
+ self.run_script = self.opts['run_script']
+ self.prepare_script = self.opts['prepare_script']
def get_remote_for_script(self, script):
return os.path.join(self.tmp_dir, script.rpartition('/')[2])
@@ -84,21 +84,9 @@
class PgBenchTest(TwoScriptTest):
-
- def set_run_script(self):
- self.pre_run_script = "tests/postgres/prepare.sh"
-
- def set_pre_run_script(self):
- self.run_script = "tests/postgres/run.sh"
-
-
-def open_for_append_or_create(fname):
- if not os.path.exists(fname):
- return open(fname, "w")
-
- fd = open(fname, 'r+')
- fd.seek(0, os.SEEK_END)
- return fd
+ root = os.path.dirname(postgres.__file__)
+ prepare_script = os.path.join(root, "prepare.sh")
+ run_script = os.path.join(root, "run.sh")
class IOPerfTest(IPerfTest):
@@ -113,20 +101,21 @@
self.config_params = test_options.get('params', {})
self.tool = test_options.get('tool', 'fio')
self.raw_cfg = open(self.config_fname).read()
- self.configs = list(parse_fio_config_full(self.raw_cfg,
- self.config_params))
+ self.configs = list(io_agent.parse_fio_config_full(self.raw_cfg,
+ self.config_params))
cmd_log = os.path.join(self.log_directory, "task_compiled.cfg")
raw_res = os.path.join(self.log_directory, "raw_results.txt")
fio_command_file = open_for_append_or_create(cmd_log)
- fio_command_file.write(disk_test_agent.compile(self.raw_cfg,
- self.config_params,
- None))
+ fio_command_file.write(io_agent.compile(self.raw_cfg,
+ self.config_params,
+ None))
self.fio_raw_results_file = open_for_append_or_create(raw_res)
def cleanup(self, conn):
delete_file(conn, self.io_py_remote)
+ # Need to remove tempo files, used for testing
def pre_run(self, conn):
try:
@@ -144,11 +133,10 @@
else:
raise OSError("Can't install fio - " + err.message)
- local_fname = disk_test_agent.__file__.rsplit('.')[0] + ".py"
+ local_fname = io_agent.__file__.rsplit('.')[0] + ".py"
self.files_to_copy = {local_fname: self.io_py_remote}
copy_paths(conn, self.files_to_copy)
- cmd_templ = "sudo dd if=/dev/zero of={0} bs={1} count={2}"
files = {}
for secname, params in self.configs:
@@ -160,12 +148,17 @@
fname = params['filename']
files[fname] = max(files.get(fname, 0), msz)
+ # logger.warning("dd run DISABLED")
+ # cmd_templ = "dd if=/dev/zero of={0} bs={1} count={2}"
+
+ cmd_templ = "sudo dd if=/dev/zero of={0} bs={1} count={2}"
for fname, sz in files.items():
cmd = cmd_templ.format(fname, 1024 ** 2, msz)
run_over_ssh(conn, cmd, timeout=msz, node=self.node)
def run(self, conn, barrier):
cmd_templ = "sudo env python2 {0} --type {1} {2} --json -"
+ # cmd_templ = "env python2 {0} --type {1} {2} --json -"
params = " ".join("{0}={1}".format(k, v)
for k, v in self.config_params.items())
@@ -176,7 +169,7 @@
cmd = cmd_templ.format(self.io_py_remote, self.tool, params)
logger.debug("Waiting on barrier")
- exec_time = estimate_cfg(self.raw_cfg, self.config_params)
+ exec_time = io_agent.estimate_cfg(self.raw_cfg, self.config_params)
exec_time_str = sec_to_str(exec_time)
try:
@@ -238,3 +231,7 @@
assert res_test_data[k] == v, msg
return merged_result
+
+ @classmethod
+ def format_for_console(cls, data):
+ return io_formatter.format_results_for_console(data)
diff --git a/tests/postgres/__init__.py b/wally/suits/postgres/__init__.py
similarity index 100%
rename from tests/postgres/__init__.py
rename to wally/suits/postgres/__init__.py
diff --git a/tests/postgres/prepare.sh b/wally/suits/postgres/prepare.sh
similarity index 100%
rename from tests/postgres/prepare.sh
rename to wally/suits/postgres/prepare.sh
diff --git a/tests/postgres/run.sh b/wally/suits/postgres/run.sh
similarity index 100%
rename from tests/postgres/run.sh
rename to wally/suits/postgres/run.sh
diff --git a/utils.py b/wally/utils.py
similarity index 69%
rename from utils.py
rename to wally/utils.py
index 5a9f188..60645d4 100644
--- a/utils.py
+++ b/wally/utils.py
@@ -1,11 +1,12 @@
import re
+import os
import logging
import threading
import contextlib
import subprocess
-logger = logging.getLogger("io-perf-tool")
+logger = logging.getLogger("wally")
def parse_creds(creds):
@@ -86,14 +87,38 @@
cmd = 'ip route get to'.split(" ") + [target_ip]
data = subprocess.Popen(cmd, stdout=subprocess.PIPE).stdout.read()
- rr = r'{0} via [.0-9]+ dev (?P<dev>.*?) src (?P<ip>[.0-9]+)$'
- rr = rr.replace(" ", r'\s+')
- rr = rr.format(target_ip.replace('.', r'\.'))
+ rr1 = r'{0} via [.0-9]+ dev (?P<dev>.*?) src (?P<ip>[.0-9]+)$'
+ rr1 = rr1.replace(" ", r'\s+')
+ rr1 = rr1.format(target_ip.replace('.', r'\.'))
+
+ rr2 = r'{0} dev (?P<dev>.*?) src (?P<ip>[.0-9]+)$'
+ rr2 = rr2.replace(" ", r'\s+')
+ rr2 = rr2.format(target_ip.replace('.', r'\.'))
data_line = data.split("\n")[0].strip()
- res = re.match(rr, data_line)
+ res1 = re.match(rr1, data_line)
+ res2 = re.match(rr2, data_line)
- if res is None:
- raise OSError("Can't define interface for {0}".format(target_ip))
+ if res1 is not None:
+ return res1.group('ip')
- return res.group('ip')
+ if res2 is not None:
+ return res2.group('ip')
+
+ raise OSError("Can't define interface for {0}".format(target_ip))
+
+
+def open_for_append_or_create(fname):
+ if not os.path.exists(fname):
+ return open(fname, "w")
+
+ fd = open(fname, 'r+')
+ fd.seek(0, os.SEEK_END)
+ return fd
+
+
+def sec_to_str(seconds):
+ h = seconds // 3600
+ m = (seconds % 3600) // 60
+ s = seconds % 60
+ return "{0}:{1:02d}:{2:02d}".format(h, m, s)