blob: 2e00e804c6cc405d29b238a58953bfd9ff84680a [file] [log] [blame]
Ved-vampir0c7e2d42015-03-18 17:18:47 +03001#!/usr/bin/env python
2""" UDP sender class """
3
4import socket
5import urlparse
6
7from cp_protocol import Packet
koder aka kdanilove06762a2015-03-22 23:32:09 +02008
9try:
10 from disk_perf_test_tool.logger import define_logger
11 logger = define_logger(__name__)
12except ImportError:
13 class Logger(object):
14 def debug(self, *dt):
15 pass
16 logger = Logger()
Ved-vampir0c7e2d42015-03-18 17:18:47 +030017
18
19class SenderException(Exception):
20 """ Exceptions in Sender class """
21 pass
22
23
24class Timeout(Exception):
25 """ Exceptions in Sender class """
26 pass
27
28
29class Sender(object):
30 """ UDP sender class """
31
Ved-vampir2c2f2e92015-03-18 18:02:25 +030032 def __init__(self, packer, url=None, port=None, host="0.0.0.0", size=256):
Ved-vampir0c7e2d42015-03-18 17:18:47 +030033 """ Create connection object from input udp string or params"""
34
35 # test input
36 if url is None and port is None:
37 raise SenderException("Bad initialization")
38 if url is not None:
39 data = urlparse.urlparse(url)
40 # check schema
41 if data.scheme != "udp":
42 mes = "Bad protocol type: %s instead of UDP" % data.scheme
43 logger.error(mes)
44 raise SenderException("Bad protocol type")
45 # try to get port
46 try:
47 int_port = int(data.port)
48 except ValueError:
49 logger.error("Bad UDP port")
50 raise SenderException("Bad UDP port")
51 # save paths
52 self.sendto = (data.hostname, int_port)
53 self.bindto = (data.hostname, int_port)
54 # try to get size
55 try:
56 self.size = int(data.path.strip("/"))
57 except ValueError:
58 logger.error("Bad packet part size")
59 raise SenderException("Bad packet part size")
60 else:
61 # url is None - use size and port
62 self.sendto = (host, port)
63 self.bindto = ("0.0.0.0", port)
64 self.size = size
65
Ved-vampir2c2f2e92015-03-18 18:02:25 +030066 self.packer = packer
67
Ved-vampir0c7e2d42015-03-18 17:18:47 +030068 self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
69 self.binded = False
70 self.all_data = {}
71 self.send_packer = None
72
Ved-vampir0c7e2d42015-03-18 17:18:47 +030073 def bind(self):
74 """ Prepare for listening """
75 self.sock.bind(self.bindto)
76 self.sock.settimeout(0.5)
77 self.binded = True
78
Ved-vampir0c7e2d42015-03-18 17:18:47 +030079 def send(self, data):
80 """ Send data to udp socket"""
81 if self.sock.sendto(data, self.sendto) != len(data):
82 mes = "Cannot send data to %s:%s" % self.sendto
83 logger.error(mes)
84 raise SenderException("Cannot send data")
85
Ved-vampir0c7e2d42015-03-18 17:18:47 +030086 def send_by_protocol(self, data):
87 """ Send data by Packet protocol
88 data = dict"""
89 if self.send_packer is None:
Ved-vampir2c2f2e92015-03-18 18:02:25 +030090 self.send_packer = Packet(self.packer())
Ved-vampir0c7e2d42015-03-18 17:18:47 +030091 parts = self.send_packer.create_packet_v2(data, self.size)
92 for part in parts:
93 self.send(part)
94
Ved-vampir0c7e2d42015-03-18 17:18:47 +030095 def recv(self):
96 """ Receive data from udp socket"""
97 # check for binding
98 if not self.binded:
99 self.bind()
100 # try to recv
101 try:
102 data, (remote_ip, _) = self.sock.recvfrom(self.size)
103 return data, remote_ip
104 except socket.timeout:
105 raise Timeout()
106
Ved-vampir0c7e2d42015-03-18 17:18:47 +0300107 def recv_by_protocol(self):
108 """ Receive data from udp socket by Packet protocol"""
109 data, remote_ip = self.recv()
110
111 if remote_ip not in self.all_data:
Ved-vampir2c2f2e92015-03-18 18:02:25 +0300112 self.all_data[remote_ip] = Packet(self.packer())
Ved-vampir0c7e2d42015-03-18 17:18:47 +0300113
114 return self.all_data[remote_ip].new_packet(data)
115
Ved-vampir0c7e2d42015-03-18 17:18:47 +0300116 def recv_with_answer(self, stop_event=None):
117 """ Receive data from udp socket and send 'ok' back
118 Command port = local port + 1
119 Answer port = local port
120 Waiting for command is blocking """
121 # create command socket
122 command_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
123 command_port = self.bindto[1]+1
124 command_sock.bind(("0.0.0.0", command_port))
125 command_sock.settimeout(1)
126 # try to recv
127 while True:
128 try:
129 data, (remote_ip, _) = command_sock.recvfrom(self.size)
130 self.send("ok")
131 return data, remote_ip
132 except socket.timeout:
133 if stop_event is not None and stop_event.is_set():
134 # return None if we are interrupted
135 return None
136
Ved-vampir0c7e2d42015-03-18 17:18:47 +0300137 def verified_send(self, send_host, message, max_repeat=20):
138 """ Send and verify it by answer not more then max_repeat
139 Send port = local port + 1
140 Answer port = local port
141 Return True if send is verified """
142 # create send socket
143 send_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
144 send_port = self.sendto[1]+1
145 for repeat in range(0, max_repeat):
146 send_sock.sendto(message, (send_host, send_port))
147 try:
148 data, remote_ip = self.recv()
149 if remote_ip == send_host and data == "ok":
150 return True
151 else:
koder aka kdanilove06762a2015-03-22 23:32:09 +0200152 logger.warning("No answer from %s, try %i",
153 send_host, repeat)
Ved-vampir0c7e2d42015-03-18 17:18:47 +0300154 except Timeout:
155 logger.warning("No answer from %s, try %i", send_host, repeat)
156
157 return False