blob: eaf8b9e2beb43d012ee1ede28924ba77200d4759 [file] [log] [blame]
Mark Slee254ce202007-05-16 02:21:06 +00001#
David Reissea2cba82009-03-30 21:35:00 +00002# Licensed to the Apache Software Foundation (ASF) under one
3# or more contributor license agreements. See the NOTICE file
4# distributed with this work for additional information
5# regarding copyright ownership. The ASF licenses this file
6# to you under the Apache License, Version 2.0 (the
7# "License"); you may not use this file except in compliance
8# with the License. You may obtain a copy of the License at
Mark Slee254ce202007-05-16 02:21:06 +00009#
David Reissea2cba82009-03-30 21:35:00 +000010# http://www.apache.org/licenses/LICENSE-2.0
Mark Slee254ce202007-05-16 02:21:06 +000011#
David Reissea2cba82009-03-30 21:35:00 +000012# Unless required by applicable law or agreed to in writing,
13# software distributed under the License is distributed on an
14# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15# KIND, either express or implied. See the License for the
16# specific language governing permissions and limitations
17# under the License.
Mark Slee254ce202007-05-16 02:21:06 +000018#
19
20require 5.6.0;
21use strict;
22use warnings;
23
24use Thrift;
25use Thrift::Transport;
26
27use IO::Socket::INET;
28use IO::Select;
29
30package Thrift::Socket;
31
Jim Kingf5f1b352015-06-24 13:47:24 -040032use base qw( Thrift::Transport );
Mark Slee254ce202007-05-16 02:21:06 +000033
#
# Creates a socket transport object. No connection is made here; the
# handle stays undefined until open() is called.
#
# @param string  $host         Remote host; a false value selects "localhost"
# @param int     $port         Remote port; a false value selects 9090
# @param coderef $debugHandler Called with the error text by open() when
#                              debugging is enabled
# @return object blessed into the invoking class
#
sub new
{
    my ($classname, $host, $port, $debugHandler) = @_;

    $host ||= "localhost";
    $port ||= 9090;

    my %state = (
        host         => $host,
        port         => $port,
        debugHandler => $debugHandler,
        debug        => 0,
        # Both timeouts are divided by 1000 before being used as a timeout
        # by the I/O calls in this package.
        sendTimeout  => 10000,
        recvTimeout  => 10000,
        handle       => undef,
    );

    return bless(\%state, $classname);
}
53
54
#
# Sets the send timeout.
#
# @param int $timeout Timeout in milliseconds (the value is divided by
#                     1000 wherever this package uses it)
#
sub setSendTimeout
{
    my ($self, $timeout) = @_;

    $self->{sendTimeout} = $timeout;
}
62
#
# Sets the receive timeout.
#
# @param int $timeout Timeout in milliseconds (the value is divided by
#                     1000 wherever this package uses it)
#
sub setRecvTimeout
{
    my ($self, $timeout) = @_;

    $self->{recvTimeout} = $timeout;
}
70
71
#
# Turns debugging output on or off.
#
# @param bool $debug True to enable debugging, false to disable it
#
sub setDebug
{
    my ($self, $debug) = @_;

    $self->{debug} = $debug;
}
84
#
# Tests whether this transport is open.
#
# @return 0 when no handle has been set up; otherwise whatever connected()
#         reports for the first socket held by the handle
#
sub isOpen
{
    my ($self) = @_;

    return 0 unless defined $self->{handle};

    # The handle wraps the socket; pull the socket back out to query it.
    my ($sock) = $self->{handle}->handles();

    return $sock->connected;
}
100
#
# Connects the socket.
#
# Obtains a socket from the overridable __open() method and stores it,
# wrapped in an IO::Select object, in $self->{handle}.
#
# @throws Thrift::TException when __open() returns a false value
#
sub open
{
    my $self = shift;

    my $sock = $self->__open();

    if (!$sock) {
        # Build the message first so $! still reflects the failed connect.
        my $error = ref($self).': Could not connect to '.$self->{host}.':'.$self->{port}.' ('.$!.')';

        # The debug handler is optional in new(); calling an undefined one
        # would die with an unrelated error and hide the TException below.
        if ($self->{debug} && defined $self->{debugHandler}) {
            $self->{debugHandler}->($error);
        }

        die Thrift::TException->new($error);
    }

    $self->{handle} = IO::Select->new($sock);
}
120
#
# Closes the socket by delegating to the overridable __close() method.
# Does nothing when no handle has been set up.
#
sub close
{
    my ($self) = @_;

    $self->__close() if defined $self->{handle};
}
131
#
# Reads exactly $len bytes, looping over __wait()/__recv() until enough
# data has arrived.
#
# @param int $len How many bytes
# @return string Binary data; nothing when no handle has been set up
# @throws Thrift::TException when __recv() yields undef or an empty string
#
sub readAll
{
    my ($self, $len) = @_;

    return unless defined $self->{handle};

    my $collected = "";

    for (;;) {
        my $sock  = $self->__wait();
        my $chunk = $self->__recv($sock, $len);

        if (!defined $chunk || $chunk eq '') {
            # $len is the number of bytes still outstanding at this point.
            die Thrift::TException->new(ref($self).': Could not read '.$len.' bytes from '.
                                        $self->{host}.':'.$self->{port});
        }

        my $got = length($chunk);

        return $collected.$chunk if $got >= $len;

        # Short read: keep what arrived and ask only for the remainder.
        $collected .= $chunk;
        $len -= $got;
    }
}
167
#
# Reads up to $len bytes from the socket with a single __wait()/__recv()
# round.
#
# @param int $len How many bytes
# @return string Binary data; nothing when no handle has been set up
# @throws Thrift::TException when __recv() yields undef or an empty string
#
sub read
{
    my ($self, $len) = @_;

    return unless defined $self->{handle};

    my $sock = $self->__wait();
    my $buf  = $self->__recv($sock, $len);

    if (!defined $buf || $buf eq '') {
        # Use the same exception class as the rest of this package, with an
        # explicit method call so the class name cannot be misparsed.
        die Thrift::TException->new(ref($self).': Could not read '.$len.' bytes from '.
                                    $self->{host}.':'.$self->{port});
    }

    return $buf;
}
193
194
#
# Writes the whole buffer to the socket, looping until every byte has been
# handed to __send(). Does nothing when no handle has been set up.
#
# @param string $buf The data to write
# @throws Thrift::TException when the socket does not become writable
#         within sendTimeout, or when __send() reports undef or 0 bytes
#
sub write
{
    my ($self, $buf) = @_;

    return unless defined $self->{handle};

    while (length($buf) > 0) {
        # An empty list from can_write() means the wait timed out.
        my @sockets = $self->{handle}->can_write( $self->{sendTimeout} / 1000 );

        if (@sockets == 0) {
            die Thrift::TException->new(ref($self).': timed out writing to '.
                                        $self->{host}.':'.$self->{port});
        }

        my $sent = $self->__send($sockets[0], $buf);

        if (!defined $sent || $sent == 0) {
            die Thrift::TException->new(ref($self).': Could not write '.length($buf).' bytes to '.
                                        $self->{host}.':'.$self->{port});
        }

        # Drop the bytes that went out and loop for the remainder.
        $buf = substr($buf, $sent);
    }
}
228
#
# Flushes output on the first socket held by the handle.
#
# @return the result of the socket's flush(); nothing when no handle has
#         been set up
#
sub flush
{
    my ($self) = @_;

    return unless defined $self->{handle};

    my ($sock) = $self->{handle}->handles();

    return scalar($sock->flush);
}
240
Jim Kingf5f1b352015-06-24 13:47:24 -0400241###
242### Overridable methods
243###
T Jake Luciani0c5c2342009-11-12 03:01:33 +0000244
#
# Open a TCP connection to the configured host and port.
#
# The connect timeout reuses sendTimeout, divided by 1000.
#
# @returns whatever IO::Socket::INET->new yields for these options
#
sub __open
{
    my ($self) = @_;

    my @options = (
        PeerAddr => $self->{host},
        PeerPort => $self->{port},
        Proto    => 'tcp',
        Timeout  => $self->{sendTimeout} / 1000,
    );

    return IO::Socket::INET->new(@options);
}
256
#
# Close the connection: closes the first socket held by the handle.
#
# @returns the result of the built-in close()
#
sub __close
{
    my ($self) = @_;

    my ($sock) = $self->{handle}->handles();

    # CORE:: is needed because this package defines its own close().
    return CORE::close($sock);
}
265
#
# Read data with a single recv() call on the given socket.
#
# @param[in] $sock the socket
# @param[in] $len the length to read
# @returns the buffer filled in by recv(); undef if recv() left it unset
#
sub __recv
{
    my ($self, $sock, $len) = @_;

    my $data;
    $sock->recv($data, $len);

    return $data;
}
282
#
# Send data with a single send() call on the given socket.
#
# @param[in] $sock the socket
# @param[in] $buf the data buffer
# @returns whatever the socket's send() returns (write() treats it as the
#          number of bytes written)
#
sub __send
{
    my ($self, $sock, $buf) = @_;

    return $sock->send($buf);
}
T Jake Luciani0c5c2342009-11-12 03:01:33 +0000297
#
# Wait for data to be readable, for at most recvTimeout / 1000.
#
# @returns a socket that can be read
# @throws Thrift::TException when nothing becomes readable in time
#
sub __wait
{
    my ($self) = @_;

    my @ready = $self->{handle}->can_read( $self->{recvTimeout} / 1000 );

    return $ready[0] if @ready;

    die Thrift::TException->new(ref($self).': timed out reading from '.
                                $self->{host}.':'.$self->{port});
}
315
316
Mark Slee254ce202007-05-16 02:21:06 +00003171;