a lot of changes
diff --git a/wally/sensors/deploy_sensors.py b/wally/sensors/deploy_sensors.py
index 6fff833..4a1c5df 100644
--- a/wally/sensors/deploy_sensors.py
+++ b/wally/sensors/deploy_sensors.py
@@ -70,7 +70,8 @@
time.sleep(0.3)
# logger.warning("Sensors don't removed")
- run_over_ssh(conn, "rm -rf {0}".format(remote_path), node=url)
+ run_over_ssh(conn, "rm -rf {0}".format(remote_path),
+ node=url, timeout=10)
except Exception as exc:
msg = "Failed to remove sensors from node {0}: {1!s}"
logger.error(msg.format(url, exc))
diff --git a/wally/sensors/protocol.py b/wally/sensors/protocol.py
index 67aef2a..c053011 100644
--- a/wally/sensors/protocol.py
+++ b/wally/sensors/protocol.py
@@ -1,4 +1,5 @@
import sys
+import csv
import time
import struct
import socket
@@ -217,6 +218,28 @@
self.fd = open(fname, "w")
+class CSVFileTransport(ITransport):
+ required_keys = set(['time', 'source_id', 'hostname'])
+
+ def __init__(self, receiver, fname):
+ ITransport.__init__(self, receiver)
+ self.fd = open(fname, "w")
+ self.csv_fd = csv.writer(self.fd)
+ self.field_list = []
+ self.csv_fd.writerow(['NEW_DATA'])
+
+ def send(self, data):
+ if self.field_list == []:
+ keys = set(data)
+ assert self.required_keys.issubset(keys)
+ keys -= self.required_keys
+ self.field_list = sorted(keys)
+ self.csv_fd.writerow([data['source_id'], data['hostname']] +
+ self.field_list)
+
+ self.csv_fd.writerow(map(data.__getitem__, ['time'] + self.field_list))
+
+
class UDPTransport(ITransport):
def __init__(self, receiver, ip, port, packer_cls):
self.port = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
@@ -261,6 +284,8 @@
packer_cls=packer_cls)
elif parsed_uri.scheme == 'file':
return FileTransport(receiver, parsed_uri.path)
+ elif parsed_uri.scheme == 'csvfile':
+ return CSVFileTransport(receiver, parsed_uri.path)
else:
templ = "Can't instantiate transport from {0!r}"
raise ValueError(templ.format(uri))