Refactor sensors tree: move sensor modules into a sensors/ subpackage and storage helpers into storage/, replace per-module logger setup with a fallback define_logger import, drop the local logger.py and config.yaml, and wrap long lines for PEP8.
diff --git a/sensors/config.yaml b/sensors/config.yaml
deleted file mode 100644
index d9ce8bb..0000000
--- a/sensors/config.yaml
+++ /dev/null
@@ -1,23 +0,0 @@
-192.168.0.104:
- block-io:
- allowed_prefixes: [sda1, rbd1]
- net-io:
- allowed_prefixes: [virbr2]
-
-# 192.168.152.39:
-# block-io:
-# allowed_prefixes: [sdb, sdc]
-# net-io:
-# allowed_prefixes: [eth0]
-
-# 192.168.152.40:
-# block-io:
-# allowed_prefixes: [sdb, sdc]
-# net-io:
-# allowed_prefixes: [eth0]
-
-# 192.168.152.41:
-# block-io:
-# allowed_prefixes: [sdb, sdc]
-# net-io:
-# allowed_prefixes: [eth0]
diff --git a/sensors/cp_protocol.py b/sensors/cp_protocol.py
index 79f94af..962bf0a 100644
--- a/sensors/cp_protocol.py
+++ b/sensors/cp_protocol.py
@@ -5,9 +5,15 @@
import zlib
import json
import binascii
-import logging
-from logger import define_logger
+try:
+ from disk_perf_test_tool.logger import define_logger
+ logger = define_logger(__name__)
+except ImportError:
+ class Logger(object):
+ def debug(self, *dt):
+ pass
+ logger = Logger()
# protocol contains 2 type of packet:
# 1 - header, which contains template schema of counters
@@ -52,7 +58,6 @@
self.tmpl_size = 0
self.packer = packer
-
def new_packet(self, part):
""" New packet adding """
# proceed packet
@@ -111,10 +116,8 @@
else:
return None
-
except PacketException as e:
# if something wrong - skip packet
- logger = logging.getLogger(__name__)
logger.warning("Packet skipped: %s", e)
self.is_begin = False
self.is_end = False
@@ -122,7 +125,6 @@
except TypeError:
# if something wrong - skip packet
- logger = logging.getLogger(__name__)
logger.warning("Packet skipped: doesn't match schema")
self.is_begin = False
self.is_end = False
@@ -130,13 +132,11 @@
except:
# if something at all wrong - skip packet
- logger = logging.getLogger(__name__)
logger.warning("Packet skipped: something is wrong")
self.is_begin = False
self.is_end = False
return None
-
@staticmethod
def create_packet(data, part_size):
""" Create packet divided by parts with part_size from data
@@ -160,7 +160,6 @@
return result
-
def create_packet_v2(self, data, part_size):
""" Create packet divided by parts with part_size from data
Compressed """
@@ -179,7 +178,6 @@
result.extend(parts)
return result
-
def get_matching_value_list(self, data):
""" Get values in order server expect"""
vals = range(0, self.tmpl_size)
@@ -188,7 +186,7 @@
for node, groups in self.clt_template.items():
for group, counters in groups.items():
for counter, index in counters.items():
- if not isinstance(index, dict):
+ if not isinstance(index, dict):
vals[index] = data[node][group][counter]
else:
for k, i in index.items():
@@ -201,8 +199,6 @@
logger.error("Data don't match last schema")
raise PacketException("Data don't match last schema")
-
-
def create_answer_template(self, perf_string):
""" Create template for server to insert counter values
Return tuple of server and clien templates + number of replaces"""
@@ -233,7 +229,3 @@
self.tmpl_size = k
self.clt_template = json.loads(clt_template)
-
-
-
-define_logger(__name__)
diff --git a/sensors/cp_transport.py b/sensors/cp_transport.py
index 1b951f2..2e00e80 100644
--- a/sensors/cp_transport.py
+++ b/sensors/cp_transport.py
@@ -5,7 +5,15 @@
import urlparse
from cp_protocol import Packet
-from logger import define_logger
+
+try:
+ from disk_perf_test_tool.logger import define_logger
+ logger = define_logger(__name__)
+except ImportError:
+ class Logger(object):
+ def debug(self, *dt):
+ pass
+ logger = Logger()
class SenderException(Exception):
@@ -62,14 +70,12 @@
self.all_data = {}
self.send_packer = None
-
def bind(self):
""" Prepare for listening """
self.sock.bind(self.bindto)
self.sock.settimeout(0.5)
self.binded = True
-
def send(self, data):
""" Send data to udp socket"""
if self.sock.sendto(data, self.sendto) != len(data):
@@ -77,7 +83,6 @@
logger.error(mes)
raise SenderException("Cannot send data")
-
def send_by_protocol(self, data):
""" Send data by Packet protocol
data = dict"""
@@ -87,7 +92,6 @@
for part in parts:
self.send(part)
-
def recv(self):
""" Receive data from udp socket"""
# check for binding
@@ -100,7 +104,6 @@
except socket.timeout:
raise Timeout()
-
def recv_by_protocol(self):
""" Receive data from udp socket by Packet protocol"""
data, remote_ip = self.recv()
@@ -110,7 +113,6 @@
return self.all_data[remote_ip].new_packet(data)
-
def recv_with_answer(self, stop_event=None):
""" Receive data from udp socket and send 'ok' back
Command port = local port + 1
@@ -132,7 +134,6 @@
# return None if we are interrupted
return None
-
def verified_send(self, send_host, message, max_repeat=20):
""" Send and verify it by answer not more then max_repeat
Send port = local port + 1
@@ -148,12 +149,9 @@
if remote_ip == send_host and data == "ok":
return True
else:
- logger.warning("No answer from %s, try %i", send_host, repeat)
+ logger.warning("No answer from %s, try %i",
+ send_host, repeat)
except Timeout:
logger.warning("No answer from %s, try %i", send_host, repeat)
return False
-
-
-
-logger = define_logger(__name__)
diff --git a/sensors/daemonize.py b/sensors/daemonize.py
index 1c3241b..a4fa157 100644
--- a/sensors/daemonize.py
+++ b/sensors/daemonize.py
@@ -20,17 +20,22 @@
- action: your custom function which will be executed after daemonization.
- keep_fds: optional list of fds which should not be closed.
- auto_close_fds: optional parameter to not close opened fds.
- - privileged_action: action that will be executed before drop privileges if user or
+ - privileged_action: action that will be executed before
+ dropping privileges if user or
group parameter is provided.
- If you want to transfer anything from privileged_action to action, such as
- opened privileged file descriptor, you should return it from
- privileged_action function and catch it inside action function.
+ If you want to transfer anything from privileged
+ action to action, such as opened privileged file
+ descriptor, you should return it from
+ privileged_action function and catch it inside action
+ function.
- user: drop privileges to this user if provided.
- group: drop privileges to this group if provided.
- verbose: send debug messages to logger if provided.
- logger: use this logger object instead of creating new one, if provided.
"""
- def __init__(self, app, pid, action, keep_fds=None, auto_close_fds=True, privileged_action=None, user=None, group=None, verbose=False, logger=None):
+ def __init__(self, app, pid, action, keep_fds=None, auto_close_fds=True,
+ privileged_action=None, user=None, group=None, verbose=False,
+ logger=None):
self.app = app
self.pid = pid
self.action = action
@@ -62,19 +67,22 @@
""" start method
Main daemonization process.
"""
- # If pidfile already exists, we should read pid from there; to overwrite it, if locking
+ # If pidfile already exists, we should read pid from there;
+ # to overwrite it, if locking
# will fail, because locking attempt somehow purges the file contents.
if os.path.isfile(self.pid):
with open(self.pid, "r") as old_pidfile:
old_pid = old_pidfile.read()
- # Create a lockfile so that only one instance of this daemon is running at any time.
+ # Create a lockfile so that only one instance of this daemon is
+ # running at any time.
try:
lockfile = open(self.pid, "w")
except IOError:
print("Unable to create the pidfile.")
sys.exit(1)
try:
- # Try to get an exclusive lock on the file. This will fail if another process has the file
+ # Try to get an exclusive lock on the file. This will fail if
+ # another process has the file
# locked.
fcntl.flock(lockfile, fcntl.LOCK_EX | fcntl.LOCK_NB)
except IOError:
@@ -97,7 +105,8 @@
# Stop listening for signals that the parent process receives.
# This is done by getting a new process id.
# setpgrp() is an alternative to setsid().
- # setsid puts the process in a new parent group and detaches its controlling terminal.
+ # setsid puts the process in a new parent group and detaches
+ # its controlling terminal.
process_id = os.setsid()
if process_id == -1:
# Uh oh, there was a problem.
@@ -106,10 +115,12 @@
# Add lockfile to self.keep_fds.
self.keep_fds.append(lockfile.fileno())
- # Close all file descriptors, except the ones mentioned in self.keep_fds.
+ # Close all file descriptors, except the ones mentioned in
+ # self.keep_fds.
devnull = "/dev/null"
if hasattr(os, "devnull"):
- # Python has set os.devnull on this system, use it instead as it might be different
+ # Python has set os.devnull on this system, use it instead as it
+ # might be different
# than /dev/null.
devnull = os.devnull
@@ -140,7 +151,8 @@
else:
syslog_address = "/dev/log"
- # We will continue with syslog initialization only if actually have such capabilities
+ # We will continue with syslog initialization only if
+ # actually have such capabilities
# on the machine we are running this.
if os.path.isfile(syslog_address):
syslog = handlers.SysLogHandler(syslog_address)
@@ -149,17 +161,20 @@
else:
syslog.setLevel(logging.INFO)
# Try to mimic to normal syslog messages.
- formatter = logging.Formatter("%(asctime)s %(name)s: %(message)s",
+ format_t = "%(asctime)s %(name)s: %(message)s"
+ formatter = logging.Formatter(format_t,
"%b %e %H:%M:%S")
syslog.setFormatter(formatter)
self.logger.addHandler(syslog)
- # Set umask to default to safe file permissions when running as a root daemon. 027 is an
+ # Set umask to default to safe file permissions when running
+ # as a root daemon. 027 is an
# octal number which we are typing as 0o27 for Python3 compatibility.
os.umask(0o27)
- # Change to a known directory. If this isn't done, starting a daemon in a subdirectory that
+ # Change to a known directory. If this isn't done, starting a daemon
+ # in a subdirectory that
# needs to be deleted results in "directory busy" errors.
os.chdir("/")
diff --git a/sensors/deploy_sensors.py b/sensors/deploy_sensors.py
index a1091c2..b2dc3f1 100644
--- a/sensors/deploy_sensors.py
+++ b/sensors/deploy_sensors.py
@@ -2,11 +2,10 @@
import json
import os.path
-from ssh_copy_directory import copy_paths
-from ssh_runner import connect
-
from concurrent.futures import ThreadPoolExecutor, wait
+from disk_perf_test_tool.ssh_utils import connect, copy_paths
+
def wait_all_ok(futures):
return all(future.result() for future in futures)
@@ -42,6 +41,7 @@
cmd = cmd_templ.format(main_remote_path,
monitor_uri,
config_remote_path)
+ print "Executing", cmd
conn.exec_command(cmd)
sftp.close()
conn.close()
diff --git a/sensors/logger.py b/sensors/logger.py
deleted file mode 100644
index bd4c6ef..0000000
--- a/sensors/logger.py
+++ /dev/null
@@ -1,19 +0,0 @@
-#!/usr/bin/env python
-""" Logger initialization """
-
-import logging
-
-
-def define_logger(name):
- """ Initialization of logger"""
- logger = logging.getLogger(name)
- logger.setLevel(logging.INFO)
- ch = logging.StreamHandler()
- ch.setLevel(logging.INFO)
- logger.addHandler(ch)
-
- log_format = '%(asctime)s - %(levelname)s - %(name)s - %(message)s'
- formatter = logging.Formatter(log_format,
- "%H:%M:%S")
- ch.setFormatter(formatter)
- return logger
diff --git a/sensors/main.py b/sensors/main.py
index 449acc1..fea46a3 100644
--- a/sensors/main.py
+++ b/sensors/main.py
@@ -2,25 +2,25 @@
import sys
import time
import json
+import glob
import signal
import os.path
import argparse
-# pylint: disable=W0611
-import io_sensors
-import net_sensors
-import pscpu_sensors
-import psram_sensors
-import syscpu_sensors
-import sysram_sensors
-# pylint: enable=W0611
-
-from utils import SensorInfo
+from sensors.utils import SensorInfo
from daemonize import Daemonize
from discover import all_sensors
from protocol import create_protocol
+# load all sensors
+import sensors
+sensors_dir = os.path.dirname(sensors.__file__)
+for fname in glob.glob(os.path.join(sensors_dir, "*.py")):
+ mod_name = os.path.basename(fname[:-3])
+ __import__("sensors." + mod_name)
+
+
def get_values(required_sensors):
result = {}
for sensor_name, params in required_sensors:
@@ -68,7 +68,7 @@
opts = parse_args(argv)
if opts.list_sensors:
- print " ".join(all_sensors)
+ print "\n".join(sorted(all_sensors))
return 0
if opts.daemon is not None:
diff --git a/sensors/receiver.py b/sensors/receiver.py
index 8903f24..76f8bb7 100644
--- a/sensors/receiver.py
+++ b/sensors/receiver.py
@@ -1,41 +1,19 @@
-import yaml
-
from api import start_monitoring, Empty
-from influx_exporter import connect, add_data
-
-monitor_config = yaml.load(open("config.yaml").read())
+# from influx_exporter import connect, add_data
uri = "udp://192.168.0.104:12001"
-infldb_url = "influxdb://perf:perf@192.168.152.42:8086/perf"
-conn = connect(infldb_url)
+# infldb_url = "influxdb://perf:perf@192.168.152.42:8086/perf"
+# conn = connect(infldb_url)
-# sw_per_ip = {}
-# count = 4
-# expected = sorted(monitor_config.keys())
-
-# if 'sda1.sectors_written' in data:
-# val = data['sda1.sectors_written']
-# elif 'sdb.sectors_written' in data:
-# val = data['sdb.sectors_written']
-# else:
-# val = 0
-
-# sw_per_ip[ip] = sw_per_ip.get(ip, 0) + val
-# count -= 1
-
-# if 0 == count:
-# try:
-# vals = [sw_per_ip[ip] for ip in expected]
-# print ("{:>6}" * len(expected)).format(*vals)
-# sw_per_ip = {}
-# count = 4
-# except KeyError:
-# pass
+monitor_config = {'127.0.0.1':
+ {"block-io": {'allowed_prefixes': ['sda1', 'rbd1']},
+ "net-io": {"allowed_prefixes": ["virbr2"]}}}
with start_monitoring(uri, monitor_config) as queue:
while True:
try:
(ip, port), data = queue.get(True, 1)
- add_data(conn, ip, [data])
+ print (ip, port), data
+ # add_data(conn, ip, [data])
except Empty:
pass
diff --git a/sensors/sensors/__init__.py b/sensors/sensors/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/sensors/sensors/__init__.py
diff --git a/sensors/io_sensors.py b/sensors/sensors/io_sensors.py
similarity index 100%
rename from sensors/io_sensors.py
rename to sensors/sensors/io_sensors.py
diff --git a/sensors/net_sensors.py b/sensors/sensors/net_sensors.py
similarity index 100%
rename from sensors/net_sensors.py
rename to sensors/sensors/net_sensors.py
diff --git a/sensors/ps_mem.py b/sensors/sensors/ps_mem.py
similarity index 100%
rename from sensors/ps_mem.py
rename to sensors/sensors/ps_mem.py
diff --git a/sensors/pscpu_sensors.py b/sensors/sensors/pscpu_sensors.py
similarity index 99%
rename from sensors/pscpu_sensors.py
rename to sensors/sensors/pscpu_sensors.py
index f7c2d20..83b18c6 100644
--- a/sensors/pscpu_sensors.py
+++ b/sensors/sensors/pscpu_sensors.py
@@ -4,7 +4,6 @@
from utils import SensorInfo, get_pid_name, get_pid_list
-
@provides("perprocess-cpu")
def pscpu_stat(disallowed_prefixes=None, allowed_prefixes=None):
results = {}
diff --git a/sensors/psram_sensors.py b/sensors/sensors/psram_sensors.py
similarity index 100%
rename from sensors/psram_sensors.py
rename to sensors/sensors/psram_sensors.py
diff --git a/sensors/syscpu_sensors.py b/sensors/sensors/syscpu_sensors.py
similarity index 100%
rename from sensors/syscpu_sensors.py
rename to sensors/sensors/syscpu_sensors.py
diff --git a/sensors/sysram_sensors.py b/sensors/sensors/sysram_sensors.py
similarity index 100%
rename from sensors/sysram_sensors.py
rename to sensors/sensors/sysram_sensors.py
diff --git a/sensors/utils.py b/sensors/sensors/utils.py
similarity index 100%
rename from sensors/utils.py
rename to sensors/sensors/utils.py
diff --git a/sensors/storage/__init__.py b/sensors/storage/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/sensors/storage/__init__.py
diff --git a/sensors/grafana.py b/sensors/storage/grafana.py
similarity index 100%
rename from sensors/grafana.py
rename to sensors/storage/grafana.py
diff --git a/sensors/grafana_template.js b/sensors/storage/grafana_template.js
similarity index 100%
rename from sensors/grafana_template.js
rename to sensors/storage/grafana_template.js
diff --git a/sensors/influx_exporter.py b/sensors/storage/influx_exporter.py
similarity index 100%
rename from sensors/influx_exporter.py
rename to sensors/storage/influx_exporter.py