Thrift: Python TBufferedTransport improvements.
Summary:
The Python version of TBufferedTransport now uses input buffering.
It is also compatible with the fasbinary module.
Reviewed By: mcslee
Test Plan:
test/FastbinaryTest.py
dreiss@dreiss-vmware:~/gp/thrift/test/py$ strace -f ./TestClient.py 2>&1 | grep recv | wc -l
99
# Install new version in other terminal
dreiss@dreiss-vmware:~/gp/thrift/test/py$ strace -f ./TestClient.py 2>&1 | grep recv | wc -l
14
Revert Plan: ok
git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@665250 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/lib/py/src/protocol/fastbinary.c b/lib/py/src/protocol/fastbinary.c
index 8a04836..61ccd8f 100644
--- a/lib/py/src/protocol/fastbinary.c
+++ b/lib/py/src/protocol/fastbinary.c
@@ -608,7 +608,7 @@
// using building functions as this is a rare codepath
newiobuf = PyObject_CallFunction(
- input->refill_callable, "s#i", *output, len, read, NULL);
+ input->refill_callable, "s#i", *output, read, len, NULL);
if (newiobuf == NULL) {
return false;
}
diff --git a/lib/py/src/transport/TTransport.py b/lib/py/src/transport/TTransport.py
index 0f5bfdc..60f4233 100644
--- a/lib/py/src/transport/TTransport.py
+++ b/lib/py/src/transport/TTransport.py
@@ -112,13 +112,16 @@
return buffered
-class TBufferedTransport(TTransportBase):
+class TBufferedTransport(TTransportBase,CReadableTransport):
"""Class that wraps another transport and buffers its I/O."""
+ DEFAULT_BUFFER = 4096
+
def __init__(self, trans):
self.__trans = trans
- self.__buf = StringIO()
+ self.__wbuf = StringIO()
+ self.__rbuf = StringIO("")
def isOpen(self):
return self.__trans.isOpen()
@@ -130,15 +133,38 @@
return self.__trans.close()
def read(self, sz):
- return self.__trans.read(sz)
+ ret = self.__rbuf.read(sz)
+ if len(ret) != 0:
+ return ret
+
+ self.__rbuf = StringIO(self.__trans.read(max(sz, self.DEFAULT_BUFFER)))
+ return self.__rbuf.read(sz)
def write(self, buf):
- self.__buf.write(buf)
+ self.__wbuf.write(buf)
def flush(self):
- self.__trans.write(self.__buf.getvalue())
+ self.__trans.write(self.__wbuf.getvalue())
self.__trans.flush()
- self.__buf = StringIO()
+ self.__wbuf = StringIO()
+
+ # Implement the CReadableTransport interface.
+ @property
+ def cstringio_buf(self):
+ return self.__rbuf
+
+ def cstringio_refill(self, partialread, reqlen):
+ retstring = partialread
+ if reqlen < self.DEFAULT_BUFFER:
+ # try to make a read of as much as we can.
+ retstring += self.__trans.read(self.DEFAULT_BUFFER)
+
+ # but make sure we do read reqlen bytes.
+ if len(retstring) < reqlen:
+ retstring += self.__trans.readAll(reqlen - len(retstring))
+
+ self.__rbuf = StringIO(retstring)
+ return self.__rbuf
class TMemoryBuffer(TTransportBase, CReadableTransport):
"""Wraps a cStringIO object as a TTransport.
diff --git a/test/FastbinaryTest.py b/test/FastbinaryTest.py
index 0918002..f6a8699 100755
--- a/test/FastbinaryTest.py
+++ b/test/FastbinaryTest.py
@@ -79,6 +79,10 @@
rs.bigint = 124523452435L
rs.triple = 3.14
+# make sure this splits two buffers in a buffered protocol
+rshuge = RandomStuff()
+rshuge.myintlist=range(10000)
+
my_zero = Srv.Janky_result({"arg":5})
my_nega = Srv.Janky_args({"success":6})
@@ -98,9 +102,22 @@
def checkRead(o):
prot = TBinaryProtocol.TBinaryProtocol(TTransport.TMemoryBuffer())
o.write(prot)
+
+ slow_version_binary = prot.trans.getvalue()
+
prot = TBinaryProtocol.TBinaryProtocolAccelerated(
- TTransport.TMemoryBuffer(
- prot.trans.getvalue()))
+ TTransport.TMemoryBuffer(slow_version_binary))
+ c = o.__class__()
+ c.read(prot)
+ if c != o:
+ print "copy: "
+ pprint(eval(repr(c)))
+ print "orig: "
+ pprint(eval(repr(o)))
+
+ prot = TBinaryProtocol.TBinaryProtocolAccelerated(
+ TTransport.TBufferedTransport(
+ TTransport.TMemoryBuffer(slow_version_binary)))
c = o.__class__()
c.read(prot)
if c != o:
@@ -117,6 +134,8 @@
checkRead(no_set)
checkWrite(rs)
checkRead(rs)
+ checkWrite(rshuge)
+ checkRead(rshuge)
checkWrite(my_zero)
checkRead(my_zero)
checkRead(Backwards({"first_tag2":4, "second_tag1":2}))