rb: Implement Thrift::UNIXSocket and Thrift::UNIXServerSocket
In benchmarking it turns out these don't give any noticeable performance boost,
but as I've already written them, somebody may want them for something.
git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@669019 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/lib/rb/benchmark/fairness.rb b/lib/rb/benchmark/fairness.rb
index 02f1cec..67543f3 100644
--- a/lib/rb/benchmark/fairness.rb
+++ b/lib/rb/benchmark/fairness.rb
@@ -2,6 +2,7 @@
$:.unshift File.dirname(__FILE__) + '/../lib'
require 'thrift'
require 'thrift/server/nonblockingserver'
+require 'thrift/transport/unixsocket'
$:.unshift File.dirname(__FILE__) + "/gen-rb"
require 'BenchmarkService'
require 'thread'
@@ -29,10 +30,10 @@
end
end
- def self.start_server(serverClass)
+ def self.start_server(serverClass, trans = nil)
handler = BenchmarkHandler.new
processor = ThriftBenchmark::BenchmarkService::Processor.new(handler)
- transport = ServerSocket.new(HOST, PORT)
+ transport = trans || ServerSocket.new(HOST, PORT)
transportFactory = FramedTransportFactory.new
args = [processor, transport, transportFactory, nil, 20]
if serverClass == NonblockingServer
@@ -60,24 +61,13 @@
end
end
-module Client
- include Thrift
-
- def self.start_client(&block)
- transport = FramedTransport.new(Socket.new(HOST, PORT))
- protocol = BinaryProtocol.new(transport)
- client = ThriftBenchmark::BenchmarkService::Client.new(protocol)
- # transport.open
- Thread.new do
- block.call(client, transport)
- end
- end
-end
-
class BenchmarkManager
def initialize(opts)
- @host = opts.fetch(:host, 'localhost')
- @port = opts.fetch(:port)
+ @socket = opts.fetch(:socket) do
+ @host = opts.fetch(:host, 'localhost')
+ @port = opts.fetch(:port)
+ nil
+ end
@num_processes = opts.fetch(:num_processes, 40)
@clients_per_process = opts.fetch(:clients_per_process, 10)
@calls_per_client = opts.fetch(:calls_per_client, 50)
@@ -104,7 +94,12 @@
STDIN.close
rd.close
@clients_per_process.times do
- transport = Thrift::FramedTransport.new(Thrift::Socket.new(@host, @port))
+ if @socket
+ socket = Thrift::UNIXSocket.new(@socket)
+ else
+ socket = Thrift::Socket.new(@host, @port)
+ end
+ transport = Thrift::FramedTransport.new(socket)
protocol = Thrift::BinaryProtocol.new(transport)
client = ThriftBenchmark::BenchmarkService::Client.new(protocol)
begin
@@ -128,6 +123,14 @@
pid
end
+ def socket_class
+ if @socket
+ Thrift::UNIXSocket
+ else
+ Thrift::Socket
+ end
+ end
+
def collect_output
puts "Collecting output..."
# read from @pool until all sockets are closed
@@ -208,6 +211,7 @@
puts
tabulate "%d",
[["Server class", "%s"], Server.class],
+ [["Socket class", "%s"], socket_class],
["Number of processes", @num_processes],
["Clients per process", @clients_per_process],
["Calls per client", @calls_per_client],
@@ -219,7 +223,7 @@
["Average time per client (%d calls)" % @calls_per_client, @report[:avg_clients]],
["Total time for all calls", @report[:total_calls]],
["Real time for benchmarking", @report[:total_benchmark_time]],
- ["Longest call time", @report[:longest_call]],
+ ["Shortest call time", @report[:longest_call]],
["Longest client time (%d calls)" % @calls_per_client, @report[:longest_client]]
end
@@ -240,10 +244,20 @@
puts "Starting server..."
serverklass = resolve_const(ENV['THRIFT_SERVER']) || Thrift::NonblockingServer
-Server.start_server(serverklass)
+servertrans = nil
+if ENV['THRIFT_SOCKET']
+ servertrans = Thrift::UNIXServerSocket.new(ENV['THRIFT_SOCKET'])
+end
+Server.start_server(serverklass, servertrans)
sleep 0.2 # give the server time to start
-BenchmarkManager.new(:host => HOST, :port => PORT, :num_processes => 40, :clients_per_process => 5).run
+args = { :num_processes => 40, :clients_per_process => 5 }
+if ENV['THRIFT_SOCKET']
+ args[:socket] = ENV['THRIFT_SOCKET']
+else
+ args.merge!(:host => HOST, :port => PORT)
+end
+BenchmarkManager.new(args).run
Server.shutdown
diff --git a/lib/rb/lib/thrift/transport/socket.rb b/lib/rb/lib/thrift/transport/socket.rb
index 7cf2f45..3d540e4 100644
--- a/lib/rb/lib/thrift/transport/socket.rb
+++ b/lib/rb/lib/thrift/transport/socket.rb
@@ -14,6 +14,7 @@
def initialize(host='localhost', port=9090)
@host = host
@port = port
+ @desc = "#{host}:#{port}"
@handle = nil
end
@@ -23,7 +24,7 @@
begin
@handle = TCPSocket.new(@host, @port)
rescue StandardError
- raise TransportException.new(TransportException::NOT_OPEN, "Could not connect to #{@host}:#{@port}")
+ raise TransportException.new(TransportException::NOT_OPEN, "Could not connect to #{@desc}")
end
end
@@ -57,7 +58,7 @@
raise TransportException.new(TransportException::NOT_OPEN, e.message)
end
if (data.nil? or data.length == 0)
- raise TransportException.new(TransportException::UNKNOWN, "Socket: Could not read #{sz} bytes from #{@host}:#{@port}")
+ raise TransportException.new(TransportException::UNKNOWN, "Socket: Could not read #{sz} bytes from #{@desc}")
end
data
end
diff --git a/lib/rb/lib/thrift/transport/unixsocket.rb b/lib/rb/lib/thrift/transport/unixsocket.rb
new file mode 100644
index 0000000..af686ca
--- /dev/null
+++ b/lib/rb/lib/thrift/transport/unixsocket.rb
@@ -0,0 +1,51 @@
+require 'thrift/transport'
+require 'socket'
+
+module Thrift
+ class UNIXSocket < Socket
+ def initialize(path)
+ @path = path
+ @desc = @path # for read()'s error
+ @handle = nil
+ end
+
+ def open
+ begin
+ @handle = ::UNIXSocket.new(@path)
+ rescue StandardError
+ raise TransportException.new(TransportException::NOT_OPEN, "Could not open UNIX socket at #{@path}")
+ end
+ end
+ end
+
+ class UNIXServerSocket < ServerTransport
+ def initialize(path)
+ @path = path
+ @handle = nil
+ end
+
+ attr_accessor :handle
+
+ def listen
+ @handle = ::UNIXServer.new(@path)
+ end
+
+ def accept
+ unless @handle.nil?
+ sock = @handle.accept
+ trans = UNIXSocket.new(nil)
+ trans.handle = sock
+ trans
+ end
+ end
+
+ def close
+ if @handle
+ @handle.close unless @handle.closed?
+ @handle = nil
+ # UNIXServer doesn't delete the socket file, so we have to do it ourselves
+ File.delete(@path)
+ end
+ end
+ end
+end
diff --git a/lib/rb/spec/socket_spec.rb b/lib/rb/spec/socket_spec.rb
index 4b6dae5..ed2f56a 100644
--- a/lib/rb/spec/socket_spec.rb
+++ b/lib/rb/spec/socket_spec.rb
@@ -1,81 +1,33 @@
require File.dirname(__FILE__) + '/spec_helper'
+require File.dirname(__FILE__) + "/socket_spec_shared"
class ThriftSocketSpec < Spec::ExampleGroup
include Thrift
- before(:each) do
- @socket = Socket.new
- @handle = mock("Handle", :closed? => false)
- @handle.stub!(:close)
- end
-
describe Socket do
- it "should open a TCPSocket" do
- TCPSocket.should_receive(:new).with('localhost', 9090).and_return(@handle)
- @socket.open.should == @handle
+ before(:each) do
+ @socket = Socket.new
+ @handle = mock("Handle", :closed? => false)
+ @handle.stub!(:close)
+ TCPSocket.stub!(:new).and_return(@handle)
+ end
+
+ it_should_behave_like "a socket"
+
+ it "should raise a TransportException when it cannot open a socket" do
+ TCPSocket.should_receive(:new).and_raise(StandardError)
+ lambda { @socket.open }.should raise_error(Thrift::TransportException) { |e| e.type.should == Thrift::TransportException::NOT_OPEN }
+ end
+
+ it "should open a TCPSocket with default args" do
+ TCPSocket.should_receive(:new).with('localhost', 9090)
+ @socket.open
end
it "should accept host/port options" do
TCPSocket.should_receive(:new).with('my.domain', 1234)
Socket.new('my.domain', 1234).open
end
-
- it "should raise a TransportException when it cannot open a socket" do
- TCPSocket.should_receive(:new).with('localhost', 9090).and_raise(StandardError)
- lambda { @socket.open }.should raise_error(TransportException, "Could not connect to localhost:9090") { |e| e.type.should == TransportException::NOT_OPEN }
- end
-
- it "should be open whenever it has a handle" do
- @socket.should_not be_open
- TCPSocket.should_receive(:new).and_return(@handle)
- @socket.open
- @socket.should be_open
- @socket.handle = nil
- @socket.should_not be_open
- @socket.handle = @handle
- @handle.should_receive(:close)
- @socket.close
- @socket.should_not be_open
- end
-
- it "should write data to the handle" do
- TCPSocket.should_receive(:new).and_return(@handle)
- @socket.open
- @handle.should_receive(:write).with("foobar")
- @socket.write("foobar")
- @handle.should_receive(:write).with("fail").and_raise(StandardError)
- lambda { @socket.write("fail") }.should raise_error(TransportException) { |e| e.type.should == TransportException::NOT_OPEN }
- end
-
- it "should raise an error when it cannot read from the handle" do
- TCPSocket.should_receive(:new).and_return(@handle)
- @socket.open
- @handle.should_receive(:read).with(17).and_raise(StandardError)
- lambda { @socket.read(17) }.should raise_error(TransportException) { |e| e.type.should == TransportException::NOT_OPEN }
- end
-
- it "should raise an error when it reads no data from the handle" do
- TCPSocket.should_receive(:new).and_return(@handle)
- @socket.open
- @handle.should_receive(:read).with(17).and_return("")
- lambda { @socket.read(17) }.should raise_error(TransportException, "Socket: Could not read 17 bytes from localhost:9090")
- end
-
- it "should return the data read when reading from the handle works" do
- TCPSocket.should_receive(:new).and_return(@handle)
- @socket.open
- @handle.should_receive(:read).with(17).and_return("test data")
- @socket.read(17).should == "test data"
- end
-
- it "should declare itself as closed when it has an error" do
- TCPSocket.should_receive(:new).and_return(@handle)
- @socket.open
- @handle.should_receive(:write).with("fail").and_raise(StandardError)
- @socket.should be_open
- lambda { @socket.write("fail") }.should raise_error
- @socket.should_not be_open
- end
end
describe ServerSocket do
@@ -84,8 +36,8 @@
end
it "should create a handle when calling listen" do
+ TCPServer.should_receive(:new).with(nil, 1234)
@socket.listen
- @socket.handle.should be_an_instance_of(TCPServer)
end
it "should accept an optional host argument" do
diff --git a/lib/rb/spec/socket_spec_shared.rb b/lib/rb/spec/socket_spec_shared.rb
new file mode 100644
index 0000000..448a516
--- /dev/null
+++ b/lib/rb/spec/socket_spec_shared.rb
@@ -0,0 +1,52 @@
+require File.dirname(__FILE__) + '/spec_helper'
+
+shared_examples_for "a socket" do
+ it "should open a socket" do
+ @socket.open.should == @handle
+ end
+
+ it "should be open whenever it has a handle" do
+ @socket.should_not be_open
+ @socket.open
+ @socket.should be_open
+ @socket.handle = nil
+ @socket.should_not be_open
+ @socket.handle = @handle
+ @socket.close
+ @socket.should_not be_open
+ end
+
+ it "should write data to the handle" do
+ @socket.open
+ @handle.should_receive(:write).with("foobar")
+ @socket.write("foobar")
+ @handle.should_receive(:write).with("fail").and_raise(StandardError)
+ lambda { @socket.write("fail") }.should raise_error(Thrift::TransportException) { |e| e.type.should == Thrift::TransportException::NOT_OPEN }
+ end
+
+ it "should raise an error when it cannot read from the handle" do
+ @socket.open
+ @handle.should_receive(:read).with(17).and_raise(StandardError)
+ lambda { @socket.read(17) }.should raise_error(Thrift::TransportException) { |e| e.type.should == Thrift::TransportException::NOT_OPEN }
+ end
+
+ it "should raise an error when it reads no data from the handle" do
+ @socket.open
+ @handle.should_receive(:read).with(17).and_return("")
+ lambda { @socket.read(17) }.should raise_error(Thrift::TransportException, "Socket: Could not read 17 bytes from #{@socket.instance_variable_get("@desc")}")
+ end
+
+ it "should return the data read when reading from the handle works" do
+ @socket.open
+ @handle.should_receive(:read).with(17).and_return("test data")
+ @socket.read(17).should == "test data"
+ end
+
+ it "should declare itself as closed when it has an error" do
+ @socket.open
+ @handle.should_receive(:write).with("fail").and_raise(StandardError)
+ @socket.should be_open
+ lambda { @socket.write("fail") }.should raise_error
+ @socket.should_not be_open
+ end
+end
diff --git a/lib/rb/spec/unixsocket_spec.rb b/lib/rb/spec/unixsocket_spec.rb
new file mode 100644
index 0000000..777ef04
--- /dev/null
+++ b/lib/rb/spec/unixsocket_spec.rb
@@ -0,0 +1,60 @@
+require File.dirname(__FILE__) + '/spec_helper'
+require 'thrift/transport/unixsocket'
+require File.dirname(__FILE__) + "/socket_spec_shared"
+
+class ThriftUNIXSocketSpec < Spec::ExampleGroup
+ include Thrift
+
+ describe UNIXSocket do
+ before(:each) do
+ @path = '/tmp/thrift_spec_socket'
+ @socket = UNIXSocket.new(@path)
+ @handle = mock("Handle", :closed? => false)
+ @handle.stub!(:close)
+ ::UNIXSocket.stub!(:new).and_return(@handle)
+ end
+
+ it_should_behave_like "a socket"
+
+ it "should raise a TransportException when it cannot open a socket" do
+ ::UNIXSocket.should_receive(:new).and_raise(StandardError)
+ lambda { @socket.open }.should raise_error(Thrift::TransportException) { |e| e.type.should == Thrift::TransportException::NOT_OPEN }
+ end
+ end
+
+ describe UNIXServerSocket do
+ before(:each) do
+ @path = '/tmp/thrift_spec_socket'
+ @socket = UNIXServerSocket.new(@path)
+ end
+
+ it "should create a handle when calling listen" do
+ UNIXServer.should_receive(:new).with(@path)
+ @socket.listen
+ end
+
+ it "should create a Thrift::UNIXSocket to wrap accepted sockets" do
+ handle = mock("UNIXServer")
+ UNIXServer.should_receive(:new).with(@path).and_return(handle)
+ @socket.listen
+ sock = mock("sock")
+ handle.should_receive(:accept).and_return(sock)
+ trans = mock("UNIXSocket")
+ UNIXSocket.should_receive(:new).and_return(trans)
+ trans.should_receive(:handle=).with(sock)
+ @socket.accept.should == trans
+ end
+
+ it "should close the handle when closed" do
+ handle = mock("UNIXServer", :closed? => false)
+ UNIXServer.should_receive(:new).with(@path).and_return(handle)
+ @socket.listen
+ handle.should_receive(:close)
+ @socket.close
+ end
+
+ it "should return nil when accepting if there is no handle" do
+ @socket.accept.should be_nil
+ end
+ end
+end