Fix Ruby client seqid handling
diff --git a/lib/rb/lib/thrift/client.rb b/lib/rb/lib/thrift/client.rb
index 27de43b..ae0b840 100644
--- a/lib/rb/lib/thrift/client.rb
+++ b/lib/rb/lib/thrift/client.rb
@@ -19,19 +19,25 @@
module Thrift
module Client
+ MIN_SEQUENCE_ID = -(2**31)
+ MAX_SEQUENCE_ID = (2**31) - 1
+
def initialize(iprot, oprot = nil)
@iprot = iprot
@oprot = oprot || iprot
@seqid = 0
+ @pending_seqids = []
end
def send_message(name, args_class, args = {})
- @oprot.write_message_begin(name, MessageTypes::CALL, @seqid)
+ seqid = next_seqid!
+ @oprot.write_message_begin(name, MessageTypes::CALL, seqid)
send_message_args(args_class, args)
+ @pending_seqids << seqid
end
def send_oneway_message(name, args_class, args = {})
- @oprot.write_message_begin(name, MessageTypes::ONEWAY, @seqid)
+ @oprot.write_message_begin(name, MessageTypes::ONEWAY, next_seqid!)
send_message_args(args_class, args)
end
@@ -56,8 +62,37 @@
end
def reply_seqid(rseqid)
- result = (rseqid==@seqid)?true:false
- result
+ expected_seqid = dequeue_pending_seqid
+ !expected_seqid.nil? && rseqid == expected_seqid
+ end
+
+ def validate_message_begin(fname, mtype, rseqid, expected_name)
+ expected_seqid = dequeue_pending_seqid
+
+ if mtype == MessageTypes::EXCEPTION
+ raise_application_exception
+ end
+
+ if mtype != MessageTypes::REPLY
+ raise ApplicationException.new(
+ ApplicationException::INVALID_MESSAGE_TYPE,
+ "#{expected_name} failed: invalid message type"
+ )
+ end
+
+ if fname != expected_name
+ raise ApplicationException.new(
+ ApplicationException::WRONG_METHOD_NAME,
+ "#{expected_name} failed: wrong method name"
+ )
+ end
+
+ return if !expected_seqid.nil? && rseqid == expected_seqid
+
+ raise ApplicationException.new(
+ ApplicationException::BAD_SEQUENCE_ID,
+ "#{expected_name} failed: out of sequence response"
+ )
end
def receive_message(result_klass)
@@ -69,11 +104,28 @@
def handle_exception(mtype)
if mtype == MessageTypes::EXCEPTION
- x = ApplicationException.new
- x.read(@iprot)
- @iprot.read_message_end
- raise x
+ dequeue_pending_seqid
+ raise_application_exception
end
end
+
+ private
+
+ def next_seqid!
+ seqid = @seqid
+ @seqid = (seqid == MAX_SEQUENCE_ID) ? MIN_SEQUENCE_ID : seqid + 1
+ seqid
+ end
+
+ def dequeue_pending_seqid
+ @pending_seqids.shift
+ end
+
+ def raise_application_exception
+ x = ApplicationException.new
+ x.read(@iprot)
+ @iprot.read_message_end
+ raise x
+ end
end
end
diff --git a/lib/rb/lib/thrift/protocol/compact_protocol.rb b/lib/rb/lib/thrift/protocol/compact_protocol.rb
index 56069ef..e926e8f 100644
--- a/lib/rb/lib/thrift/protocol/compact_protocol.rb
+++ b/lib/rb/lib/thrift/protocol/compact_protocol.rb
@@ -110,7 +110,7 @@
def write_message_begin(name, type, seqid)
write_byte(PROTOCOL_ID)
write_byte((VERSION & VERSION_MASK) | ((type << TYPE_SHIFT_AMOUNT) & TYPE_MASK))
- write_varint32(seqid)
+ write_varint32(message_seqid_to_varint32(seqid))
write_string(name)
nil
end
@@ -241,7 +241,7 @@
end
type = (version_and_type >> TYPE_SHIFT_AMOUNT) & TYPE_BITS
- seqid = read_varint32()
+ seqid = message_seqid_from_varint32(read_varint32())
messageName = read_string()
[messageName, type, seqid]
end
@@ -441,6 +441,18 @@
def zig_zag_to_long(n)
(n >> 1) ^ -(n & 1)
end
+
+ def message_seqid_to_varint32(seqid)
+ if seqid < -(2**31) || seqid > (2**31) - 1
+ raise RangeError, "seqid must be a signed int32"
+ end
+
+ seqid < 0 ? seqid + (2**32) : seqid
+ end
+
+ def message_seqid_from_varint32(seqid)
+ seqid > 0x7fffffff ? seqid - (2**32) : seqid
+ end
end
class CompactProtocolFactory < BaseProtocolFactory
diff --git a/lib/rb/lib/thrift/protocol/header_protocol.rb b/lib/rb/lib/thrift/protocol/header_protocol.rb
index 8bf4bbe..1246170 100644
--- a/lib/rb/lib/thrift/protocol/header_protocol.rb
+++ b/lib/rb/lib/thrift/protocol/header_protocol.rb
@@ -82,6 +82,7 @@
# Write methods - delegate to underlying protocol
def write_message_begin(name, type, seqid)
+ @header_transport.sequence_id = seqid
@protocol.write_message_begin(name, type, seqid)
end
diff --git a/lib/rb/lib/thrift/transport/header_transport.rb b/lib/rb/lib/thrift/transport/header_transport.rb
index 54d3a6c..02e142b 100644
--- a/lib/rb/lib/thrift/transport/header_transport.rb
+++ b/lib/rb/lib/thrift/transport/header_transport.rb
@@ -114,6 +114,14 @@
@max_frame_size = MAX_FRAME_SIZE
end
+ def sequence_id=(sequence_id)
+ if sequence_id < -(2**31) || sequence_id > (2**31) - 1
+ raise RangeError, "sequence_id must be a signed int32"
+ end
+
+ @sequence_id = sequence_id
+ end
+
def open?
@transport.open?
end
@@ -307,7 +315,7 @@
# Read flags and sequence ID
@flags = buf.read(2).unpack('n').first
- @sequence_id = buf.read(4).unpack('N').first
+ @sequence_id = signed_int32(buf.read(4).unpack('N').first)
# Read header length (in 32-bit words)
header_words = buf.read(2).unpack('n').first
@@ -412,7 +420,7 @@
frame << [frame_size].pack('N') # Length
frame << [HEADER_MAGIC].pack('n') # Magic
frame << [@flags].pack('n') # Flags
- frame << [@sequence_id].pack('N') # Sequence ID
+ frame << [unsigned_int32(@sequence_id)].pack('N') # Sequence ID
frame << [header_data.bytesize / 4].pack('n') # Header length (in 32-bit words)
frame << header_data # Header data
frame << payload # Payload
@@ -480,6 +488,14 @@
write_varint32(io, value.bytesize)
io.write(value)
end
+
+ def signed_int32(value)
+ value > 0x7fffffff ? value - (2**32) : value
+ end
+
+ def unsigned_int32(value)
+ value < 0 ? value + (2**32) : value
+ end
end
# Factory for creating HeaderTransport instances
diff --git a/lib/rb/spec/client_spec.rb b/lib/rb/spec/client_spec.rb
index 8650350..c963b47 100644
--- a/lib/rb/spec/client_spec.rb
+++ b/lib/rb/spec/client_spec.rb
@@ -24,6 +24,11 @@
include Thrift::Client
end
+ class EmptyArgs
+ def write(_prot)
+ end
+ end
+
before(:each) do
@prot = double("MockProtocol")
@client = ClientSpec.new(@prot)
@@ -52,15 +57,34 @@
end
it "should increment the sequence id when sending messages" do
- pending "it seems sequence ids are completely ignored right now"
- @prot.expect(:write_message_begin).with('testMessage', Thrift::MessageTypes::CALL, 0).ordered
- @prot.expect(:write_message_begin).with('testMessage2', Thrift::MessageTypes::CALL, 1).ordered
- @prot.expect(:write_message_begin).with('testMessage3', Thrift::MessageTypes::CALL, 2).ordered
- @prot.stub!(:write_message_end)
- @prot.stub!(:trans).and_return double("trans").as_null_object
- @client.send_message('testMessage', double("args class").as_null_object)
- @client.send_message('testMessage2', double("args class").as_null_object)
- @client.send_message('testMessage3', double("args class").as_null_object)
+ expect(@prot).to receive(:write_message_begin).with('testMessage', Thrift::MessageTypes::CALL, 0).ordered
+ expect(@prot).to receive(:write_message_begin).with('testMessage2', Thrift::MessageTypes::CALL, 1).ordered
+ expect(@prot).to receive(:write_message_begin).with('testMessage3', Thrift::MessageTypes::CALL, 2).ordered
+ allow(@prot).to receive(:write_message_end)
+ allow(@prot).to receive(:trans).and_return(double("trans", :flush => nil))
+
+ args_class = double("ArgsClass", :new => EmptyArgs.new)
+ @client.send_message('testMessage', args_class)
+ @client.send_message('testMessage2', args_class)
+ @client.send_message('testMessage3', args_class)
+ end
+
+ it "should keep pending reply sequence ids in FIFO order" do
+ expect(@prot).to receive(:write_message_begin).with('first', Thrift::MessageTypes::CALL, 0).ordered
+ expect(@prot).to receive(:write_message_begin).with('second', Thrift::MessageTypes::CALL, 1).ordered
+ allow(@prot).to receive(:write_message_end)
+ allow(@prot).to receive(:trans).and_return(double("trans", :flush => nil))
+
+ args_class = double("ArgsClass", :new => EmptyArgs.new)
+ @client.send_message('first', args_class)
+ @client.send_message('second', args_class)
+
+ expect {
+ @client.validate_message_begin('first', Thrift::MessageTypes::REPLY, 0, 'first')
+ }.not_to raise_error
+ expect {
+ @client.validate_message_begin('second', Thrift::MessageTypes::REPLY, 1, 'second')
+ }.not_to raise_error
end
it "should receive a test message" do
@@ -72,16 +96,59 @@
@client.receive_message(double("MockClass", :new => mock_klass))
end
- it "should handle received exceptions" do
- expect(@prot).to receive(:read_message_begin).and_return [nil, Thrift::MessageTypes::EXCEPTION, 0]
+ it "should raise BAD_SEQUENCE_ID for mismatched replies" do
+ @client.instance_variable_set(:@pending_seqids, [0])
+
+ expect {
+ @client.validate_message_begin('testMessage', Thrift::MessageTypes::REPLY, 1, 'testMessage')
+ }.to raise_error(Thrift::ApplicationException) { |error|
+ expect(error.type).to eq(Thrift::ApplicationException::BAD_SEQUENCE_ID)
+ }
+ end
+
+ it "should raise WRONG_METHOD_NAME for unexpected replies" do
+ @client.instance_variable_set(:@pending_seqids, [0])
+
+ expect {
+ @client.validate_message_begin('otherMessage', Thrift::MessageTypes::REPLY, 0, 'testMessage')
+ }.to raise_error(Thrift::ApplicationException) { |error|
+ expect(error.type).to eq(Thrift::ApplicationException::WRONG_METHOD_NAME)
+ }
+ end
+
+ it "should raise INVALID_MESSAGE_TYPE for non-reply messages" do
+ @client.instance_variable_set(:@pending_seqids, [0])
+
+ expect {
+ @client.validate_message_begin('testMessage', Thrift::MessageTypes::CALL, 0, 'testMessage')
+ }.to raise_error(Thrift::ApplicationException) { |error|
+ expect(error.type).to eq(Thrift::ApplicationException::INVALID_MESSAGE_TYPE)
+ }
+ end
+
+ it "should raise received application exceptions" do
expect(@prot).to receive(:read_message_end)
- expect(Thrift::ApplicationException).to receive(:new) do
- StandardError.new.tap do |mock_exc|
- expect(mock_exc).to receive(:read).with(@prot)
- end
- end
- fname, mtype, sqeid = @client.receive_message_begin()
- expect { @client.handle_exception(mtype) }.to raise_error(StandardError)
+ server_exception = Thrift::ApplicationException.new(Thrift::ApplicationException::UNKNOWN, "boom")
+ expect(server_exception).to receive(:read).with(@prot)
+ expect(Thrift::ApplicationException).to receive(:new).and_return(server_exception)
+ @client.instance_variable_set(:@pending_seqids, [0])
+
+ expect {
+ @client.validate_message_begin('testMessage', Thrift::MessageTypes::EXCEPTION, 0, 'testMessage')
+ }.to raise_error(Thrift::ApplicationException, "boom")
+ expect(@client.instance_variable_get(:@pending_seqids)).to be_empty
+ end
+
+ it "should roll sequence ids across the signed int32 boundary" do
+ expect(@prot).to receive(:write_message_begin).with('testMessage', Thrift::MessageTypes::CALL, Thrift::Client::MAX_SEQUENCE_ID).ordered
+ expect(@prot).to receive(:write_message_begin).with('testMessage2', Thrift::MessageTypes::CALL, Thrift::Client::MIN_SEQUENCE_ID).ordered
+ allow(@prot).to receive(:write_message_end)
+ allow(@prot).to receive(:trans).and_return(double("trans", :flush => nil))
+
+ @client.instance_variable_set(:@seqid, Thrift::Client::MAX_SEQUENCE_ID)
+ args_class = double("ArgsClass", :new => EmptyArgs.new)
+ @client.send_message('testMessage', args_class)
+ @client.send_message('testMessage2', args_class)
end
it "should close the transport if an error occurs while sending a message" do
diff --git a/lib/rb/spec/compact_protocol_spec.rb b/lib/rb/spec/compact_protocol_spec.rb
index 944554f..e67514b 100644
--- a/lib/rb/spec/compact_protocol_spec.rb
+++ b/lib/rb/spec/compact_protocol_spec.rb
@@ -124,6 +124,20 @@
expect(client.recv_Janky).to eq(2)
end
+ it "should round-trip wrapped negative seqids in message headers" do
+ trans = Thrift::MemoryBufferTransport.new
+ writer = Thrift::CompactProtocol.new(trans)
+
+ writer.write_message_begin("test", Thrift::MessageTypes::CALL, -2147483648)
+ writer.write_message_end
+
+ reader = Thrift::CompactProtocol.new(trans)
+ name, type, seqid = reader.read_message_begin
+ expect(name).to eq("test")
+ expect(type).to eq(Thrift::MessageTypes::CALL)
+ expect(seqid).to eq(-2147483648)
+ end
+
it "should deal with fields following fields that have non-delta ids" do
brcp = Thrift::Test::BreaksRubyCompactProtocol.new(
:field1 => "blah",
diff --git a/lib/rb/spec/header_protocol_spec.rb b/lib/rb/spec/header_protocol_spec.rb
index 3feb9b6..9edac1d 100644
--- a/lib/rb/spec/header_protocol_spec.rb
+++ b/lib/rb/spec/header_protocol_spec.rb
@@ -102,6 +102,15 @@
expect(seqid).to eq(123)
end
+ it "should propagate seqid to the outer header frame" do
+ @protocol.write_message_begin("test_method", Thrift::MessageTypes::CALL, 123)
+ @protocol.write_message_end
+ @protocol.trans.flush
+
+ data = @buffer.read(@buffer.available)
+ expect(data[8, 4].unpack('N').first).to eq(123)
+ end
+
it "should write and read structs" do
@protocol.write_message_begin("test", Thrift::MessageTypes::CALL, 1)
@protocol.write_struct_begin("TestStruct")
diff --git a/lib/rb/spec/header_transport_spec.rb b/lib/rb/spec/header_transport_spec.rb
index 2857e15..4d073fa 100644
--- a/lib/rb/spec/header_transport_spec.rb
+++ b/lib/rb/spec/header_transport_spec.rb
@@ -129,6 +129,15 @@
expect(data.bytesize).to be > 30 # Should include header key-value
end
+ it "should write the configured sequence id into the frame header" do
+ @trans.sequence_id = 456
+ @trans.write("payload")
+ @trans.flush
+
+ data = @underlying.read(@underlying.available)
+ expect(data[8, 4].unpack('N').first).to eq(456)
+ end
+
it "should apply ZLIB transform" do
@trans.add_transform(Thrift::HeaderTransformID::ZLIB)
original_payload = "a" * 1000 # Compressible data
@@ -205,6 +214,19 @@
expect(headers["request-id"]).to eq("12345")
end
+ it "should decode signed sequence ids from Header frames" do
+ @trans.sequence_id = -2147483648
+ @trans.write("payload")
+ @trans.flush
+
+ written_data = @underlying.read(@underlying.available)
+ read_transport = Thrift::MemoryBufferTransport.new(written_data)
+ read_trans = Thrift::HeaderTransport.new(read_transport)
+
+ expect(read_trans.read(7)).to eq("payload")
+ expect(read_trans.sequence_id).to eq(-2147483648)
+ end
+
it "should decompress ZLIB payload" do
# Write with ZLIB
@trans.add_transform(Thrift::HeaderTransformID::ZLIB)