|  | #!/usr/bin/env python | 
|  |  | 
|  | # | 
|  | # Licensed to the Apache Software Foundation (ASF) under one | 
|  | # or more contributor license agreements. See the NOTICE file | 
|  | # distributed with this work for additional information | 
|  | # regarding copyright ownership. The ASF licenses this file | 
|  | # to you under the Apache License, Version 2.0 (the | 
|  | # "License"); you may not use this file except in compliance | 
|  | # with the License. You may obtain a copy of the License at | 
|  | # | 
|  | #   http://www.apache.org/licenses/LICENSE-2.0 | 
|  | # | 
|  | # Unless required by applicable law or agreed to in writing, | 
|  | # software distributed under the License is distributed on an | 
|  | # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | 
|  | # KIND, either express or implied. See the License for the | 
|  | # specific language governing permissions and limitations | 
|  | # under the License. | 
|  | # | 
|  |  | 
|  | from ThriftTest.ttypes import Xtruct | 
|  | from thrift.transport import TTransport | 
|  | from thrift.protocol import TBinaryProtocol | 
|  | from thrift.protocol import TCompactProtocol | 
|  | import unittest | 
|  |  | 
|  |  | 
|  | class TestEof(unittest.TestCase): | 
|  |  | 
|  | def make_data(self, pfactory=None): | 
|  | trans = TTransport.TMemoryBuffer() | 
|  | if pfactory: | 
|  | prot = pfactory.getProtocol(trans) | 
|  | else: | 
|  | prot = TBinaryProtocol.TBinaryProtocol(trans) | 
|  |  | 
|  | x = Xtruct() | 
|  | x.string_thing = "Zero" | 
|  | x.byte_thing = 0 | 
|  |  | 
|  | x.write(prot) | 
|  |  | 
|  | x = Xtruct() | 
|  | x.string_thing = "One" | 
|  | x.byte_thing = 1 | 
|  |  | 
|  | x.write(prot) | 
|  |  | 
|  | return trans.getvalue() | 
|  |  | 
|  | def testTransportReadAll(self): | 
|  | """Test that readAll on any type of transport throws an EOFError""" | 
|  | trans = TTransport.TMemoryBuffer(self.make_data()) | 
|  | trans.readAll(1) | 
|  |  | 
|  | try: | 
|  | trans.readAll(10000) | 
|  | except EOFError: | 
|  | return | 
|  |  | 
|  | self.fail("Should have gotten EOFError") | 
|  |  | 
|  | def eofTestHelper(self, pfactory): | 
|  | trans = TTransport.TMemoryBuffer(self.make_data(pfactory)) | 
|  | prot = pfactory.getProtocol(trans) | 
|  |  | 
|  | x = Xtruct() | 
|  | x.read(prot) | 
|  | self.assertEqual(x.string_thing, "Zero") | 
|  | self.assertEqual(x.byte_thing, 0) | 
|  |  | 
|  | x = Xtruct() | 
|  | x.read(prot) | 
|  | self.assertEqual(x.string_thing, "One") | 
|  | self.assertEqual(x.byte_thing, 1) | 
|  |  | 
|  | try: | 
|  | x = Xtruct() | 
|  | x.read(prot) | 
|  | except EOFError: | 
|  | return | 
|  |  | 
|  | self.fail("Should have gotten EOFError") | 
|  |  | 
|  | def eofTestHelperStress(self, pfactory): | 
|  | """Test the ability of TBinaryProtocol to deal with the removal of every byte in the file""" | 
|  | # TODO: we should make sure this covers more of the code paths | 
|  |  | 
|  | data = self.make_data(pfactory) | 
|  | for i in range(0, len(data) + 1): | 
|  | trans = TTransport.TMemoryBuffer(data[0:i]) | 
|  | prot = pfactory.getProtocol(trans) | 
|  | try: | 
|  | x = Xtruct() | 
|  | x.read(prot) | 
|  | x.read(prot) | 
|  | x.read(prot) | 
|  | except EOFError: | 
|  | continue | 
|  | self.fail("Should have gotten an EOFError") | 
|  |  | 
|  | def testBinaryProtocolEof(self): | 
|  | """Test that TBinaryProtocol throws an EOFError when it reaches the end of the stream""" | 
|  | self.eofTestHelper(TBinaryProtocol.TBinaryProtocolFactory()) | 
|  | self.eofTestHelperStress(TBinaryProtocol.TBinaryProtocolFactory()) | 
|  |  | 
|  | def testBinaryProtocolAcceleratedBinaryEof(self): | 
|  | """Test that TBinaryProtocolAccelerated throws an EOFError when it reaches the end of the stream""" | 
|  | self.eofTestHelper(TBinaryProtocol.TBinaryProtocolAcceleratedFactory(fallback=False)) | 
|  | self.eofTestHelperStress(TBinaryProtocol.TBinaryProtocolAcceleratedFactory(fallback=False)) | 
|  |  | 
|  | def testCompactProtocolEof(self): | 
|  | """Test that TCompactProtocol throws an EOFError when it reaches the end of the stream""" | 
|  | self.eofTestHelper(TCompactProtocol.TCompactProtocolFactory()) | 
|  | self.eofTestHelperStress(TCompactProtocol.TCompactProtocolFactory()) | 
|  |  | 
|  | def testCompactProtocolAcceleratedCompactEof(self): | 
|  | """Test that TCompactProtocolAccelerated throws an EOFError when it reaches the end of the stream""" | 
|  | self.eofTestHelper(TCompactProtocol.TCompactProtocolAcceleratedFactory(fallback=False)) | 
|  | self.eofTestHelperStress(TCompactProtocol.TCompactProtocolAcceleratedFactory(fallback=False)) | 
|  |  | 
|  |  | 
|  | def suite(): | 
|  | suite = unittest.TestSuite() | 
|  | loader = unittest.TestLoader() | 
|  | suite.addTest(loader.loadTestsFromTestCase(TestEof)) | 
|  | return suite | 
|  |  | 
|  |  | 
|  | if __name__ == "__main__": | 
|  | unittest.main(defaultTest="suite", testRunner=unittest.TextTestRunner(verbosity=2)) |