rb: Implement Thrift::UNIXSocket and Thrift::UNIXServerSocket

In benchmarking it turns out these don't give any noticeable performance boost,
but as I've already written them, somebody may want them for something.


git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@669019 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/lib/rb/benchmark/fairness.rb b/lib/rb/benchmark/fairness.rb
index 02f1cec..67543f3 100644
--- a/lib/rb/benchmark/fairness.rb
+++ b/lib/rb/benchmark/fairness.rb
@@ -2,6 +2,7 @@
 $:.unshift File.dirname(__FILE__) + '/../lib'
 require 'thrift'
 require 'thrift/server/nonblockingserver'
+require 'thrift/transport/unixsocket'
 $:.unshift File.dirname(__FILE__) + "/gen-rb"
 require 'BenchmarkService'
 require 'thread'
@@ -29,10 +30,10 @@
     end
   end
 
-  def self.start_server(serverClass)
+  def self.start_server(serverClass, trans = nil)
     handler = BenchmarkHandler.new
     processor = ThriftBenchmark::BenchmarkService::Processor.new(handler)
-    transport = ServerSocket.new(HOST, PORT)
+    transport = trans || ServerSocket.new(HOST, PORT)
     transportFactory = FramedTransportFactory.new
     args = [processor, transport, transportFactory, nil, 20]
     if serverClass == NonblockingServer
@@ -60,24 +61,13 @@
   end
 end
 
-module Client
-  include Thrift
-
-  def self.start_client(&block)
-    transport = FramedTransport.new(Socket.new(HOST, PORT))
-    protocol = BinaryProtocol.new(transport)
-    client = ThriftBenchmark::BenchmarkService::Client.new(protocol)
-    # transport.open
-    Thread.new do
-      block.call(client, transport)
-    end
-  end
-end
-
 class BenchmarkManager
   def initialize(opts)
-    @host = opts.fetch(:host, 'localhost')
-    @port = opts.fetch(:port)
+    @socket = opts.fetch(:socket) do
+      @host = opts.fetch(:host, 'localhost')
+      @port = opts.fetch(:port)
+      nil
+    end
     @num_processes = opts.fetch(:num_processes, 40)
     @clients_per_process = opts.fetch(:clients_per_process, 10)
     @calls_per_client = opts.fetch(:calls_per_client, 50)
@@ -104,7 +94,12 @@
       STDIN.close
       rd.close
       @clients_per_process.times do
-        transport = Thrift::FramedTransport.new(Thrift::Socket.new(@host, @port))
+        if @socket
+          socket = Thrift::UNIXSocket.new(@socket)
+        else
+          socket = Thrift::Socket.new(@host, @port)
+        end
+        transport = Thrift::FramedTransport.new(socket)
         protocol = Thrift::BinaryProtocol.new(transport)
         client = ThriftBenchmark::BenchmarkService::Client.new(protocol)
         begin
@@ -128,6 +123,14 @@
     pid
   end
 
+  def socket_class
+    if @socket
+      Thrift::UNIXSocket
+    else
+      Thrift::Socket
+    end
+  end
+
   def collect_output
     puts "Collecting output..."
     # read from @pool until all sockets are closed
@@ -208,6 +211,7 @@
     puts
     tabulate "%d",
              [["Server class", "%s"], Server.class],
+             [["Socket class", "%s"], socket_class],
              ["Number of processes", @num_processes],
              ["Clients per process", @clients_per_process],
              ["Calls per client", @calls_per_client],
@@ -219,7 +223,7 @@
              ["Average time per client (%d calls)" % @calls_per_client, @report[:avg_clients]],
              ["Total time for all calls", @report[:total_calls]],
              ["Real time for benchmarking", @report[:total_benchmark_time]],
-             ["Longest call time", @report[:longest_call]],
+             ["Shortest call time", @report[:longest_call]],
              ["Longest client time (%d calls)" % @calls_per_client, @report[:longest_client]]
   end
 
@@ -240,10 +244,20 @@
 
 puts "Starting server..."
 serverklass = resolve_const(ENV['THRIFT_SERVER']) || Thrift::NonblockingServer
-Server.start_server(serverklass)
+servertrans = nil
+if ENV['THRIFT_SOCKET']
+  servertrans = Thrift::UNIXServerSocket.new(ENV['THRIFT_SOCKET'])
+end
+Server.start_server(serverklass, servertrans)
 
 sleep 0.2 # give the server time to start
 
-BenchmarkManager.new(:host => HOST, :port => PORT, :num_processes => 40, :clients_per_process => 5).run
+args = { :num_processes => 40, :clients_per_process => 5 }
+if ENV['THRIFT_SOCKET']
+  args[:socket] = ENV['THRIFT_SOCKET']
+else
+  args.merge!(:host => HOST, :port => PORT)
+end
+BenchmarkManager.new(args).run
 
 Server.shutdown
diff --git a/lib/rb/lib/thrift/transport/socket.rb b/lib/rb/lib/thrift/transport/socket.rb
index 7cf2f45..3d540e4 100644
--- a/lib/rb/lib/thrift/transport/socket.rb
+++ b/lib/rb/lib/thrift/transport/socket.rb
@@ -14,6 +14,7 @@
     def initialize(host='localhost', port=9090)
       @host = host
       @port = port
+      @desc = "#{host}:#{port}"
       @handle = nil
     end
 
@@ -23,7 +24,7 @@
       begin
         @handle = TCPSocket.new(@host, @port)
       rescue StandardError
-        raise TransportException.new(TransportException::NOT_OPEN, "Could not connect to #{@host}:#{@port}")
+        raise TransportException.new(TransportException::NOT_OPEN, "Could not connect to #{@desc}")
       end
     end
 
@@ -57,7 +58,7 @@
         raise TransportException.new(TransportException::NOT_OPEN, e.message)
       end
       if (data.nil? or data.length == 0)
-        raise TransportException.new(TransportException::UNKNOWN, "Socket: Could not read #{sz} bytes from #{@host}:#{@port}")
+        raise TransportException.new(TransportException::UNKNOWN, "Socket: Could not read #{sz} bytes from #{@desc}")
       end
       data
     end
diff --git a/lib/rb/lib/thrift/transport/unixsocket.rb b/lib/rb/lib/thrift/transport/unixsocket.rb
new file mode 100644
index 0000000..af686ca
--- /dev/null
+++ b/lib/rb/lib/thrift/transport/unixsocket.rb
@@ -0,0 +1,51 @@
+require 'thrift/transport'
+require 'socket'
+
+module Thrift
+  class UNIXSocket < Socket
+    def initialize(path)
+      @path = path
+      @desc = @path # for read()'s error
+      @handle = nil
+    end
+
+    def open
+      begin
+        @handle = ::UNIXSocket.new(@path)
+      rescue StandardError
+        raise TransportException.new(TransportException::NOT_OPEN, "Could not open UNIX socket at #{@path}")
+      end
+    end
+  end
+
+  class UNIXServerSocket < ServerTransport
+    def initialize(path)
+      @path = path
+      @handle = nil
+    end
+
+    attr_accessor :handle
+
+    def listen
+      @handle = ::UNIXServer.new(@path)
+    end
+
+    def accept
+      unless @handle.nil?
+        sock = @handle.accept
+        trans = UNIXSocket.new(nil)
+        trans.handle = sock
+        trans
+      end
+    end
+
+    def close
+      if @handle
+        @handle.close unless @handle.closed?
+        @handle = nil
+        # UNIXServer doesn't delete the socket file, so we have to do it ourselves
+        File.delete(@path)
+      end
+    end
+  end
+end
diff --git a/lib/rb/spec/socket_spec.rb b/lib/rb/spec/socket_spec.rb
index 4b6dae5..ed2f56a 100644
--- a/lib/rb/spec/socket_spec.rb
+++ b/lib/rb/spec/socket_spec.rb
@@ -1,81 +1,33 @@
 require File.dirname(__FILE__) + '/spec_helper'
+require File.dirname(__FILE__) + "/socket_spec_shared"
 
 class ThriftSocketSpec < Spec::ExampleGroup
   include Thrift
 
-  before(:each) do
-    @socket = Socket.new
-    @handle = mock("Handle", :closed? => false)
-    @handle.stub!(:close)
-  end
-
   describe Socket do
-    it "should open a TCPSocket" do
-      TCPSocket.should_receive(:new).with('localhost', 9090).and_return(@handle)
-      @socket.open.should == @handle
+    before(:each) do
+      @socket = Socket.new
+      @handle = mock("Handle", :closed? => false)
+      @handle.stub!(:close)
+      TCPSocket.stub!(:new).and_return(@handle)
+    end
+
+    it_should_behave_like "a socket"
+
+    it "should raise a TransportException when it cannot open a socket" do
+      TCPSocket.should_receive(:new).and_raise(StandardError)
+      lambda { @socket.open }.should raise_error(Thrift::TransportException) { |e| e.type.should == Thrift::TransportException::NOT_OPEN }
+    end
+
+    it "should open a TCPSocket with default args" do
+      TCPSocket.should_receive(:new).with('localhost', 9090)
+      @socket.open
     end
 
     it "should accept host/port options" do
       TCPSocket.should_receive(:new).with('my.domain', 1234)
       Socket.new('my.domain', 1234).open
     end
-
-    it "should raise a TransportException when it cannot open a socket" do
-      TCPSocket.should_receive(:new).with('localhost', 9090).and_raise(StandardError)
-      lambda { @socket.open }.should raise_error(TransportException, "Could not connect to localhost:9090") { |e| e.type.should == TransportException::NOT_OPEN }
-    end
-
-    it "should be open whenever it has a handle" do
-      @socket.should_not be_open
-      TCPSocket.should_receive(:new).and_return(@handle)
-      @socket.open
-      @socket.should be_open
-      @socket.handle = nil
-      @socket.should_not be_open
-      @socket.handle = @handle
-      @handle.should_receive(:close)
-      @socket.close
-      @socket.should_not be_open
-    end
-
-    it "should write data to the handle" do
-      TCPSocket.should_receive(:new).and_return(@handle)
-      @socket.open
-      @handle.should_receive(:write).with("foobar")
-      @socket.write("foobar")
-      @handle.should_receive(:write).with("fail").and_raise(StandardError)
-      lambda { @socket.write("fail") }.should raise_error(TransportException) { |e| e.type.should == TransportException::NOT_OPEN }
-    end
-
-    it "should raise an error when it cannot read from the handle" do
-      TCPSocket.should_receive(:new).and_return(@handle)
-      @socket.open
-      @handle.should_receive(:read).with(17).and_raise(StandardError)
-      lambda { @socket.read(17) }.should raise_error(TransportException) { |e| e.type.should == TransportException::NOT_OPEN }
-    end
-
-    it "should raise an error when it reads no data from the handle" do
-      TCPSocket.should_receive(:new).and_return(@handle)
-      @socket.open
-      @handle.should_receive(:read).with(17).and_return("")
-      lambda { @socket.read(17) }.should raise_error(TransportException, "Socket: Could not read 17 bytes from localhost:9090")
-    end
-
-    it "should return the data read when reading from the handle works" do
-      TCPSocket.should_receive(:new).and_return(@handle)
-      @socket.open
-      @handle.should_receive(:read).with(17).and_return("test data")
-      @socket.read(17).should == "test data"
-    end
-
-    it "should declare itself as closed when it has an error" do
-      TCPSocket.should_receive(:new).and_return(@handle)
-      @socket.open
-      @handle.should_receive(:write).with("fail").and_raise(StandardError)
-      @socket.should be_open
-      lambda { @socket.write("fail") }.should raise_error
-      @socket.should_not be_open
-    end
   end
 
   describe ServerSocket do
@@ -84,8 +36,8 @@
     end
 
     it "should create a handle when calling listen" do
+      TCPServer.should_receive(:new).with(nil, 1234)
       @socket.listen
-      @socket.handle.should be_an_instance_of(TCPServer)
     end
 
     it "should accept an optional host argument" do
diff --git a/lib/rb/spec/socket_spec_shared.rb b/lib/rb/spec/socket_spec_shared.rb
new file mode 100644
index 0000000..448a516
--- /dev/null
+++ b/lib/rb/spec/socket_spec_shared.rb
@@ -0,0 +1,52 @@
+require File.dirname(__FILE__) + '/spec_helper'
+
+shared_examples_for "a socket" do
+  it "should open a socket" do
+    @socket.open.should == @handle
+  end
+
+  it "should be open whenever it has a handle" do
+    @socket.should_not be_open
+    @socket.open
+    @socket.should be_open
+    @socket.handle = nil
+    @socket.should_not be_open
+    @socket.handle = @handle
+    @socket.close
+    @socket.should_not be_open
+  end
+
+  it "should write data to the handle" do
+    @socket.open
+    @handle.should_receive(:write).with("foobar")
+    @socket.write("foobar")
+    @handle.should_receive(:write).with("fail").and_raise(StandardError)
+    lambda { @socket.write("fail") }.should raise_error(Thrift::TransportException) { |e| e.type.should == Thrift::TransportException::NOT_OPEN }
+  end
+
+  it "should raise an error when it cannot read from the handle" do
+    @socket.open
+    @handle.should_receive(:read).with(17).and_raise(StandardError)
+    lambda { @socket.read(17) }.should raise_error(Thrift::TransportException) { |e| e.type.should == Thrift::TransportException::NOT_OPEN }
+  end
+
+  it "should raise an error when it reads no data from the handle" do
+    @socket.open
+    @handle.should_receive(:read).with(17).and_return("")
+    lambda { @socket.read(17) }.should raise_error(Thrift::TransportException, "Socket: Could not read 17 bytes from #{@socket.instance_variable_get("@desc")}")
+  end
+
+  it "should return the data read when reading from the handle works" do
+    @socket.open
+    @handle.should_receive(:read).with(17).and_return("test data")
+    @socket.read(17).should == "test data"
+  end
+
+  it "should declare itself as closed when it has an error" do
+    @socket.open
+    @handle.should_receive(:write).with("fail").and_raise(StandardError)
+    @socket.should be_open
+    lambda { @socket.write("fail") }.should raise_error
+    @socket.should_not be_open
+  end
+end
diff --git a/lib/rb/spec/unixsocket_spec.rb b/lib/rb/spec/unixsocket_spec.rb
new file mode 100644
index 0000000..777ef04
--- /dev/null
+++ b/lib/rb/spec/unixsocket_spec.rb
@@ -0,0 +1,60 @@
+require File.dirname(__FILE__) + '/spec_helper'
+require 'thrift/transport/unixsocket'
+require File.dirname(__FILE__) + "/socket_spec_shared"
+
+class ThriftUNIXSocketSpec < Spec::ExampleGroup
+  include Thrift
+
+  describe UNIXSocket do
+    before(:each) do
+      @path = '/tmp/thrift_spec_socket'
+      @socket = UNIXSocket.new(@path)
+      @handle = mock("Handle", :closed? => false)
+      @handle.stub!(:close)
+      ::UNIXSocket.stub!(:new).and_return(@handle)
+    end
+
+    it_should_behave_like "a socket"
+
+    it "should raise a TransportException when it cannot open a socket" do
+      ::UNIXSocket.should_receive(:new).and_raise(StandardError)
+      lambda { @socket.open }.should raise_error(Thrift::TransportException) { |e| e.type.should == Thrift::TransportException::NOT_OPEN }
+    end
+  end
+
+  describe UNIXServerSocket do
+    before(:each) do
+      @path = '/tmp/thrift_spec_socket'
+      @socket = UNIXServerSocket.new(@path)
+    end
+
+    it "should create a handle when calling listen" do
+      UNIXServer.should_receive(:new).with(@path)
+      @socket.listen
+    end
+
+    it "should create a Thrift::UNIXSocket to wrap accepted sockets" do
+      handle = mock("UNIXServer")
+      UNIXServer.should_receive(:new).with(@path).and_return(handle)
+      @socket.listen
+      sock = mock("sock")
+      handle.should_receive(:accept).and_return(sock)
+      trans = mock("UNIXSocket")
+      UNIXSocket.should_receive(:new).and_return(trans)
+      trans.should_receive(:handle=).with(sock)
+      @socket.accept.should == trans
+    end
+
+    it "should close the handle when closed" do
+      handle = mock("UNIXServer", :closed? => false)
+      UNIXServer.should_receive(:new).with(@path).and_return(handle)
+      @socket.listen
+      handle.should_receive(:close)
+      @socket.close
+    end
+
+    it "should return nil when accepting if there is no handle" do
+      @socket.accept.should be_nil
+    end
+  end
+end