large commit. new code, with sensors, line count dropped, etc
diff --git a/sensors/api.py b/sensors/api.py
index f78e6a9..dc34af0 100644
--- a/sensors/api.py
+++ b/sensors/api.py
@@ -17,6 +17,7 @@
data_q.put(proto.recv(0.1))
except Timeout:
pass
+
try:
val = cmd_q.get(False)
@@ -28,8 +29,9 @@
@contextmanager
-def start_monitoring(uri, config):
- deploy_and_start_sensors(uri, config)
+def start_monitoring(uri, config=None, connected_config=None):
+ deploy_and_start_sensors(uri, config=config,
+ connected_config=connected_config)
try:
data_q = Queue.Queue()
cmd_q = Queue.Queue()
@@ -44,4 +46,5 @@
cmd_q.put(None)
th.join()
finally:
- stop_and_remove_sensors(config)
+ stop_and_remove_sensors(config,
+ connected_config=connected_config)
diff --git a/sensors/deploy_sensors.py b/sensors/deploy_sensors.py
index b2dc3f1..e0428d9 100644
--- a/sensors/deploy_sensors.py
+++ b/sensors/deploy_sensors.py
@@ -1,33 +1,50 @@
import time
import json
import os.path
+import logging
from concurrent.futures import ThreadPoolExecutor, wait
from disk_perf_test_tool.ssh_utils import connect, copy_paths
+logger = logging.getLogger('io-perf-tool')
+
def wait_all_ok(futures):
return all(future.result() for future in futures)
-def deploy_and_start_sensors(monitor_uri, config, remote_path='/tmp/sensors'):
+def deploy_and_start_sensors(monitor_uri, config,
+ remote_path='/tmp/sensors',
+ connected_config=None):
paths = {os.path.dirname(__file__): remote_path}
with ThreadPoolExecutor(max_workers=32) as executor:
futures = []
- for uri, config in config.items():
+ if connected_config is not None:
+ assert config is None
+ node_iter = connected_config
+ else:
+ node_iter = config.items()
+
+ for uri_or_conn, config in node_iter:
futures.append(executor.submit(deploy_and_start_sensor,
- paths, uri, monitor_uri,
+ paths, uri_or_conn,
+ monitor_uri,
config, remote_path))
if not wait_all_ok(futures):
raise RuntimeError("Sensor deployment fails on some nodes")
-def deploy_and_start_sensor(paths, uri, monitor_uri, config, remote_path):
+def deploy_and_start_sensor(paths, uri_or_conn, monitor_uri, config,
+ remote_path):
try:
- conn = connect(uri)
+ if isinstance(uri_or_conn, basestring):
+ conn = connect(uri_or_conn)
+ else:
+ conn = uri_or_conn
+
copy_paths(conn, paths)
sftp = conn.open_sftp()
@@ -41,17 +58,23 @@
cmd = cmd_templ.format(main_remote_path,
monitor_uri,
config_remote_path)
- print "Executing", cmd
conn.exec_command(cmd)
sftp.close()
- conn.close()
+
+ if isinstance(uri_or_conn, basestring):
+ conn.close()
except:
+ logger.exception("During deploing sensors in {0}".format(uri_or_conn))
return False
return True
-def stop_and_remove_sensor(uri, remote_path):
- conn = connect(uri)
+def stop_and_remove_sensor(uri_or_conn, remote_path):
+ if isinstance(uri_or_conn, basestring):
+ conn = connect(uri_or_conn)
+ else:
+ conn = uri_or_conn
+
main_remote_path = os.path.join(remote_path, "main.py")
cmd_templ = "python {0} -d stop"
@@ -62,15 +85,23 @@
conn.exec_command("rm -rf {0}".format(remote_path))
- conn.close()
+ if isinstance(uri_or_conn, basestring):
+ conn.close()
-def stop_and_remove_sensors(config, remote_path='/tmp/sensors'):
+def stop_and_remove_sensors(config, remote_path='/tmp/sensors',
+ connected_config=None):
with ThreadPoolExecutor(max_workers=32) as executor:
futures = []
- for uri, config in config.items():
+ if connected_config is not None:
+ assert config is None
+ conf_iter = connected_config
+ else:
+ conf_iter = config.items()
+
+ for uri_or_conn, config in conf_iter:
futures.append(executor.submit(stop_and_remove_sensor,
- uri, remote_path))
+ uri_or_conn, remote_path))
wait(futures)
diff --git a/sensors/main.py b/sensors/main.py
index fea46a3..3c953fa 100644
--- a/sensors/main.py
+++ b/sensors/main.py
@@ -64,6 +64,10 @@
time.sleep(opts.timeout)
+def pid_running(pid):
+ return os.path.exists("/proc/" + str(pid))
+
+
def main(argv):
opts = parse_args(argv)
@@ -86,12 +90,12 @@
elif opts.daemon == 'stop':
if os.path.isfile(pid_file):
pid = int(open(pid_file).read())
- if os.path.exists("/proc/" + str(pid)):
+ if pid_running(pid):
os.kill(pid, signal.SIGTERM)
time.sleep(0.1)
- if os.path.exists("/proc/" + str(pid)):
+ if pid_running(pid):
os.kill(pid, signal.SIGKILL)
if os.path.isfile(pid_file):
@@ -99,7 +103,7 @@
elif opts.daemon == 'status':
if os.path.isfile(pid_file):
pid = int(open(pid_file).read())
- if os.path.exists("/proc/" + str(pid)):
+ if pid_running(pid):
print "running"
return
print "stopped"
diff --git a/sensors/protocol.py b/sensors/protocol.py
index cfdd93e..02b661a 100644
--- a/sensors/protocol.py
+++ b/sensors/protocol.py
@@ -1,3 +1,4 @@
+import sys
import time
import socket
import select
@@ -57,6 +58,7 @@
# ------------------------------------- Transports ---------------------------
+
class ITransport(object):
def __init__(self, receiver):
pass
@@ -73,12 +75,14 @@
def __init__(self, receiver, delta=True):
if receiver:
- raise ValueError("StdoutTransport don't allows receiving")
+ cname = self.__class__.__name__
+ raise ValueError("{0} don't allows receiving".format(cname))
self.headers = None
self.line_format = ""
self.prev = {}
self.delta = delta
+ self.fd = sys.stdout
def send(self, data):
if self.headers is None:
@@ -100,10 +104,17 @@
else:
vals = [data[header].value for header in self.headers]
- print self.line_format.format(*vals)
+ self.fd.write(self.line_format.format(*vals) + "\n")
def recv(self, timeout=None):
- raise ValueError("StdoutTransport don't allows receiving")
+ cname = self.__class__.__name__
+ raise ValueError("{0} don't allows receiving".format(cname))
+
+
+class FileTransport(StdoutTransport):
+ def __init__(self, receiver, fname, delta=True):
+ StdoutTransport.__init__(self, receiver, delta)
+ self.fd = open(fname, "w")
class UDPTransport(ITransport):
@@ -170,10 +181,12 @@
ip, port = parsed_uri.netloc.split(":")
return UDPTransport(receiver, ip=ip, port=int(port),
packer_cls=PickleSerializer)
+ elif parsed_uri.scheme == 'file':
+ return FileTransport(receiver, parsed_uri.path)
elif parsed_uri.scheme == 'hugeudp':
ip, port = parsed_uri.netloc.split(":")
return HugeUDPTransport(receiver, ip=ip, port=int(port),
- packer_cls=MSGPackSerializer)
+ packer_cls=MSGPackSerializer)
else:
templ = "Can't instantiate transport from {0!r}"
raise ValueError(templ.format(uri))