Use read_nonblock instead of readpartial to account for SSL socket buffer
diff --git a/lib/rb/benchmark/client.rb b/lib/rb/benchmark/client.rb
index 304e6d8..2b892c8 100644
--- a/lib/rb/benchmark/client.rb
+++ b/lib/rb/benchmark/client.rb
@@ -72,9 +72,9 @@
ctx.key = OpenSSL::PKey::RSA.new(File.open(File.join(keys_dir, "client.key")))
end
- Thrift::SSLSocket.new(@host, @port, nil, ssl_context)
+ Thrift::SSLSocket.new(@host, @port, 5, ssl_context)
else
- Thrift::Socket.new(@host, @port)
+ Thrift::Socket.new(@host, @port, 5)
end
protocol = create_protocol(socket)
transport = protocol.trans
diff --git a/lib/rb/lib/thrift/transport/socket.rb b/lib/rb/lib/thrift/transport/socket.rb
index 7d01e2e..0f58ded 100644
--- a/lib/rb/lib/thrift/transport/socket.rb
+++ b/lib/rb/lib/thrift/transport/socket.rb
@@ -68,20 +68,20 @@
if @timeout.nil? or @timeout == 0
@handle.write(str)
else
+ deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + @timeout
len = 0
- start = Time.now
- while Time.now - start < @timeout
- rd, wr, = IO.select(nil, [@handle], nil, @timeout)
- if wr and not wr.empty?
+
+ while len < str.length
+ begin
len += @handle.write_nonblock(str[len..-1])
- break if len >= str.length
+ rescue IO::WaitWritable
+ wait_for(:write, deadline, str.length)
+ rescue IO::WaitReadable
+ wait_for(:read, deadline, str.length)
end
end
- if len < str.length
- raise TransportException.new(TransportException::TIMED_OUT, "Socket: Timed out writing #{str.length} bytes to #{@desc}")
- else
- len
- end
+
+ len
end
rescue TransportException => e
# pass this on
@@ -100,19 +100,16 @@
if @timeout.nil? or @timeout == 0
data = @handle.readpartial(sz)
else
- # it's possible to interrupt select for something other than the timeout
- # so we need to ensure we've waited long enough, but not too long
- start = Time.now
- timespent = 0
- rd = loop do
- rd, = IO.select([@handle], nil, nil, @timeout - timespent)
- timespent = Time.now - start
- break rd if (rd and not rd.empty?) or timespent >= @timeout
- end
- if rd.nil? or rd.empty?
- raise TransportException.new(TransportException::TIMED_OUT, "Socket: Timed out reading #{sz} bytes from #{@desc}")
- else
- data = @handle.readpartial(sz)
+ deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + @timeout
+
+ data = loop do
+ begin
+ break @handle.read_nonblock(sz)
+ rescue IO::WaitReadable
+ wait_for(:read, deadline, sz)
+ rescue IO::WaitWritable
+ wait_for(:write, deadline, sz)
+ end
end
end
rescue TransportException => e
@@ -141,5 +138,33 @@
def to_s
"socket(#{@host}:#{@port})"
end
+
+ private
+
+ def wait_for(operation, deadline, sz) # Block until @handle is ready for operation; raise TIMED_OUT once deadline passes.
+ rd_ary, wr_ary = case operation # choose which IO.select set @handle is placed in
+ when :read
+ [[@handle], nil] # watch for readability only
+ when :write
+ [nil, [@handle]] # watch for writability only
+ else
+ raise ArgumentError, "Unknown IO wait operation: #{operation.inspect}"
+ end
+
+ loop do
+ remaining = deadline - Process.clock_gettime(Process::CLOCK_MONOTONIC) # deadline is compared against the monotonic clock
+ if remaining <= 0 # time budget spent: raise instead of selecting again
+ case operation # sz is only used to build the error message
+ when :read
+ raise TransportException.new(TransportException::TIMED_OUT, "Socket: Timed out reading #{sz} bytes from #{@desc}")
+ when :write
+ raise TransportException.new(TransportException::TIMED_OUT, "Socket: Timed out writing #{sz} bytes to #{@desc}")
+ end
+ end
+
+ rd, wr, = IO.select(rd_ary, wr_ary, nil, remaining) # nil on timeout; the loop then re-checks the deadline
+ return if (rd && !rd.empty?) || (wr && !wr.empty?) # handle is ready: hand control back to the caller
+ end
+ end
end
end
diff --git a/lib/rb/spec/socket_spec_shared.rb b/lib/rb/spec/socket_spec_shared.rb
index 2015ac0..163d0c2 100644
--- a/lib/rb/spec/socket_spec_shared.rb
+++ b/lib/rb/spec/socket_spec_shared.rb
@@ -74,31 +74,94 @@
it "should support the timeout accessor for read" do
@socket.timeout = 3
@socket.open
- expect(IO).to receive(:select).with([@handle], nil, nil, 3).and_return([[@handle], [], []])
- expect(@handle).to receive(:readpartial).with(17).and_return("test data")
+ expect(@handle).to receive(:read_nonblock).with(17).and_raise(IO::EAGAINWaitReadable) # first attempt: no data available yet
+ expect(IO).to receive(:select) do |rd, wr, err, timeout| # the socket must then wait for readability only
+ expect(rd).to eq([@handle])
+ expect(wr).to be_nil
+ expect(err).to be_nil
+ expect(timeout).to be > 0 # select is given the time left before the deadline,
+ expect(timeout).to be <= 3 # which cannot exceed the configured timeout
+ [[@handle], [], []] # report the handle as readable
+ end
+ expect(@handle).to receive(:read_nonblock).with(17).and_return("test data") # retry after select reports readiness
expect(@socket.read(17)).to eq("test data")
end
it "should support the timeout accessor for write" do
@socket.timeout = 3
@socket.open
- expect(IO).to receive(:select).with(nil, [@handle], nil, 3).twice.and_return([[], [@handle], []])
- expect(@handle).to receive(:write_nonblock).with("test data").and_return(4)
- expect(@handle).to receive(:write_nonblock).with(" data").and_return(5)
+ write_calls = 0 # counts write_nonblock invocations to script each one
+ expect(@handle).to receive(:write_nonblock).exactly(3).times do |chunk|
+ write_calls += 1
+ case write_calls
+ when 1 # first attempt: socket not writable yet
+ expect(chunk).to eq("test data")
+ raise IO::EAGAINWaitWritable
+ when 2 # nothing was written, so the whole string is retried
+ expect(chunk).to eq("test data")
+ 4 # partial write: only "test" is accepted
+ when 3 # only the unwritten tail is sent
+ expect(chunk).to eq(" data")
+ 5
+ end
+ end
+ expect(IO).to receive(:select) do |rd, wr, err, timeout| # expected once: only the first attempt had to wait
+ expect(rd).to be_nil
+ expect(wr).to eq([@handle])
+ expect(err).to be_nil
+ expect(timeout).to be > 0 # select is given the time left before the deadline,
+ expect(timeout).to be <= 3 # which cannot exceed the configured timeout
+ [[], [@handle], []] # report the handle as writable
+ end
expect(@socket.write("test data")).to eq(9)
end
it "should raise an error when read times out" do
@socket.timeout = 0.5
@socket.open
- expect(IO).to receive(:select).once {sleep(0.5); nil}
+ expect(@handle).to receive(:read_nonblock).with(17).and_raise(IO::EAGAINWaitReadable) # force the wait path
+ expect(IO).to receive(:select).once { sleep(0.6); nil } # outlast the 0.5s timeout so the deadline has passed when select returns nil
expect { @socket.read(17) }.to raise_error(Thrift::TransportException) { |e| expect(e.type).to eq(Thrift::TransportException::TIMED_OUT) }
end
it "should raise an error when write times out" do
@socket.timeout = 0.5
@socket.open
- allow(IO).to receive(:select).with(nil, [@handle], nil, 0.5).and_return(nil)
+ expect(@handle).to receive(:write_nonblock).with("test data").and_raise(IO::EAGAINWaitWritable) # force the wait path
+ expect(IO).to receive(:select).once { sleep(0.6); nil } # outlast the 0.5s timeout so the deadline has passed when select returns nil
expect { @socket.write("test data") }.to raise_error(Thrift::TransportException) { |e| expect(e.type).to eq(Thrift::TransportException::TIMED_OUT) }
end
+
+ it "should read buffered SSL data without waiting on the raw socket again" do
+ @socket.timeout = 1
+ @socket.open
+
+ expect(@handle).to receive(:read_nonblock).with(4).ordered.and_raise(IO::EAGAINWaitReadable) # first attempt: nothing to read yet
+ expect(IO).to receive(:select).once.ordered do |rd, wr, err, timeout| # exactly one wait across both reads
+ expect(rd).to eq([@handle])
+ expect(wr).to be_nil
+ expect(err).to be_nil
+ expect(timeout).to be > 0 # select is given the time left before the deadline,
+ expect(timeout).to be <= 1 # which cannot exceed the configured timeout
+ [[@handle], [], []] # report the handle as readable
+ end
+ expect(@handle).to receive(:read_nonblock).with(4).ordered.and_return("ABCD") # retry after select reports readiness
+ expect(@handle).to receive(:read_nonblock).with(5).ordered.and_return("12345") # second read succeeds without another select
+
+ expect(@socket.read(4)).to eq("ABCD")
+ expect(@socket.read(5)).to eq("12345")
+ end
+
+ it "should read without timeout using the blocking path" do
+ @socket.timeout = nil # a nil timeout must not use the select/read_nonblock path
+ @socket.open
+
+ expect(IO).not_to receive(:select) # no readiness polling on this path
+ expect(@handle).not_to receive(:read_nonblock) # and no nonblocking reads
+ expect(@handle).to receive(:readpartial).with(4).ordered.and_return("ABCD")
+ expect(@handle).to receive(:readpartial).with(5).ordered.and_return("12345")
+
+ expect(@socket.read(4)).to eq("ABCD")
+ expect(@socket.read(5)).to eq("12345")
+ end
end