| # |
| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License. |
| # |
| |
| require 5.6.0; |
| use strict; |
| use warnings; |
| |
| use Thrift; |
| use Thrift::Transport; |
| |
| use IO::Socket::INET; |
| use IO::Select; |
| |
| package Thrift::Socket; |
| |
| use base('Thrift::Transport'); |
| |
| sub new |
| { |
| my $classname = shift; |
| my $host = shift || "localhost"; |
| my $port = shift || 9090; |
| my $debugHandler = shift; |
| |
| my $self = { |
| host => $host, |
| port => $port, |
| debugHandler => $debugHandler, |
| debug => 0, |
| sendTimeout => 10000, |
| recvTimeout => 10000, |
| handle => undef, |
| }; |
| |
| return bless($self,$classname); |
| } |
| |
| |
| sub setSendTimeout |
| { |
| my $self = shift; |
| my $timeout = shift; |
| |
| $self->{sendTimeout} = $timeout; |
| } |
| |
| sub setRecvTimeout |
| { |
| my $self = shift; |
| my $timeout = shift; |
| |
| $self->{recvTimeout} = $timeout; |
| } |
| |
| |
| # |
| #Sets debugging output on or off |
| # |
| # @param bool $debug |
| # |
| sub setDebug |
| { |
| my $self = shift; |
| my $debug = shift; |
| |
| $self->{debug} = $debug; |
| } |
| |
| # |
| # Tests whether this is open |
| # |
| # @return bool true if the socket is open |
| # |
| sub isOpen |
| { |
| my $self = shift; |
| |
| if( defined $self->{handle} ){ |
| return ($self->{handle}->handles())[0]->connected; |
| } |
| |
| return 0; |
| } |
| |
| # |
| # Connects the socket. |
| # |
| sub open |
| { |
| my $self = shift; |
| |
| my $sock = IO::Socket::INET->new(PeerAddr => $self->{host}, |
| PeerPort => $self->{port}, |
| Proto => 'tcp', |
| Timeout => $self->{sendTimeout}/1000) |
| || do { |
| my $error = 'TSocket: Could not connect to '.$self->{host}.':'.$self->{port}.' ('.$!.')'; |
| |
| if ($self->{debug}) { |
| $self->{debugHandler}->($error); |
| } |
| |
| die new Thrift::TException($error); |
| |
| }; |
| |
| |
| $self->{handle} = new IO::Select( $sock ); |
| } |
| |
| # |
| # Closes the socket. |
| # |
| sub close |
| { |
| my $self = shift; |
| |
| if( defined $self->{handle} ){ |
| CORE::close( ($self->{handle}->handles())[0] ); |
| } |
| } |
| |
| # |
| # Uses stream get contents to do the reading |
| # |
| # @param int $len How many bytes |
| # @return string Binary data |
| # |
| sub readAll |
| { |
| my $self = shift; |
| my $len = shift; |
| |
| |
| return unless defined $self->{handle}; |
| |
| my $pre = ""; |
| while (1) { |
| |
| #check for timeout |
| my @sockets = $self->{handle}->can_read( $self->{recvTimeout} / 1000 ); |
| |
| if(@sockets == 0){ |
| die new Thrift::TException('TSocket: timed out reading '.$len.' bytes from '. |
| $self->{host}.':'.$self->{port}); |
| } |
| |
| my $sock = $sockets[0]; |
| |
| my ($buf,$sz); |
| $sock->recv($buf, $len); |
| |
| if (!defined $buf || $buf eq '') { |
| |
| die new Thrift::TException('TSocket: Could not read '.$len.' bytes from '. |
| $self->{host}.':'.$self->{port}); |
| |
| } elsif (($sz = length($buf)) < $len) { |
| |
| $pre .= $buf; |
| $len -= $sz; |
| |
| } else { |
| return $pre.$buf; |
| } |
| } |
| } |
| |
| # |
| # Read from the socket |
| # |
| # @param int $len How many bytes |
| # @return string Binary data |
| # |
| sub read |
| { |
| my $self = shift; |
| my $len = shift; |
| |
| return unless defined $self->{handle}; |
| |
| #check for timeout |
| my @sockets = $self->{handle}->can_read( $self->{recvTimeout} / 1000 ); |
| |
| if(@sockets == 0){ |
| die new Thrift::TException('TSocket: timed out reading '.$len.' bytes from '. |
| $self->{host}.':'.$self->{port}); |
| } |
| |
| my $sock = $sockets[0]; |
| |
| my ($buf,$sz); |
| $sock->recv($buf, $len); |
| |
| if (!defined $buf || $buf eq '') { |
| |
| die new TException('TSocket: Could not read '.$len.' bytes from '. |
| $self->{host}.':'.$self->{port}); |
| |
| } |
| |
| return $buf; |
| } |
| |
| |
| # |
| # Write to the socket. |
| # |
| # @param string $buf The data to write |
| # |
| sub write |
| { |
| my $self = shift; |
| my $buf = shift; |
| |
| |
| return unless defined $self->{handle}; |
| |
| while (length($buf) > 0) { |
| |
| |
| #check for timeout |
| my @sockets = $self->{handle}->can_write( $self->{sendTimeout} / 1000 ); |
| |
| if(@sockets == 0){ |
| die new Thrift::TException('TSocket: timed out writing to bytes from '. |
| $self->{host}.':'.$self->{port}); |
| } |
| |
| my $sock = $sockets[0]; |
| |
| my $got = $sock->send($buf); |
| |
| if (!defined $got || $got == 0 ) { |
| die new Thrift::TException('TSocket: Could not write '.length($buf).' bytes '. |
| $self->{host}.':'.$self->{host}); |
| } |
| |
| $buf = substr($buf, $got); |
| } |
| } |
| |
| # |
| # Flush output to the socket. |
| # |
| sub flush |
| { |
| my $self = shift; |
| |
| return unless defined $self->{handle}; |
| |
| my $ret = ($self->{handle}->handles())[0]->flush; |
| } |
| |
| |
| # |
| # Build a ServerSocket from the ServerTransport base class |
| # |
| package Thrift::ServerSocket; |
| |
| use base qw( Thrift::Socket Thrift::ServerTransport ); |
| |
| use constant LISTEN_QUEUE_SIZE => 128; |
| |
| sub new |
| { |
| my $classname = shift; |
| my $port = shift; |
| |
| my $self = $classname->SUPER::new(undef, $port, undef); |
| return bless($self,$classname); |
| } |
| |
| sub listen |
| { |
| my $self = shift; |
| |
| # Listen to a new socket |
| my $sock = IO::Socket::INET->new(LocalAddr => undef, # any addr |
| LocalPort => $self->{port}, |
| Proto => 'tcp', |
| Listen => LISTEN_QUEUE_SIZE, |
| ReuseAddr => 1) |
| || do { |
| my $error = 'TServerSocket: Could not bind to ' . |
| $self->{host} . ':' . $self->{port} . ' (' . $! . ')'; |
| |
| if ($self->{debug}) { |
| $self->{debugHandler}->($error); |
| } |
| |
| die new Thrift::TException($error); |
| }; |
| |
| $self->{handle} = $sock; |
| } |
| |
| sub accept |
| { |
| my $self = shift; |
| |
| if ( exists $self->{handle} and defined $self->{handle} ) |
| { |
| my $client = $self->{handle}->accept(); |
| my $result = new Thrift::Socket; |
| $result->{handle} = new IO::Select($client); |
| return $result; |
| } |
| |
| return 0; |
| } |
| |
| |
| 1; |