THRIFT-619: Perl server and example
git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@835206 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/lib/perl/Makefile.am b/lib/perl/Makefile.am
index 163d015..eb19560 100644
--- a/lib/perl/Makefile.am
+++ b/lib/perl/Makefile.am
@@ -50,5 +50,6 @@
lib/Thrift/HttpClient.pm \
lib/Thrift/MemoryBuffer.pm \
lib/Thrift/Protocol.pm \
+ lib/Thrift/Server.pm \
lib/Thrift/Socket.pm \
lib/Thrift/Transport.pm
diff --git a/lib/perl/lib/Thrift/BufferedTransport.pm b/lib/perl/lib/Thrift/BufferedTransport.pm
index bef564d..3868ca2 100644
--- a/lib/perl/lib/Thrift/BufferedTransport.pm
+++ b/lib/perl/lib/Thrift/BufferedTransport.pm
@@ -106,4 +106,31 @@
}
+#
+# BufferedTransport factory creates buffered transport objects from transports
+#
+package Thrift::BufferedTransportFactory;
+
+sub new {
+ my $classname = shift;
+ my $self = {};
+
+ return bless($self,$classname);
+}
+
+#
+# Build a buffered transport from the base transport
+#
+# @return Thrift::BufferedTransport transport
+#
+sub getTransport
+{
+ my $self = shift;
+ my $trans = shift;
+
+ my $buffered = Thrift::BufferedTransport->new($trans);
+ return $buffered;
+}
+
+
1;
diff --git a/lib/perl/lib/Thrift/Server.pm b/lib/perl/lib/Thrift/Server.pm
new file mode 100644
index 0000000..960fbd1
--- /dev/null
+++ b/lib/perl/lib/Thrift/Server.pm
@@ -0,0 +1,313 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+require 5.6.0;
+use strict;
+use warnings;
+
+use Thrift;
+use Thrift::BufferedTransport;
+use Thrift::BinaryProtocol;
+
+#
+# Server base class module
+#
+package Thrift::Server;
+
+# 3 possible constructors:
+# 1. (processor, serverTransport)
+# 2. (processor, serverTransport, transportFactory, protocolFactory)
+# 3. (processor, serverTransport,
+# inputTransportFactory, outputTransportFactory,
+# inputProtocolFactory, outputProtocolFactory)
+sub new
+{
+ my $classname = shift;
+ my @args = @_;
+
+ my $self;
+
+ if (scalar @args == 2)
+ {
+ $self = _init($args[0], $args[1],
+ Thrift::BufferedTransportFactory->new(),
+ Thrift::BufferedTransportFactory->new(),
+ Thrift::BinaryProtocolFactory->new(),
+ Thrift::BinaryProtocolFactory->new());
+ }
+ elsif (scalar @args == 4)
+ {
+ $self = _init($args[0], $args[1], $args[2], $args[2], $args[3], $args[3]);
+ }
+ elsif (scalar @args == 6)
+ {
+ $self = _init($args[0], $args[1], $args[2], $args[3], $args[4], $args[5]);
+ }
+ else
+ {
+ die "Thrift::Server expects exactly 2, 4, or 6 args";
+ }
+
+ return bless($self,$classname);
+}
+
+sub _init
+{
+ my $processor = shift;
+ my $serverTransport = shift;
+ my $inputTransportFactory = shift;
+ my $outputTransportFactory = shift;
+ my $inputProtocolFactory = shift;
+ my $outputProtocolFactory = shift;
+
+ my $self = {
+ processor => $processor,
+ serverTransport => $serverTransport,
+ inputTransportFactory => $inputTransportFactory,
+ outputTransportFactory => $outputTransportFactory,
+ inputProtocolFactory => $inputProtocolFactory,
+ outputProtocolFactory => $outputProtocolFactory,
+ };
+}
+
+sub serve
+{
+ die "abstract";
+}
+
+sub _clientBegin
+{
+ my $self = shift;
+ my $iprot = shift;
+ my $oprot = shift;
+
+ if (exists $self->{serverEventHandler} and
+ defined $self->{serverEventHandler})
+ {
+ $self->{serverEventHandler}->clientBegin($iprot, $oprot);
+ }
+}
+
+sub _handleException
+{
+ my $self = shift;
+ my $e = shift;
+
+ if ($e =~ m/TException/ and exists $e->{message}) {
+ my $message = $e->{message};
+ my $code = $e->{code};
+ my $out = $code . ':' . $message;
+
+ $message =~ m/TTransportException/ and die $out;
+ if ($message =~ m/TSocket/) {
+ # suppress TSocket messages
+ } else {
+ warn $out;
+ }
+ } else {
+ warn $e;
+ }
+}
+
+
+#
+# SimpleServer from the Server base class that handles one connection at a time
+#
+package Thrift::SimpleServer;
+use base qw( Thrift::Server );
+
+sub new
+{
+ my $classname = shift;
+ my @args = @_;
+
+ my $self = $classname->SUPER::new(@args);
+ return bless($self,$classname);
+}
+
+sub serve
+{
+ my $self = shift;
+
+ $self->{serverTransport}->listen();
+ while (1)
+ {
+ my $client = $self->{serverTransport}->accept();
+ my $itrans = $self->{inputTransportFactory}->getTransport($client);
+ my $otrans = $self->{outputTransportFactory}->getTransport($client);
+ my $iprot = $self->{inputProtocolFactory}->getProtocol($itrans);
+ my $oprot = $self->{outputProtocolFactory}->getProtocol($otrans);
+ eval {
+ $self->_clientBegin($iprot, $oprot);
+ while (1)
+ {
+ $self->{processor}->process($iprot, $oprot);
+ }
+ }; if($@) {
+ $self->_handleException($@);
+ }
+
+ $itrans->close();
+ $otrans->close();
+ }
+}
+
+
+#
+# ForkingServer that forks a new process for each request
+#
+package Thrift::ForkingServer;
+use base qw( Thrift::Server );
+
+use POSIX ":sys_wait_h";
+
+sub new
+{
+ my $classname = shift;
+ my @args = @_;
+
+ my $self = $classname->SUPER::new(@args);
+ return bless($self,$classname);
+}
+
+
+sub serve
+{
+ my $self = shift;
+
+ $self->{serverTransport}->listen();
+ while (1)
+ {
+ my $client = $self->{serverTransport}->accept();
+ $self->_client($client);
+ }
+}
+
+sub _client
+{
+ my $self = shift;
+ my $client = shift;
+
+ eval {
+ my $itrans = $self->{inputTransportFactory}->getTransport($client);
+ my $otrans = $self->{outputTransportFactory}->getTransport($client);
+
+ my $iprot = $self->{inputProtocolFactory}->getProtocol($itrans);
+ my $oprot = $self->{outputProtocolFactory}->getProtocol($otrans);
+
+ $self->_clientBegin($iprot, $oprot);
+
+ my $pid = fork();
+
+ if ($pid) #parent
+ {
+ $self->_parent($pid, $itrans, $otrans);
+ } else {
+ $self->_child($itrans, $otrans, $iprot, $oprot);
+ }
+ }; if($@) {
+ $self->_handleException($@);
+ }
+}
+
+sub _parent
+{
+ my $self = shift;
+ my $pid = shift;
+ my $itrans = shift;
+ my $otrans = shift;
+
+ # add before collect, otherwise you race w/ waitpid
+ $self->{children}->{$pid} = 1;
+ $self->_collectChildren();
+
+ # Parent must close socket or the connection may not get closed promptly
+ $self->tryClose($itrans);
+ $self->tryClose($otrans);
+}
+
+sub _child
+{
+ my $self = shift;
+ my $itrans = shift;
+ my $otrans = shift;
+ my $iprot = shift;
+ my $oprot = shift;
+
+ my $ecode = 0;
+ eval {
+ while (1)
+ {
+ $self->{processor}->process($iprot, $oprot);
+ }
+ }; if($@) {
+ $ecode = 1;
+ $self->_handleException($@);
+ }
+
+ $self->tryClose($itrans);
+ $self->tryClose($otrans);
+
+ exit($ecode);
+}
+
+sub tryClose
+{
+ my $self = shift;
+ my $file = shift;
+
+ eval {
+ if (defined $file)
+ {
+ $file->close();
+ }
+ }; if($@) {
+ if ($@ =~ m/TException/ and exists $@->{message}) {
+ my $message = $@->{message};
+ my $code = $@->{code};
+ my $out = $code . ':' . $message;
+
+ warn $out;
+ } else {
+ warn $@;
+ }
+ }
+}
+
+sub _collectChildren
+{
+ my $self = shift;
+
+ while (scalar keys %{$self->{children}})
+ {
+ my $pid = waitpid(-1, WNOHANG);
+
+ if ($pid>0)
+ {
+ delete $self->{children}->{$pid};
+ }
+ else
+ {
+ last;
+ }
+ }
+}
+
+
+1;
diff --git a/lib/perl/lib/Thrift/Socket.pm b/lib/perl/lib/Thrift/Socket.pm
index 4d2aac7..7ebea35 100644
--- a/lib/perl/lib/Thrift/Socket.pm
+++ b/lib/perl/lib/Thrift/Socket.pm
@@ -33,9 +33,9 @@
sub new
{
- my $classname = shift;
- my $host = shift || "localhost";
- my $port = shift || 9090;
+ my $classname = shift;
+ my $host = shift || "localhost";
+ my $port = shift || 9090;
my $debugHandler = shift;
my $self = {
@@ -132,7 +132,7 @@
my $self = shift;
if( defined $self->{handle} ){
- close( ($self->{handle}->handles())[0] );
+ CORE::close( ($self->{handle}->handles())[0] );
}
}
@@ -268,4 +268,63 @@
my $ret = ($self->{handle}->handles())[0]->flush;
}
+
+#
+# Build a ServerSocket from the ServerTransport base class
+#
+package Thrift::ServerSocket;
+
+use base qw( Thrift::Socket Thrift::ServerTransport );
+
+use constant LISTEN_QUEUE_SIZE => 128;
+
+sub new
+{
+ my $classname = shift;
+ my $port = shift;
+
+ my $self = $classname->SUPER::new(undef, $port, undef);
+ return bless($self,$classname);
+}
+
+sub listen
+{
+ my $self = shift;
+
+ # Listen to a new socket
+ my $sock = IO::Socket::INET->new(LocalAddr => undef, # any addr
+ LocalPort => $self->{port},
+ Proto => 'tcp',
+ Listen => LISTEN_QUEUE_SIZE,
+ ReuseAddr => 1)
+ || do {
+ my $error = 'TServerSocket: Could not bind to ' .
+ $self->{host} . ':' . $self->{port} . ' (' . $! . ')';
+
+ if ($self->{debug}) {
+ $self->{debugHandler}->($error);
+ }
+
+ die new Thrift::TException($error);
+ };
+
+ $self->{handle} = $sock;
+}
+
+sub accept
+{
+ my $self = shift;
+
+ if ( exists $self->{handle} and defined $self->{handle} )
+ {
+ my $client = $self->{handle}->accept();
+ my $result = new Thrift::Socket;
+ $result->{handle} = new IO::Select($client);
+ return $result;
+ }
+
+ return 0;
+}
+
+
1;
diff --git a/lib/perl/lib/Thrift/Transport.pm b/lib/perl/lib/Thrift/Transport.pm
index e22592b..5ec6fee 100644
--- a/lib/perl/lib/Thrift/Transport.pm
+++ b/lib/perl/lib/Thrift/Transport.pm
@@ -125,5 +125,53 @@
#
sub flush {}
+
+#
+# TransportFactory creates transport objects from transports
+#
+package Thrift::TransportFactory;
+
+sub new {
+ my $classname = shift;
+ my $self = {};
+
+ return bless($self,$classname);
+}
+
+#
+# Build a transport from the base transport
+#
+# @return Thrift::Transport transport
+#
+sub getTransport
+{
+ my $self = shift;
+ my $trans = shift;
+
+ return $trans;
+}
+
+
+#
+# ServerTransport base class module
+#
+package Thrift::ServerTransport;
+
+sub listen
+{
+ die "abstract";
+}
+
+sub accept
+{
+ die "abstract";
+}
+
+sub close
+{
+ die "abstract";
+}
+
+
1;
diff --git a/tutorial/perl/PerlServer.pl b/tutorial/perl/PerlServer.pl
new file mode 100644
index 0000000..a40ec69
--- /dev/null
+++ b/tutorial/perl/PerlServer.pl
@@ -0,0 +1,124 @@
+#!/usr/bin/perl
+
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+use strict;
+use lib '../gen-perl';
+use Thrift::Socket;
+use Thrift::Server;
+use tutorial::Calculator;
+
+package CalculatorHandler;
+use base qw(tutorial::CalculatorIf);
+
+sub new {
+ my $classname = shift;
+ my $self = {};
+
+ return bless($self,$classname);
+}
+
+
+sub ping
+{
+ print "ping()\n";
+}
+
+sub add
+{
+ my($self, $n1, $n2) = @_;
+ printf("add(%d,%d)\n", $n1, $n2);
+ return $n1 + $n2;
+}
+
+sub calculate
+{
+ my($self, $logid, $work) = @_;
+ my $op = $work->{op};
+ my $num1 = $work->{num1};
+ my $num2 = $work->{num2};
+ printf("calculate(%d, %d %d %d)\n", $logid, $num1, $num2, $op);
+
+ my $val;
+
+ if ($op == tutorial::Operation::ADD) {
+ $val = $num1 + $num2;
+ } elsif ($op == tutorial::Operation::SUBTRACT) {
+ $val = $num1 - $num2;
+ } elsif ($op == tutorial::Operation::MULTIPLY) {
+ $val = $num1 * $num2;
+ } elsif ($op == tutorial::Operation::DIVIDE) {
+ if ($num2 == 0)
+ {
+ my $x = new tutorial::InvalidOperation;
+ $x->what($op);
+ $x->why('Cannot divide by 0');
+ die $x;
+ }
+ $val = $num1 / $num2;
+ } else {
+ my $x = new tutorial::InvalidOperation;
+ $x->what($op);
+ $x->why('Invalid operation');
+ die $x;
+ }
+
+ my $log = new shared::SharedStruct;
+ $log->key($logid);
+ $log->value(int($val));
+ $self->{log}->{$logid} = $log;
+
+ return $val;
+}
+
+sub getStruct
+{
+ my($self, $key) = @_;
+ printf("getStruct(%d)\n", $key);
+ return $self->{log}->{$key};
+}
+
+sub zip
+{
+ my($self) = @_;
+ print "zip()\n";
+}
+
+
+
+eval {
+ my $handler = new CalculatorHandler;
+ my $processor = new tutorial::CalculatorProcessor($handler);
+ my $serversocket = new Thrift::ServerSocket(9090);
+ my $forkingserver = new Thrift::ForkingServer($processor, $serversocket);
+ print "Starting the server...\n";
+ $forkingserver->serve();
+ print "done.\n";
+}; if ($@) {
+ if ($@ =~ m/TException/ and exists $@->{message}) {
+ my $message = $@->{message};
+ my $code = $@->{code};
+ my $out = $code . ':' . $message;
+ die $out;
+ } else {
+ die $@;
+ }
+}
+