#!/usr/bin/env python

#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#

import sys, glob
from optparse import OptionParser
# Parse this script's own --genpydir option (default 'gen-py') before the
# unittest machinery gets to look at the command line.
parser = OptionParser()
parser.add_option('--genpydir', type='string', dest='genpydir', default='gen-py')
options, args = parser.parse_args()
del sys.argv[1:] # clean up hack so unittest doesn't complain
# Both directories are pushed to the front of the import path so the imports
# below resolve against them first; the build directory is inserted last and
# therefore takes precedence over the genpydir directory.
sys.path.insert(0, options.genpydir)
# NOTE: indexing [0] raises IndexError when the glob matches nothing
# (presumably meaning the library has not been built yet -- confirm).
sys.path.insert(0, glob.glob('../../lib/py/build/lib.*')[0])

from ThriftTest.ttypes import *
from DebugProtoTest.ttypes import CompactProtoTestStruct, Empty
from thrift.transport import TTransport
from thrift.transport import TSocket
from thrift.protocol import TBinaryProtocol, TCompactProtocol, TJSONProtocol
from thrift.TSerialization import serialize, deserialize
import unittest
import time

class AbstractTest(unittest.TestCase):

  def setUp(self):
      self.v1obj = VersioningTestV1(
          begin_in_both=12345,
          old_string='aaa',
          end_in_both=54321,
          )

      self.v2obj = VersioningTestV2(
          begin_in_both=12345,
          newint=1,
          newbyte=2,
          newshort=3,
          newlong=4,
          newdouble=5.0,
          newstruct=Bonk(message="Hello!", type=123),
          newlist=[7,8,9],
          newset=set([42,1,8]),
          newmap={1:2,2:3},
          newstring="Hola!",
          end_in_both=54321,
          )

      self.bools = Bools(im_true=True, im_false=False)
      self.bools_flipped = Bools(im_true=False, im_false=True)

      self.large_deltas = LargeDeltas (
          b1=self.bools,
          b10=self.bools_flipped,
          b100=self.bools,
          check_true=True,
          b1000=self.bools_flipped,
          check_false=False,
          vertwo2000=VersioningTestV2(newstruct=Bonk(message='World!', type=314)),
          a_set2500=set(['lazy', 'brown', 'cow']),
          vertwo3000=VersioningTestV2(newset=set([2, 3, 5, 7, 11])),
          big_numbers=[2**8, 2**16, 2**31-1, -(2**31-1)]
          )

      self.compact_struct = CompactProtoTestStruct(
          a_byte = 127,
          a_i16=32000,
          a_i32=1000000000,
          a_i64=0xffffffffff,
          a_double=5.6789,
          a_string="my string",
          true_field=True,
          false_field=False,
          empty_struct_field=Empty(),
          byte_list=[-127, -1, 0, 1, 127],
          i16_list=[-1, 0, 1, 0x7fff],
          i32_list= [-1, 0, 0xff, 0xffff, 0xffffff, 0x7fffffff],
          i64_list=[-1, 0, 0xff, 0xffff, 0xffffff, 0xffffffff, 0xffffffffff, 0xffffffffffff, 0xffffffffffffff, 0x7fffffffffffffff],
          double_list=[0.1, 0.2, 0.3],
          string_list=["first", "second", "third"],
          boolean_list=[True, True, True, False, False, False],
          struct_list=[Empty(), Empty()],
          byte_set=set([-127, -1, 0, 1, 127]),
          i16_set=set([-1, 0, 1, 0x7fff]),
          i32_set=set([1, 2, 3]),
          i64_set=set([-1, 0, 0xff, 0xffff, 0xffffff, 0xffffffff, 0xffffffffff, 0xffffffffffff, 0xffffffffffffff, 0x7fffffffffffffff]),
          double_set=set([0.1, 0.2, 0.3]),
          string_set=set(["first", "second", "third"]),
          boolean_set=set([True, False]),
          #struct_set=set([Empty()]), # unhashable instance
          byte_byte_map={1 : 2},
          i16_byte_map={1 : 1, -1 : 1, 0x7fff : 1},
          i32_byte_map={1 : 1, -1 : 1, 0x7fffffff : 1},
          i64_byte_map={0 : 1,  1 : 1, -1 : 1, 0x7fffffffffffffff : 1},
          double_byte_map={-1.1 : 1, 1.1 : 1},
          string_byte_map={"first" : 1, "second" : 2, "third" : 3, "" : 0},
          boolean_byte_map={True : 1, False: 0},
          byte_i16_map={1 : 1, 2 : -1, 3 : 0x7fff},
          byte_i32_map={1 : 1, 2 : -1, 3 : 0x7fffffff},
          byte_i64_map={1 : 1, 2 : -1, 3 : 0x7fffffffffffffff},
          byte_double_map={1 : 0.1, 2 : -0.1, 3 : 1000000.1},
          byte_string_map={1 : "", 2 : "blah", 3 : "loooooooooooooong string"},
          byte_boolean_map={1 : True, 2 : False},
          #list_byte_map # unhashable
          #set_byte_map={set([1, 2, 3]) : 1, set([0, 1]) : 2, set([]) : 0}, # unhashable
          #map_byte_map # unhashable
          byte_map_map={0 : {}, 1 : {1 : 1}, 2 : {1 : 1, 2 : 2}},
          byte_set_map={0 : set([]), 1 : set([1]), 2 : set([1, 2])},
          byte_list_map={0 : [], 1 : [1], 2 : [1, 2]},
          )

      self.nested_lists_i32x2 = NestedListsI32x2(
                                              [
                                                [ 1, 1, 2 ],
                                                [ 2, 7, 9 ],
                                                [ 3, 5, 8 ]
                                              ]
                                            )

      self.nested_lists_i32x3 = NestedListsI32x3(
                                              [
                                                [
                                                  [ 2, 7, 9 ],
                                                  [ 3, 5, 8 ]
                                                ],
                                                [
                                                  [ 1, 1, 2 ],
                                                  [ 1, 4, 9 ]
                                                ]
                                              ]
                                            )

      self.nested_mixedx2 = NestedMixedx2( int_set_list=[
                                            set([1,2,3]),
                                            set([1,4,9]),
                                            set([1,2,3,5,8,13,21]),
                                            set([-1, 0, 1])
                                            ],
                                            # note, the sets below are sets of chars, since the strings are iterated
                                            map_int_strset={ 10:set('abc'), 20:set('def'), 30:set('GHI') },
                                            map_int_strset_list=[
                                                                 { 10:set('abc'), 20:set('def'), 30:set('GHI') },
                                                                 { 100:set('lmn'), 200:set('opq'), 300:set('RST') },
                                                                 { 1000:set('uvw'), 2000:set('wxy'), 3000:set('XYZ') }
                                                                 ]
                                          )

      self.nested_lists_bonk = NestedListsBonk(
                                              [
                                                [
                                                  [
                                                    Bonk(message='inner A first', type=1),
                                                    Bonk(message='inner A second', type=1)
                                                  ],
                                                  [
                                                  Bonk(message='inner B first', type=2),
                                                  Bonk(message='inner B second', type=2)
                                                  ]
                                                ]
                                              ]
                                            )

      self.list_bonks = ListBonks(
                                    [
                                      Bonk(message='inner A', type=1),
                                      Bonk(message='inner B', type=2),
                                      Bonk(message='inner C', type=0)
                                    ]
                                  )

  def _serialize(self, obj):
    trans = TTransport.TMemoryBuffer()
    prot = self.protocol_factory.getProtocol(trans)
    obj.write(prot)
    return trans.getvalue()

  def _deserialize(self, objtype, data):
    prot = self.protocol_factory.getProtocol(TTransport.TMemoryBuffer(data))
    ret = objtype()
    ret.read(prot)
    return ret

  def testForwards(self):
    obj = self._deserialize(VersioningTestV2, self._serialize(self.v1obj))
    self.assertEquals(obj.begin_in_both, self.v1obj.begin_in_both)
    self.assertEquals(obj.end_in_both, self.v1obj.end_in_both)

  def testBackwards(self):
    obj = self._deserialize(VersioningTestV1, self._serialize(self.v2obj))
    self.assertEquals(obj.begin_in_both, self.v2obj.begin_in_both)
    self.assertEquals(obj.end_in_both, self.v2obj.end_in_both)

  def testSerializeV1(self):
    obj = self._deserialize(VersioningTestV1, self._serialize(self.v1obj))
    self.assertEquals(obj, self.v1obj)

  def testSerializeV2(self):
    obj = self._deserialize(VersioningTestV2, self._serialize(self.v2obj))
    self.assertEquals(obj, self.v2obj)

  def testBools(self):
    self.assertNotEquals(self.bools, self.bools_flipped)
    self.assertNotEquals(self.bools, self.v1obj)
    obj = self._deserialize(Bools, self._serialize(self.bools))
    self.assertEquals(obj, self.bools)
    obj = self._deserialize(Bools, self._serialize(self.bools_flipped))
    self.assertEquals(obj, self.bools_flipped)
    rep = repr(self.bools)
    self.assertTrue(len(rep) > 0)

  def testLargeDeltas(self):
    # test large field deltas (meaningful in CompactProto only)
    obj = self._deserialize(LargeDeltas, self._serialize(self.large_deltas))
    self.assertEquals(obj, self.large_deltas)
    rep = repr(self.large_deltas)
    self.assertTrue(len(rep) > 0)

  def testNestedListsI32x2(self):
    obj = self._deserialize(NestedListsI32x2, self._serialize(self.nested_lists_i32x2))
    self.assertEquals(obj, self.nested_lists_i32x2)
    rep = repr(self.nested_lists_i32x2)
    self.assertTrue(len(rep) > 0)

  def testNestedListsI32x3(self):
    obj = self._deserialize(NestedListsI32x3, self._serialize(self.nested_lists_i32x3))
    self.assertEquals(obj, self.nested_lists_i32x3)
    rep = repr(self.nested_lists_i32x3)
    self.assertTrue(len(rep) > 0)

  def testNestedMixedx2(self):
    obj = self._deserialize(NestedMixedx2, self._serialize(self.nested_mixedx2))
    self.assertEquals(obj, self.nested_mixedx2)
    rep = repr(self.nested_mixedx2)
    self.assertTrue(len(rep) > 0)

  def testNestedListsBonk(self):
    obj = self._deserialize(NestedListsBonk, self._serialize(self.nested_lists_bonk))
    self.assertEquals(obj, self.nested_lists_bonk)
    rep = repr(self.nested_lists_bonk)
    self.assertTrue(len(rep) > 0)

  def testListBonks(self):
    obj = self._deserialize(ListBonks, self._serialize(self.list_bonks))
    self.assertEquals(obj, self.list_bonks)
    rep = repr(self.list_bonks)
    self.assertTrue(len(rep) > 0)

  def testCompactStruct(self):
    # test large field deltas (meaningful in CompactProto only)
    obj = self._deserialize(CompactProtoTestStruct, self._serialize(self.compact_struct))
    self.assertEquals(obj, self.compact_struct)
    rep = repr(self.compact_struct)
    self.assertTrue(len(rep) > 0)

  def testIntegerLimits(self):
    bad_values = [CompactProtoTestStruct(a_byte=128), CompactProtoTestStruct(a_byte=-129),
                  CompactProtoTestStruct(a_i16=32768), CompactProtoTestStruct(a_i16=-32769),
                  CompactProtoTestStruct(a_i32=2147483648), CompactProtoTestStruct(a_i32=-2147483649),
                  CompactProtoTestStruct(a_i64=9223372036854775808), CompactProtoTestStruct(a_i64=-9223372036854775809)
                ]

    for value in bad_values:
      self.assertRaises(Exception, self._serialize, value)

class NormalBinaryTest(AbstractTest):
  # Runs the inherited AbstractTest cases against protocols produced by
  # TBinaryProtocol.TBinaryProtocolFactory.
  protocol_factory = TBinaryProtocol.TBinaryProtocolFactory()

class AcceleratedBinaryTest(AbstractTest):
  # Runs the inherited AbstractTest cases against protocols produced by
  # TBinaryProtocol.TBinaryProtocolAcceleratedFactory.
  protocol_factory = TBinaryProtocol.TBinaryProtocolAcceleratedFactory()

class CompactProtocolTest(AbstractTest):
  # Runs the inherited AbstractTest cases against protocols produced by
  # TCompactProtocol.TCompactProtocolFactory.
  protocol_factory = TCompactProtocol.TCompactProtocolFactory()

class JSONProtocolTest(AbstractTest):
  # Runs the inherited AbstractTest cases against protocols produced by
  # TJSONProtocol.TJSONProtocolFactory.
  protocol_factory = TJSONProtocol.TJSONProtocolFactory()

class AcceleratedFramedTest(unittest.TestCase):
  def testSplit(self):
    """Test FramedTransport and BinaryProtocolAccelerated

    Tests that TBinaryProtocolAccelerated and TFramedTransport
    play nicely together when a read spans a frame"""

    protocol_factory = TBinaryProtocol.TBinaryProtocolAcceleratedFactory()
    bigstring = "".join(chr(byte) for byte in range(ord("a"), ord("z")+1))

    databuf = TTransport.TMemoryBuffer()
    prot = protocol_factory.getProtocol(databuf)
    prot.writeI32(42)
    prot.writeString(bigstring)
    prot.writeI16(24)
    data = databuf.getvalue()
    cutpoint = len(data)/2
    parts = [ data[:cutpoint], data[cutpoint:] ]

    framed_buffer = TTransport.TMemoryBuffer()
    framed_writer = TTransport.TFramedTransport(framed_buffer)
    for part in parts:
      framed_writer.write(part)
      framed_writer.flush()
    self.assertEquals(len(framed_buffer.getvalue()), len(data) + 8)

    # Recreate framed_buffer so we can read from it.
    framed_buffer = TTransport.TMemoryBuffer(framed_buffer.getvalue())
    framed_reader = TTransport.TFramedTransport(framed_buffer)
    prot = protocol_factory.getProtocol(framed_reader)
    self.assertEqual(prot.readI32(), 42)
    self.assertEqual(prot.readString(), bigstring)
    self.assertEqual(prot.readI16(), 24)

class SerializersTest(unittest.TestCase):

  def testSerializeThenDeserialize(self):
    obj = Xtruct2(i32_thing=1,
                  struct_thing=Xtruct(string_thing="foo"))

    s1 = serialize(obj)
    for i in range(10):
      self.assertEquals(s1, serialize(obj))
      objcopy = Xtruct2()
      deserialize(objcopy, serialize(obj))
      self.assertEquals(obj, objcopy)

    obj = Xtruct(string_thing="bar")
    objcopy = Xtruct()
    deserialize(objcopy, serialize(obj))
    self.assertEquals(obj, objcopy)

    # test booleans
    obj = Bools(im_true=True, im_false=False)
    objcopy = Bools()
    deserialize(objcopy, serialize(obj))
    self.assertEquals(obj, objcopy)
    
    # test enums
    for num, name in Numberz._VALUES_TO_NAMES.iteritems():
      obj = Bonk(message='enum Numberz value %d is string %s' % (num, name), type=num)
      objcopy = Bonk()
      deserialize(objcopy, serialize(obj))
      self.assertEquals(obj, objcopy)
  

def suite():
  """Return a TestSuite holding every concrete test case class, in order."""
  test_cases = (
      NormalBinaryTest,
      AcceleratedBinaryTest,
      CompactProtocolTest,
      JSONProtocolTest,
      AcceleratedFramedTest,
      SerializersTest,
      )

  loader = unittest.TestLoader()
  combined = unittest.TestSuite()
  for case in test_cases:
    combined.addTest(loader.loadTestsFromTestCase(case))
  return combined

if __name__ == "__main__":
  # Run the tests collected by suite() with a verbose text runner.
  runner = unittest.TextTestRunner(verbosity=2)
  unittest.main(testRunner=runner, defaultTest="suite")
