Merge Ruby framed transport
Summary: Submitted by Jake Luciani
Reviewed By: mcslee
Test Plan: Test code included in this commit
git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@665385 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/lib/rb/lib/thrift/thrift.rb b/lib/rb/lib/thrift/thrift.rb
index 4a25959..13b7db6 100644
--- a/lib/rb/lib/thrift/thrift.rb
+++ b/lib/rb/lib/thrift/thrift.rb
@@ -52,14 +52,14 @@
return
end
end
-
+
def read_args(iprot, args_class)
args = args_class.new
args.read(iprot)
iprot.readMessageEnd
args
end
-
+
def write_result(result, oprot, name, seqid)
oprot.writeMessageBegin(name, TMessageType::REPLY, seqid)
result.write(oprot)
@@ -85,7 +85,7 @@
WRONG_METHOD_NAME = 3
BAD_SEQUENCE_ID = 4
MISSING_RESULT = 5
-
+
attr_reader :type
def initialize(type=UNKNOWN, message=nil)
@@ -155,7 +155,7 @@
@oprot.writeMessageEnd()
@oprot.trans.flush()
end
-
+
def receive_message(result_klass)
fname, mtype, rseqid = @iprot.readMessageBegin()
handle_exception(mtype)
@@ -164,7 +164,7 @@
@iprot.readMessageEnd()
return result
end
-
+
def handle_exception(mtype)
if mtype == TMessageType::EXCEPTION
x = TApplicationException.new()
@@ -181,17 +181,17 @@
instance_variable_set("@#{name}", d[name.to_s])
end
end
-
- def fields
+
+ def struct_fields
self.class.const_get(:FIELDS)
end
-
+
def each_field
- fields.each do |fid, data|
+ struct_fields.each do |fid, data|
yield fid, data[:type], data[:name]
end
end
-
+
def read(iprot)
iprot.readStructBegin()
loop do
@@ -202,14 +202,14 @@
end
iprot.readStructEnd()
end
-
+
def write(oprot)
oprot.writeStructBegin(self.class.name)
each_field do |fid, type, name|
if ((value = instance_variable_get("@#{name}")) != nil)
if is_container? type
oprot.writeFieldBegin(name, type, fid)
- write_container(oprot, value, fields[fid])
+ write_container(oprot, value, struct_fields[fid])
oprot.writeFieldEnd
else
oprot.write_field(name, type, fid, value)
@@ -218,12 +218,12 @@
end
oprot.writeFieldStop()
oprot.writeStructEnd()
- end
-
+ end
+
protected
-
+
def handle_message(iprot, fid, ftype)
- field = fields[fid]
+ field = struct_fields[fid]
if field && field[:type] == ftype
value = read_field(iprot, field)
instance_variable_set("@#{field[:name]}", value)
@@ -231,7 +231,7 @@
iprot.skip(ftype)
end
end
-
+
def read_field(iprot, field = {})
if field[:type] == TType::STRUCT
value = field[:class].new
@@ -264,7 +264,7 @@
end
value
end
-
+
def write_data(oprot, value, field)
if is_container? field[:type]
write_container(oprot, value, field)
@@ -272,7 +272,7 @@
oprot.write_type(field[:type], value)
end
end
-
+
def write_container(oprot, value, field = {})
if field[:type] == TType::MAP
oprot.writeMapBegin(field[:key][:type], field[:value][:type], value.size)
@@ -297,13 +297,13 @@
raise "Not a container type: #{field[:type]}"
end
end
-
+
def is_container?(type)
[TType::LIST, TType::MAP, TType::SET].include? type
end
-
+
def field_info(field)
- { :type => field[:type],
+ { :type => field[:type],
:class => field[:class],
:key => field[:key],
:value => field[:value],
diff --git a/lib/rb/lib/thrift/transport/tsocket.rb b/lib/rb/lib/thrift/transport/tsocket.rb
index 9de2b8e..7f4eed4 100644
--- a/lib/rb/lib/thrift/transport/tsocket.rb
+++ b/lib/rb/lib/thrift/transport/tsocket.rb
@@ -33,7 +33,7 @@
def isOpen()
return !@handle.nil?
end
-
+
def write(str)
begin
@handle.write(str)
@@ -58,7 +58,7 @@
@handle.close() unless @handle.nil?
@handle = nil
end
-
+
end
class TServerSocket < TServerTransport
@@ -80,7 +80,7 @@
end
return nil
end
-
+
def close()
@handle.close() unless @handle.nil?
end
diff --git a/lib/rb/lib/thrift/transport/ttransport.rb b/lib/rb/lib/thrift/transport/ttransport.rb
index 5b9b8b1..47403fc 100644
--- a/lib/rb/lib/thrift/transport/ttransport.rb
+++ b/lib/rb/lib/thrift/transport/ttransport.rb
@@ -30,13 +30,13 @@
class TTransport
def isOpen(); nil; end
-
+
def open(); nil; end
-
+
def close(); nil; end
-
+
def read(sz); nil; end
-
+
def readAll(sz)
buff = ''
have = 0
@@ -51,12 +51,12 @@
def write(buf); nil; end
def flush(); nil; end
-
+
end
class TServerTransport
def listen(); nil; end
-
+
def accept(); nil; end
def close(); nil; end
@@ -68,13 +68,13 @@
return trans
end
end
-
+
class TBufferedTransport < TTransport
def initialize(transport)
@transport = transport
@wbuf = ''
end
-
+
def isOpen()
return @transport.isOpen()
end
@@ -86,11 +86,11 @@
def close()
@transport.close()
end
-
+
def read(sz)
return @transport.read(sz)
end
-
+
def write(buf)
@wbuf += buf
end
@@ -107,3 +107,163 @@
return TBufferedTransport.new(transport)
end
end
+
+
+class TFramedTransport < TTransport
+ def initialize(transport, read=true, write=true)
+ @transport = transport
+ @rbuf = ''
+ @wbuf = ''
+ @read = read
+ @write = write
+ end
+
+ def isOpen()
+ return @transport.isOpen
+ end
+
+ def open()
+ @transport.open
+ end
+
+ def close()
+ @transport.close
+ end
+
+ def read(sz)
+ if !@read
+ return @transport.read(sz)
+ end
+
+ if (sz <= 0)
+ return ''
+ end
+
+ if (@rbuf.length == 0)
+ self.readFrame
+ end
+
+ # return full buf
+ if (sz > @rbuf.length)
+ out = @rbuf
+ @rbuf = ''
+ return out
+ end
+
+ # Return substr
+ out = @rbuf.slice(0, sz)
+ @rbuf = @rbuf.slice(sz, @rbuf.length)
+ return out
+
+ end
+
+ def write(buf,sz=nil)
+
+ if !@write
+ return @transport.write(buf);
+ end
+
+ if (sz != nil && sz < buf.length)
+ buf = buf.slice(0,sz)
+ end
+
+ @wbuf += buf
+
+ end
+
+ #
+ # Writes the output buffer to the stream in the format of a 4-byte length
+ # followed by the actual data.
+ #
+ def flush
+ if !@write
+ return @transport.flush
+ end
+
+ out = [@wbuf.length].pack('N')
+ out += @wbuf
+ @transport.write(out)
+ @transport.flush
+ @wbuf = ''
+ end
+
+ private
+
+ def readFrame
+ buf = @transport.readAll(4)
+ val = buf.unpack('N')
+ sz = val[0]
+
+ @rbuf = @transport.readAll(sz)
+ end
+
+end
+
+
+class TFramedTransportFactory < TTransportFactory
+ def getTransport(transport)
+ return TFramedTransport.new(transport)
+ end
+end
+
+class TMemoryBuffer < TTransport
+ def initialize(sz=1024)
+ @buf = ''
+ @sz = sz
+ wpos = 0
+ rpos = 0
+ end
+
+ def isOpen
+ return 1
+ end
+
+ def open
+ end
+
+ def close
+ end
+
+ def peek
+ return rpos < wpos
+ end
+
+ def getBuffer
+ return @buf
+ end
+
+ def resetBuffer(new_buf = '')
+ @buf = new_buf
+ @sz = new_buf.length
+ @wpos = new_buf.length
+ @rpos = 0
+ end
+
+ def available
+ return @wpos - @rpos
+ end
+
+ def read(len)
+ avail = available()
+
+ return '' if avail == 0
+
+ #how much to give
+ give = len
+ give = avail if avail < len
+
+ ret = @buf.slice(@rpos,give)
+
+ @rpos += give;
+
+ return ret;
+ end
+
+ def write(wbuf)
+ @buf += wbuf
+ @wpos += wbuf.length()
+ end
+
+ def flush
+ end
+end
diff --git a/test/rb/TestClient-nb.rb b/test/rb/TestClient-nb.rb
new file mode 100644
index 0000000..a951a9e
--- /dev/null
+++ b/test/rb/TestClient-nb.rb
@@ -0,0 +1,35 @@
+#!/usr/bin/env ruby
+
+$:.push('gen-rb')
+$:.push('../../lib/rb/lib')
+
+require 'thrift/transport/tsocket'
+require 'thrift/protocol/tbinaryprotocol'
+require 'ThriftTest'
+
+t = TFramedTransport.new(TSocket.new('localhost', 9090))
+p = TBinaryProtocol.new(t)
+c = Thrift::Test::ThriftTest::Client.new(p)
+
+t.open()
+
+puts c.testString('string')
+puts c.testByte(8)
+puts c.testByte(-8)
+puts c.testI32(32)
+puts c.testI32(-32)
+puts c.testI64(64)
+puts c.testI64(-64)
+puts c.testDouble(3.14)
+puts c.testDouble(-3.14)
+puts c.testMap({1 => 1, 2 => 2, 3 => 3})
+puts c.testList([1,2,3,4,5])
+puts c.testSet({1 => true, 2 => true, 3 => true})
+struct = Thrift::Test::Xtruct.new({'string_thing' => 'hi!', 'i32_thing' => 4 })
+puts c.testStruct(struct)
+puts c.testNest(Thrift::Test::Xtruct2.new({'struct_thing' => struct, 'i32_thing' => 10}))
+insane = Thrift::Test::Insanity.new({'userMap' => { Thrift::Test::Numberz::ONE => 44 }, 'xtructs' => [struct, Thrift::Test::Xtruct.new({'string_thing' => 'hi again', 'i32_thing' => 12})]})
+puts c.testInsanity(insane)
+puts c.testMapMap(4).inspect
+
+t.close()