rb: Add optional timeout argument to Thrift::Socket [THRIFT-74]
Socket.new and UNIXSocket.new both now have a new optional argument: timeout.
There's also a timeout field accessor. This timeout is used when reading or
writing.
Author: Kevin Ballard <kevin@rapleaf.com>
git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@678053 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/lib/rb/lib/thrift/transport/socket.rb b/lib/rb/lib/thrift/transport/socket.rb
index c7ed521..8f58352 100644
--- a/lib/rb/lib/thrift/transport/socket.rb
+++ b/lib/rb/lib/thrift/transport/socket.rb
@@ -11,14 +11,15 @@
module Thrift
class Socket < Transport
- def initialize(host='localhost', port=9090)
+ def initialize(host='localhost', port=9090, timeout=nil)
@host = host
@port = port
+ @timeout = timeout
@desc = "#{host}:#{port}"
@handle = nil
end
- attr_accessor :handle
+ attr_accessor :handle, :timeout
def open
begin
@@ -35,11 +36,31 @@
def write(str)
raise IOError, "closed stream" unless open?
begin
- @handle.write(str)
- rescue StandardError
+ if @timeout.nil? or @timeout == 0
+ @handle.write(str)
+ else
+ len = 0
+ start = Time.now
+ while Time.now - start < @timeout
+ rd, wr, = IO.select(nil, [@handle], nil, @timeout)
+ if wr and not wr.empty?
+ len += @handle.write_nonblock(str[len..-1])
+ break if len >= str.length
+ end
+ end
+ if len < str.length
+ raise TransportException.new(TransportException::TIMED_OUT, "Socket: Timed out writing #{str.length} bytes to #{@desc}")
+ else
+ len
+ end
+ end
+ rescue TransportException => e
+ # pass this on
+ raise e
+ rescue StandardError => e
@handle.close
@handle = nil
- raise TransportException.new(TransportException::NOT_OPEN)
+ raise TransportException.new(TransportException::NOT_OPEN, e.message)
end
end
@@ -47,7 +68,26 @@
raise IOError, "closed stream" unless open?
begin
- data = @handle.readpartial(sz)
+ if @timeout.nil? or @timeout == 0
+ data = @handle.readpartial(sz)
+ else
+ # it's possible to interrupt select for something other than the timeout
+ # so we need to ensure we've waited long enough
+ start = Time.now
+ rd = nil # scoping
+ loop do
+ rd, = IO.select([@handle], nil, nil, @timeout)
+ break if (rd and not rd.empty?) or Time.now - start >= @timeout
+ end
+ if rd.nil? or rd.empty?
+ raise TransportException.new(TransportException::TIMED_OUT, "Socket: Timed out reading #{sz} bytes from #{@desc}")
+ else
+ data = @handle.readpartial(sz)
+ end
+ end
+ rescue TransportException => e
+ # don't let this get caught by the StandardError handler
+ raise e
rescue StandardError => e
@handle.close unless @handle.closed?
@handle = nil
diff --git a/lib/rb/lib/thrift/transport/unixsocket.rb b/lib/rb/lib/thrift/transport/unixsocket.rb
index b24e7df..2bf95e9 100644
--- a/lib/rb/lib/thrift/transport/unixsocket.rb
+++ b/lib/rb/lib/thrift/transport/unixsocket.rb
@@ -3,8 +3,9 @@
module Thrift
class UNIXSocket < Socket
- def initialize(path)
+ def initialize(path, timeout=nil)
@path = path
+ @timeout = timeout
@desc = @path # for read()'s error
@handle = nil
end
diff --git a/lib/rb/spec/socket_spec.rb b/lib/rb/spec/socket_spec.rb
index 3fede96..c5866ee 100644
--- a/lib/rb/spec/socket_spec.rb
+++ b/lib/rb/spec/socket_spec.rb
@@ -28,6 +28,11 @@
TCPSocket.should_receive(:new).with('my.domain', 1234)
Socket.new('my.domain', 1234).open
end
+
+ it "should accept an optional timeout" do
+ TCPSocket.stub!(:new)
+ Socket.new('localhost', 8080, 5).timeout.should == 5
+ end
end
describe ServerSocket do
diff --git a/lib/rb/spec/socket_spec_shared.rb b/lib/rb/spec/socket_spec_shared.rb
index b32ab44..2d17fd3 100644
--- a/lib/rb/spec/socket_spec_shared.rb
+++ b/lib/rb/spec/socket_spec_shared.rb
@@ -51,4 +51,35 @@
lambda { @socket.write("fail") }.should raise_error(IOError, "closed stream")
lambda { @socket.read(10) }.should raise_error(IOError, "closed stream")
end
+
+ it "should support the timeout accessor for read" do
+ @socket.timeout = 3
+ @socket.open
+ IO.should_receive(:select).with([@handle], nil, nil, 3).and_return([[@handle], [], []])
+ @handle.should_receive(:readpartial).with(17).and_return("test data")
+ @socket.read(17).should == "test data"
+ end
+
+ it "should support the timeout accessor for write" do
+ @socket.timeout = 3
+ @socket.open
+ IO.should_receive(:select).with(nil, [@handle], nil, 3).twice.and_return([[], [@handle], []])
+ @handle.should_receive(:write_nonblock).with("test data").and_return(4)
+ @handle.should_receive(:write_nonblock).with(" data").and_return(5)
+ @socket.write("test data").should == 9
+ end
+
+ it "should raise an error when read times out" do
+ @socket.timeout = 0.5
+ @socket.open
+ IO.should_receive(:select).with([@handle], nil, nil, 0.5).at_least(1).times.and_return(nil)
+ lambda { @socket.read(17) }.should raise_error(Thrift::TransportException) { |e| e.type.should == Thrift::TransportException::TIMED_OUT }
+ end
+
+ it "should raise an error when write times out" do
+ @socket.timeout = 0.5
+ @socket.open
+ IO.should_receive(:select).with(nil, [@handle], nil, 0.5).any_number_of_times.and_return(nil)
+ lambda { @socket.write("test data") }.should raise_error(Thrift::TransportException) { |e| e.type.should == Thrift::TransportException::TIMED_OUT }
+ end
end
diff --git a/lib/rb/spec/unixsocket_spec.rb b/lib/rb/spec/unixsocket_spec.rb
index ff5c96c..222a1c7 100644
--- a/lib/rb/spec/unixsocket_spec.rb
+++ b/lib/rb/spec/unixsocket_spec.rb
@@ -20,6 +20,11 @@
::UNIXSocket.should_receive(:new).and_raise(StandardError)
lambda { @socket.open }.should raise_error(Thrift::TransportException) { |e| e.type.should == Thrift::TransportException::NOT_OPEN }
end
+
+ it "should accept an optional timeout" do
+ ::UNIXSocket.stub!(:new)
+ UNIXSocket.new(@path, 5).timeout.should == 5
+ end
end
describe UNIXServerSocket do