#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
| 19 | |
| 20 | require 5.6.0; |
| 21 | use strict; |
| 22 | use warnings; |
| 23 | |
| 24 | use Thrift; |
| 25 | use Thrift::Transport; |
| 26 | |
| 27 | use IO::Socket::INET; |
| 28 | use IO::Select; |
| 29 | |
| 30 | package Thrift::Socket; |
| 31 | |
Jim King | f5f1b35 | 2015-06-24 13:47:24 -0400 | [diff] [blame] | 32 | use base qw( Thrift::Transport ); |
Mark Slee | 254ce20 | 2007-05-16 02:21:06 +0000 | [diff] [blame] | 33 | |
#
# Builds a new, not-yet-connected socket transport.
#
# @param string  $host         Remote host; any false value selects "localhost"
# @param int     $port         Remote port; any false value selects 9090
# @param coderef $debugHandler Optional callback handed the error text when
#                              debugging is switched on
# @return the new object, blessed into the invoking class
#
sub new
{
    my ($class, $peer_host, $peer_port, $on_debug) = @_;

    # Both timeouts are kept in milliseconds; they are divided by 1000
    # wherever they are handed to IO::Select / IO::Socket::INET.
    my %state = (
        host         => $peer_host || "localhost",
        port         => $peer_port || 9090,
        debugHandler => $on_debug,
        debug        => 0,
        sendTimeout  => 10000,
        recvTimeout  => 10000,
        handle       => undef,
    );

    return bless \%state, $class;
}
| 53 | |
| 54 | |
#
# Sets the send timeout.
#
# @param int $timeout Timeout in milliseconds
#
sub setSendTimeout
{
    my ($self, $timeout) = @_;

    $self->{sendTimeout} = $timeout;
}
| 62 | |
#
# Sets the receive timeout.
#
# @param int $timeout Timeout in milliseconds
#
sub setRecvTimeout
{
    my ($self, $timeout) = @_;

    $self->{recvTimeout} = $timeout;
}
| 70 | |
| 71 | |
#
# Switches debugging output on or off.
#
# @param bool $debug True to have connect failures reported to the debug handler
#
sub setDebug
{
    my ($self, $debug) = @_;

    $self->{debug} = $debug;
}
| 84 | |
#
# Tests whether this socket is open.
#
# @return 0 when no handle has been set; otherwise whatever the underlying
#         socket's connected() reports
#
sub isOpen
{
    my ($self) = @_;

    my $select = $self->{handle};
    return 0 unless defined $select;

    # The IO::Select wrapper holds exactly the one socket added by open().
    my ($sock) = $select->handles();
    return $sock->connected;
}
| 100 | |
#
# Connects the socket.
#
# Obtains a connection from the overridable __open() and wraps it in an
# IO::Select object stored in $self->{handle}, which the read and write
# paths use for their timeout handling.
#
# @throws Thrift::TException if __open() does not return a socket
#
sub open
{
    my $self = shift;

    my $sock = $self->__open();

    unless ($sock) {
        my $error = ref($self).': Could not connect to '.$self->{host}.':'.$self->{port}.' ('.$!.')';

        # The handler is optional (the constructor leaves it undef), so only
        # call it when one was supplied; otherwise enabling debug would
        # replace the connect error below with a "not a CODE reference" crash.
        if ($self->{debug} && defined $self->{debugHandler}) {
            $self->{debugHandler}->($error);
        }

        die Thrift::TException->new($error);
    }

    $self->{handle} = IO::Select->new( $sock );
}
| 120 | |
#
# Closes the socket.
#
# Delegates to the overridable __close(); does nothing when no handle has
# been set.
#
sub close
{
    my ($self) = @_;

    defined $self->{handle} and $self->__close();
}
| 131 | |
#
# Reads exactly $len bytes, looping over as many receives as it takes.
#
# Every pass waits for readability via __wait() and then asks __recv() for
# the bytes still outstanding.
#
# @param int $len How many bytes
# @return string Binary data; nothing at all when no handle has been set
# @throws Thrift::TException if a receive yields no data (the message names
#         the number of bytes still outstanding at that point)
#
sub readAll
{
    my ($self, $len) = @_;

    return unless defined $self->{handle};

    my $collected = "";
    my $remaining = $len;

    for (;;) {
        my $sock  = $self->__wait();
        my $chunk = $self->__recv($sock, $remaining);

        unless (defined $chunk && $chunk ne '') {
            die Thrift::TException->new(ref($self).': Could not read '.$remaining.' bytes from '.
                                        $self->{host}.':'.$self->{port});
        }

        $collected .= $chunk;

        my $got = length($chunk);
        return $collected if $got >= $remaining;

        # Short read: go round again for the rest.
        $remaining -= $got;
    }
}
| 167 | |
#
# Read from the socket
#
# Waits for readability via __wait() and performs a single __recv(), so
# the result may be shorter than $len.
#
# @param int $len How many bytes (at most)
# @return string Binary data; nothing at all when no handle has been set
# @throws Thrift::TException if the receive yields no data
#
sub read
{
    my $self = shift;
    my $len  = shift;

    return unless defined $self->{handle};

    my $sock = $self->__wait();
    my $buf  = $self->__recv($sock, $len);

    if (!defined $buf || $buf eq '') {

        # The exception class must be fully qualified: there is no bare
        # "TException" package, so the old spelling turned a failed read
        # into an unrelated runtime error instead of a catchable exception.
        die Thrift::TException->new(ref($self).': Could not read '.$len.' bytes from '.
                                    $self->{host}.':'.$self->{port});

    }

    return $buf;
}
| 193 | |
| 194 | |
#
# Write to the socket.
#
# Keeps sending until the whole buffer has gone out. Before every send the
# socket must become writable within sendTimeout milliseconds.
#
# @param string $buf The data to write
# @throws Thrift::TException if the socket does not become writable in time,
#         or if a send reports that nothing was written
#
sub write
{
    my $self = shift;
    my $buf  = shift;

    return unless defined $self->{handle};

    while (length($buf) > 0) {
        # check for timeout (can_write takes seconds, the setting is in ms)
        my @sockets = $self->{handle}->can_write( $self->{sendTimeout} / 1000 );

        if (@sockets == 0) {
            die Thrift::TException->new(ref($self).': timed out writing to bytes from '.
                                        $self->{host}.':'.$self->{port});
        }

        my $sent = $self->__send($sockets[0], $buf);

        if (!defined $sent || $sent == 0) {

            # Report host:port; the message used to repeat the host twice.
            die Thrift::TException->new(ref($self).': Could not write '.length($buf).' bytes '.
                                        $self->{host}.':'.$self->{port});

        }

        # Drop what went out and loop for the remainder of a partial send.
        $buf = substr($buf, $sent);
    }
}
| 228 | |
#
# Flush output to the socket.
#
# @return the result of the underlying socket's flush(); nothing at all
#         when no handle has been set
#
sub flush
{
    my ($self) = @_;

    my $select = $self->{handle};
    return unless defined $select;

    my ($sock) = $select->handles();
    return scalar $sock->flush;
}
| 240 | |
Jim King | f5f1b35 | 2015-06-24 13:47:24 -0400 | [diff] [blame] | 241 | ### |
| 242 | ### Overridable methods |
| 243 | ### |
T Jake Luciani | 0c5c234 | 2009-11-12 03:01:33 +0000 | [diff] [blame] | 244 | |
#
# Open a connection to a server.
#
# @returns the result of IO::Socket::INET->new for a TCP connection to
#          host:port; the connect timeout is sendTimeout converted from
#          milliseconds to seconds
#
sub __open
{
    my ($self) = @_;

    my %peer = (
        PeerAddr => $self->{host},
        PeerPort => $self->{port},
        Proto    => 'tcp',
        Timeout  => $self->{sendTimeout} / 1000,
    );

    return IO::Socket::INET->new(%peer);
}
| 256 | |
#
# Close the connection
#
# Closes the first handle registered in $self->{handle}.
#
sub __close
{
    my ($self) = @_;

    my ($sock) = $self->{handle}->handles();
    CORE::close($sock);
}
| 265 | |
#
# Read data
#
# @param[in] $sock the socket
# @param[in] $len the length to read
# @returns the data buffer that was read
#
sub __recv
{
    my ($self, $sock, $len) = @_;

    # recv() fills the buffer in place; its own return value is not used.
    my $data = undef;
    $sock->recv($data, $len);

    return $data;
}
| 282 | |
#
# Send data
#
# @param[in] $sock the socket
# @param[in] $buf the data buffer
# @returns whatever the socket's send() returns (the number of bytes written)
#
sub __send
{
    my (undef, $sock, $payload) = @_;

    return $sock->send($payload);
}
T Jake Luciani | 0c5c234 | 2009-11-12 03:01:33 +0000 | [diff] [blame] | 297 | |
#
# Wait for data to be readable
#
# Blocks for at most recvTimeout milliseconds.
#
# @returns a socket that can be read
# @throws Thrift::TException if nothing becomes readable in time
#
sub __wait
{
    my ($self) = @_;

    # can_read() takes seconds; the setting is stored in milliseconds.
    my @ready = $self->{handle}->can_read( $self->{recvTimeout} / 1000 );

    return $ready[0] if @ready;

    die Thrift::TException->new(ref($self).': timed out reading from '.
                                $self->{host}.':'.$self->{port});
}
| 315 | |
| 316 | |
Mark Slee | 254ce20 | 2007-05-16 02:21:06 +0000 | [diff] [blame] | 317 | 1; |