blob: 034956ce21f6fe190f67c151e52b3f27ddecab53 [file] [log] [blame]
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
use 5.10.0;
use strict;
use warnings;
use Thrift;
use Thrift::Exception;
use Thrift::Transport;
use IO::Socket::INET;
use IO::Select;
package Thrift::Socket;
use base qw( Thrift::Transport );
use version 0.77; our $VERSION = version->declare("$Thrift::VERSION");
#
# Construction and usage
#
# my $opts = {}
# my $socket = Thrift::Socket->new(\%opts);
#
# options:
#
# host => host to connect to
# port => port to connect to
# sendTimeout => timeout used for send and for connect
# recvTimeout => timeout used for recv
#
sub new
{
    # Accepts either a single hash reference of options, or the legacy
    # positional form (host, port). Anything not supplied keeps its default.
    my ($classname, $opts, $legacy_port) = @_;

    # Defaults; both timeouts are later divided by 1000 before being
    # handed to the socket layer.
    my %settings = (
        host        => 'localhost',
        port        => 9090,
        recvTimeout => 10000,
        sendTimeout => 10000,
        proto       => 'tcp',
        handle      => undef,
    );

    if (defined($opts) && ref($opts) eq 'HASH') {
        # Option hash: every key given by the caller overrides (or adds to)
        # the defaults.
        %settings = (%settings, %$opts);
    }
    else {
        # Legacy positional constructor: only host and port are consumed,
        # and neither is required.
        $settings{host} = $opts        || 'localhost';
        $settings{port} = $legacy_port || 9090;
    }

    return bless(\%settings, $classname);
}
#
# Sets the timeout used when writing (and when connecting).
#
# @param int $timeout Timeout value; this class divides it by 1000 before
#                     passing it on to the socket layer.
#
sub setSendTimeout
{
    my ($self, $timeout) = @_;
    return $self->{sendTimeout} = $timeout;
}
#
# Sets the timeout used when waiting for readable data.
#
# @param int $timeout Timeout value; this class divides it by 1000 before
#                     passing it on to the socket layer.
#
sub setRecvTimeout
{
    my ($self, $timeout) = @_;
    return $self->{recvTimeout} = $timeout;
}
#
# Tests whether this is open
#
# @return bool true if the socket is open
#
sub isOpen
{
    my ($self) = @_;

    # No select wrapper has been installed yet, so there is nothing to ask.
    my $select = $self->{handle};
    return 0 unless defined $select;

    # Ask the first handle registered with the select object whether it
    # is connected, and hand that answer straight back.
    my ($sock) = $select->handles();
    return $sock->connected;
}
#
# Connects the socket.
#
sub open
{
    my ($self) = @_;

    # __open() is the overridable hook that actually creates the socket.
    my $sock = $self->__open();

    unless ($sock) {
        # Build the message first so that $! still reflects the failed connect.
        my $error = ref($self).': Could not connect to '.$self->{host}.':'.$self->{port}.' ('.$!.')';
        die Thrift::TTransportException->new($error, Thrift::TTransportException::NOT_OPEN);
    }

    # All later reads and writes go through an IO::Select wrapping this socket.
    return $self->{handle} = IO::Select->new($sock);
}
#
# Closes the socket.
#
sub close
{
    my ($self) = @_;

    # Only delegate to the overridable __close() hook once a handle exists.
    $self->__close() if defined $self->{handle};
}
#
# Reads exactly $len bytes, looping over partial reads until all have arrived
#
# @param int $len How many bytes
# @return string Binary data
#
sub readAll
{
    my ($self, $len) = @_;
    return unless defined $self->{handle};

    # Pieces received so far, and how many bytes are still outstanding.
    my @chunks;
    my $remaining = $len;

    for (;;) {
        # __wait() dies on timeout; otherwise it yields a readable socket.
        my $sock  = $self->__wait();
        my $chunk = $self->__recv($sock, $remaining);

        # An undefined or empty read is reported as end of file. The byte
        # count in the message is what was still outstanding at that point.
        unless (defined($chunk) && $chunk ne '') {
            die Thrift::TTransportException->new(ref($self).': Could not read '.$remaining.' bytes from '.
                $self->{host}.':'.$self->{port}, Thrift::TTransportException::END_OF_FILE);
        }

        push @chunks, $chunk;
        my $got = length($chunk);

        # Done as soon as this read covered everything still outstanding.
        return join('', @chunks) if $got >= $remaining;

        $remaining -= $got;
    }
}
#
# Read from the socket
#
# @param int $len How many bytes
# @return string Binary data
#
sub read
{
    my ($self, $len) = @_;
    return unless defined $self->{handle};

    # A single wait-then-receive; may deliver fewer than $len bytes.
    my $sock = $self->__wait();
    my $data = $self->__recv($sock, $len);

    return $data if defined($data) && $data ne '';

    # Nothing came back: report it as end of file.
    die Thrift::TTransportException->new(ref($self).': Could not read '.$len.' bytes from '.
        $self->{host}.':'.$self->{port}, Thrift::TTransportException::END_OF_FILE);
}
#
# Write to the socket.
#
# @param string $buf The data to write
#
sub write
{
    my $self = shift;
    my $buf  = shift;

    # Nothing to write to until open() has installed the select wrapper.
    return unless defined $self->{handle};

    # Keep sending until the whole buffer has been accepted; each pass
    # drops the bytes that __send() reports as written.
    while (length($buf) > 0) {
        # Wait for the socket to become writable. sendTimeout is divided
        # by 1000 because can_write() takes its timeout in seconds.
        my @sockets = $self->{handle}->can_write( $self->{sendTimeout} / 1000 );
        if (@sockets == 0) {
            die Thrift::TTransportException->new(ref($self).': timed out writing to bytes from '.
                $self->{host}.':'.$self->{port}, Thrift::TTransportException::TIMED_OUT);
        }

        my $sent = $self->__send($sockets[0], $buf);
        if (!defined $sent || $sent == 0) {
            # Report the peer as host:port (the message previously repeated
            # the host in place of the port).
            die Thrift::TTransportException->new(ref($self).': Could not write '.length($buf).' bytes '.
                $self->{host}.':'.$self->{port}, Thrift::TTransportException::END_OF_FILE);
        }

        $buf = substr($buf, $sent);
    }

    return;
}
#
# Flush output to the socket.
#
sub flush
{
    my ($self) = @_;

    my $select = $self->{handle};
    return unless defined $select;

    # Flush the first handle registered with the select object and return
    # whatever that flush reports.
    my ($sock) = $select->handles();
    my $ret = $sock->flush;
    return $ret;
}
###
### Overridable methods
###
#
# Open a connection to a server.
#
sub __open
{
    my ($self) = @_;

    # The connect timeout reuses sendTimeout, divided by 1000 before it is
    # passed as the Timeout argument.
    my @connect_args = (
        PeerAddr => $self->{host},
        PeerPort => $self->{port},
        Proto    => $self->{proto},
        Timeout  => $self->{sendTimeout} / 1000,
    );

    return IO::Socket::INET->new(@connect_args);
}
#
# Close the connection
#
sub __close
{
    my ($self) = @_;

    # Close the first handle registered with the select object, using the
    # builtin close rather than this package's close() method.
    my ($sock) = $self->{handle}->handles();
    return CORE::close($sock);
}
#
# Read data
#
# @param[in] $sock the socket
# @param[in] $len the length to read
# @returns the data buffer that was read
#
sub __recv
{
    my ($self, $sock, $len) = @_;

    # recv() fills the buffer in place; it stays undefined if nothing is
    # stored into it.
    my $data;
    $sock->recv($data, $len);

    return $data;
}
#
# Send data
#
# @param[in] $sock the socket
# @param[in] $buf the data buffer
# @returns the number of bytes written
#
sub __send
{
    # The invocant is not needed here; only the socket and payload are used.
    my (undef, $sock, $payload) = @_;

    # Pass the socket's own send() result straight back to the caller.
    return $sock->send($payload);
}
#
# Wait for data to be readable
#
# @returns a socket that can be read
#
sub __wait
{
    my ($self) = @_;

    # recvTimeout is divided by 1000 because can_read() takes seconds.
    my $seconds  = $self->{recvTimeout} / 1000;
    my @readable = $self->{handle}->can_read($seconds);

    return $readable[0] if @readable;

    # Nothing became readable within the timeout.
    die Thrift::TTransportException->new(ref($self).': timed out reading from '.
        $self->{host}.':'.$self->{port}, Thrift::TTransportException::TIMED_OUT);
}
1;