Merge Ruby framed transport

Summary: Submitted by Jake Luciani

Reviewed By: mcslee

Test Plan: Test code included in this commit


git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@665385 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/lib/rb/lib/thrift/thrift.rb b/lib/rb/lib/thrift/thrift.rb
index 4a25959..13b7db6 100644
--- a/lib/rb/lib/thrift/thrift.rb
+++ b/lib/rb/lib/thrift/thrift.rb
@@ -52,14 +52,14 @@
       return
     end
   end
-  
+
   def read_args(iprot, args_class)
     args = args_class.new
     args.read(iprot)
     iprot.readMessageEnd
     args
   end
-  
+
   def write_result(result, oprot, name, seqid)
     oprot.writeMessageBegin(name, TMessageType::REPLY, seqid)
     result.write(oprot)
@@ -85,7 +85,7 @@
   WRONG_METHOD_NAME = 3
   BAD_SEQUENCE_ID = 4
   MISSING_RESULT = 5
-  
+
   attr_reader :type
 
   def initialize(type=UNKNOWN, message=nil)
@@ -155,7 +155,7 @@
     @oprot.writeMessageEnd()
     @oprot.trans.flush()
   end
-  
+
   def receive_message(result_klass)
     fname, mtype, rseqid = @iprot.readMessageBegin()
     handle_exception(mtype)
@@ -164,7 +164,7 @@
     @iprot.readMessageEnd()
     return result
   end
-  
+
   def handle_exception(mtype)
     if mtype == TMessageType::EXCEPTION
       x = TApplicationException.new()
@@ -181,17 +181,17 @@
       instance_variable_set("@#{name}", d[name.to_s])
     end
   end
-  
-  def fields
+
+  def struct_fields
     self.class.const_get(:FIELDS)
   end
-  
+
   def each_field
-    fields.each do |fid, data|
+    struct_fields.each do |fid, data|
       yield fid, data[:type], data[:name]
     end
   end
-  
+
   def read(iprot)
     iprot.readStructBegin()
     loop do
@@ -202,14 +202,14 @@
     end
     iprot.readStructEnd()
   end
-  
+
   def write(oprot)
     oprot.writeStructBegin(self.class.name)
     each_field do |fid, type, name|
       if ((value = instance_variable_get("@#{name}")) != nil)
         if is_container? type
           oprot.writeFieldBegin(name, type, fid)
-          write_container(oprot, value, fields[fid])
+          write_container(oprot, value, struct_fields[fid])
           oprot.writeFieldEnd
         else
           oprot.write_field(name, type, fid, value)
@@ -218,12 +218,12 @@
     end
     oprot.writeFieldStop()
     oprot.writeStructEnd()
-  end  
-  
+  end
+
   protected
-  
+
   def handle_message(iprot, fid, ftype)
-    field = fields[fid]
+    field = struct_fields[fid]
     if field && field[:type] == ftype
       value = read_field(iprot, field)
       instance_variable_set("@#{field[:name]}", value)
@@ -231,7 +231,7 @@
       iprot.skip(ftype)
     end
   end
-  
+
   def read_field(iprot, field = {})
     if field[:type] == TType::STRUCT
       value = field[:class].new
@@ -264,7 +264,7 @@
     end
     value
   end
-  
+
   def write_data(oprot, value, field)
     if is_container? field[:type]
       write_container(oprot, value, field)
@@ -272,7 +272,7 @@
       oprot.write_type(field[:type], value)
     end
   end
-  
+
   def write_container(oprot, value, field = {})
     if field[:type] == TType::MAP
       oprot.writeMapBegin(field[:key][:type], field[:value][:type], value.size)
@@ -297,13 +297,13 @@
       raise "Not a container type: #{field[:type]}"
     end
   end
-  
+
   def is_container?(type)
     [TType::LIST, TType::MAP, TType::SET].include? type
   end
-  
+
   def field_info(field)
-    { :type => field[:type], 
+    { :type => field[:type],
       :class => field[:class],
       :key => field[:key],
       :value => field[:value],
diff --git a/lib/rb/lib/thrift/transport/tsocket.rb b/lib/rb/lib/thrift/transport/tsocket.rb
index 9de2b8e..7f4eed4 100644
--- a/lib/rb/lib/thrift/transport/tsocket.rb
+++ b/lib/rb/lib/thrift/transport/tsocket.rb
@@ -33,7 +33,7 @@
   def isOpen()
     return !@handle.nil?
   end
-  
+
   def write(str)
     begin
       @handle.write(str)
@@ -58,7 +58,7 @@
     @handle.close() unless @handle.nil?
     @handle = nil
   end
-    
+
 end
 
 class TServerSocket < TServerTransport
@@ -80,7 +80,7 @@
     end
     return nil
   end
-   
+
   def close()
    @handle.close() unless @handle.nil?
   end
diff --git a/lib/rb/lib/thrift/transport/ttransport.rb b/lib/rb/lib/thrift/transport/ttransport.rb
index 5b9b8b1..47403fc 100644
--- a/lib/rb/lib/thrift/transport/ttransport.rb
+++ b/lib/rb/lib/thrift/transport/ttransport.rb
@@ -30,13 +30,13 @@
 
 class TTransport
   def isOpen(); nil; end
-  
+
   def open(); nil; end
-  
+
   def close(); nil; end
-  
+
   def read(sz); nil; end
-  
+
   def readAll(sz)
     buff = ''
     have = 0
@@ -51,12 +51,12 @@
   def write(buf); nil; end
 
   def flush(); nil; end
-  
+
 end
 
 class TServerTransport
   def listen(); nil; end
-  
+
   def accept(); nil; end
 
   def close(); nil; end
@@ -68,13 +68,13 @@
     return trans
   end
 end
-  
+
 class TBufferedTransport < TTransport
   def initialize(transport)
     @transport = transport
     @wbuf = ''
   end
-  
+
   def isOpen()
     return @transport.isOpen()
   end
@@ -86,11 +86,11 @@
   def close()
     @transport.close()
   end
-  
+
   def read(sz)
     return @transport.read(sz)
   end
-  
+
   def write(buf)
     @wbuf += buf
   end
@@ -107,3 +107,163 @@
     return TBufferedTransport.new(transport)
   end
 end
+
+
+class TFramedTransport < TTransport
+  def initialize(transport, read=true, write=true)
+    @transport = transport
+    @rbuf      = ''
+    @wbuf      = ''
+    @read      = read
+    @write     = write
+  end
+
+  def isOpen()
+    return @transport.isOpen
+  end
+
+  def open()
+    @transport.open
+  end
+
+  def close()
+    @transport.close
+  end
+
+  def read(sz)
+    if !@read
+      return @transport.read(sz)
+    end
+
+    if (sz <= 0)
+      return ''
+    end
+
+    if (@rbuf.length == 0)
+      self.readFrame
+    end
+
+    # return full buf
+    if (sz > @rbuf.length)
+      out = @rbuf
+      @rbuf = ''
+      return out
+    end
+
+    # Return substr
+    out = @rbuf.slice(0, sz)
+    @rbuf = @rbuf.slice(sz, @rbuf.length)
+    return out
+
+  end
+
+  def write(buf,sz=nil)
+
+    if !@write
+      return @transport.write(buf);
+    end
+
+    if (sz != nil && sz < buf.length)
+      buf = buf.slice(0,sz)
+    end
+
+    @wbuf += buf
+
+  end
+
+  #
+  # Writes the output buffer to the stream in the format of a 4-byte length
+  # followed by the actual data.
+  #
+  def flush
+    if !@write
+      return @transport.flush
+    end
+
+    out = [@wbuf.length].pack('N')
+    out += @wbuf
+    @transport.write(out)
+    @transport.flush
+    @wbuf = ''
+  end
+
+  private
+
+  def readFrame
+    buf  = @transport.readAll(4)
+    val  = buf.unpack('N')
+    sz   = val[0]
+
+    @rbuf = @transport.readAll(sz)
+  end
+
+end
+
+
+class TFramedTransportFactory < TTransportFactory
+  def getTransport(transport)
+    return TFramedTransport.new(transport)
+  end
+end
+
+class TMemoryBuffer < TTransport
+  def initialize(sz=1024)
+    @buf = ''
+    @sz  = sz
+    wpos = 0
+    rpos = 0
+  end
+
+  def isOpen
+    return 1
+  end
+
+  def open
+  end
+
+  def close
+  end
+
+  def peek
+    return rpos < wpos
+  end
+
+  def getBuffer
+    return @buf
+  end
+
+  def resetBuffer(new_buf = '')
+     @buf  = new_buf
+     @sz   = new_buf.length
+     @wpos = new_buf.length
+     @rpos = 0
+  end
+
+  def available
+    return @wpos - @rpos
+  end
+
+  def read(len)
+    avail = available()
+
+    return '' if avail == 0
+
+    #how much to give
+    give = len
+    give = avail if avail < len
+
+    ret = @buf.slice(@rpos,give)
+
+    @rpos += give;
+
+    return ret;
+  end
+
+  def write(wbuf)
+    @buf  += wbuf
+    @wpos += wbuf.length()
+  end
+
+  def flush
+  end
+end
diff --git a/test/rb/TestClient-nb.rb b/test/rb/TestClient-nb.rb
new file mode 100644
index 0000000..a951a9e
--- /dev/null
+++ b/test/rb/TestClient-nb.rb
@@ -0,0 +1,35 @@
+#!/usr/bin/env ruby
+
+$:.push('gen-rb')
+$:.push('../../lib/rb/lib')
+
+require 'thrift/transport/tsocket'
+require 'thrift/protocol/tbinaryprotocol'
+require 'ThriftTest'
+
+t = TFramedTransport.new(TSocket.new('localhost', 9090))
+p = TBinaryProtocol.new(t)
+c = Thrift::Test::ThriftTest::Client.new(p)
+
+t.open()
+
+puts c.testString('string')
+puts c.testByte(8)
+puts c.testByte(-8)
+puts c.testI32(32)
+puts c.testI32(-32)
+puts c.testI64(64)
+puts c.testI64(-64)
+puts c.testDouble(3.14)
+puts c.testDouble(-3.14)
+puts c.testMap({1 => 1, 2 => 2, 3 => 3})
+puts c.testList([1,2,3,4,5])
+puts c.testSet({1 => true, 2 => true, 3 => true})
+struct = Thrift::Test::Xtruct.new({'string_thing' => 'hi!', 'i32_thing' => 4 })
+puts c.testStruct(struct)
+puts c.testNest(Thrift::Test::Xtruct2.new({'struct_thing' => struct, 'i32_thing' => 10}))
+insane = Thrift::Test::Insanity.new({'userMap' => { Thrift::Test::Numberz::ONE => 44 }, 'xtructs' => [struct, Thrift::Test::Xtruct.new({'string_thing' => 'hi again', 'i32_thing' => 12})]})
+puts c.testInsanity(insane)
+puts c.testMapMap(4).inspect
+
+t.close()