THRIFT-3596 Better conformance to PEP8
This closes #832
diff --git a/lib/py/test/_import_local_thrift.py b/lib/py/test/_import_local_thrift.py
index 30c1abc..1741669 100644
--- a/lib/py/test/_import_local_thrift.py
+++ b/lib/py/test/_import_local_thrift.py
@@ -6,8 +6,8 @@
ROOT_DIR = os.path.dirname(os.path.dirname(os.path.dirname(SCRIPT_DIR)))
if sys.version_info[0] == 2:
- import glob
- libdir = glob.glob(os.path.join(ROOT_DIR, 'lib', 'py', 'build', 'lib.*'))[0]
- sys.path.insert(0, libdir)
+ import glob
+ libdir = glob.glob(os.path.join(ROOT_DIR, 'lib', 'py', 'build', 'lib.*'))[0]
+ sys.path.insert(0, libdir)
else:
- sys.path.insert(0, os.path.join(ROOT_DIR, 'lib', 'py', 'build', 'lib'))
+ sys.path.insert(0, os.path.join(ROOT_DIR, 'lib', 'py', 'build', 'lib'))
diff --git a/lib/py/test/test_sslsocket.py b/lib/py/test/test_sslsocket.py
index b7c3802..fa156a0 100644
--- a/lib/py/test/test_sslsocket.py
+++ b/lib/py/test/test_sslsocket.py
@@ -46,231 +46,231 @@
class ServerAcceptor(threading.Thread):
- def __init__(self, server):
- super(ServerAcceptor, self).__init__()
- self._server = server
- self.client = None
+ def __init__(self, server):
+ super(ServerAcceptor, self).__init__()
+ self._server = server
+ self.client = None
- def run(self):
- self._server.listen()
- self.client = self._server.accept()
+ def run(self):
+ self._server.listen()
+ self.client = self._server.accept()
# Python 2.6 compat
class AssertRaises(object):
- def __init__(self, expected):
- self._expected = expected
+ def __init__(self, expected):
+ self._expected = expected
- def __enter__(self):
- pass
+ def __enter__(self):
+ pass
- def __exit__(self, exc_type, exc_value, traceback):
- if not exc_type or not issubclass(exc_type, self._expected):
- raise Exception('fail')
- return True
+ def __exit__(self, exc_type, exc_value, traceback):
+ if not exc_type or not issubclass(exc_type, self._expected):
+ raise Exception('fail')
+ return True
class TSSLSocketTest(unittest.TestCase):
- def _assert_connection_failure(self, server, client):
- try:
- acc = ServerAcceptor(server)
- acc.start()
- time.sleep(CONNECT_DELAY)
- client.setTimeout(CONNECT_TIMEOUT)
- with self._assert_raises(Exception):
- client.open()
- select.select([], [client.handle], [], CONNECT_TIMEOUT)
- # self.assertIsNone(acc.client)
- self.assertTrue(acc.client is None)
- finally:
- server.close()
- client.close()
+ def _assert_connection_failure(self, server, client):
+ try:
+ acc = ServerAcceptor(server)
+ acc.start()
+ time.sleep(CONNECT_DELAY)
+ client.setTimeout(CONNECT_TIMEOUT)
+ with self._assert_raises(Exception):
+ client.open()
+ select.select([], [client.handle], [], CONNECT_TIMEOUT)
+ # self.assertIsNone(acc.client)
+ self.assertTrue(acc.client is None)
+ finally:
+ server.close()
+ client.close()
- def _assert_raises(self, exc):
- if sys.hexversion >= 0x020700F0:
- return self.assertRaises(exc)
- else:
- return AssertRaises(exc)
+ def _assert_raises(self, exc):
+ if sys.hexversion >= 0x020700F0:
+ return self.assertRaises(exc)
+ else:
+ return AssertRaises(exc)
- def _assert_connection_success(self, server, client):
- try:
- acc = ServerAcceptor(server)
- acc.start()
- time.sleep(0.15)
- client.setTimeout(CONNECT_TIMEOUT)
- client.open()
- select.select([], [client.handle], [], CONNECT_TIMEOUT)
- # self.assertIsNotNone(acc.client)
- self.assertTrue(acc.client is not None)
- finally:
- server.close()
- client.close()
+ def _assert_connection_success(self, server, client):
+ try:
+ acc = ServerAcceptor(server)
+ acc.start()
+ time.sleep(0.15)
+ client.setTimeout(CONNECT_TIMEOUT)
+ client.open()
+ select.select([], [client.handle], [], CONNECT_TIMEOUT)
+ # self.assertIsNotNone(acc.client)
+ self.assertTrue(acc.client is not None)
+ finally:
+ server.close()
+ client.close()
- # deprecated feature
- def test_deprecation(self):
- with warnings.catch_warnings(record=True) as w:
- warnings.filterwarnings('always', category=DeprecationWarning, module='thrift.*SSL.*')
- TSSLSocket('localhost', TEST_PORT, validate=True, ca_certs=SERVER_CERT)
- self.assertEqual(len(w), 1)
+ # deprecated feature
+ def test_deprecation(self):
+ with warnings.catch_warnings(record=True) as w:
+ warnings.filterwarnings('always', category=DeprecationWarning, module='thrift.*SSL.*')
+ TSSLSocket('localhost', TEST_PORT, validate=True, ca_certs=SERVER_CERT)
+ self.assertEqual(len(w), 1)
- with warnings.catch_warnings(record=True) as w:
- warnings.filterwarnings('always', category=DeprecationWarning, module='thrift.*SSL.*')
- # Deprecated signature
- # def __init__(self, host='localhost', port=9090, validate=True, ca_certs=None, keyfile=None, certfile=None, unix_socket=None, ciphers=None):
- client = TSSLSocket('localhost', TEST_PORT, True, SERVER_CERT, CLIENT_KEY, CLIENT_CERT, None, TEST_CIPHERS)
- self.assertEqual(len(w), 7)
+ with warnings.catch_warnings(record=True) as w:
+ warnings.filterwarnings('always', category=DeprecationWarning, module='thrift.*SSL.*')
+ # Deprecated signature
+ # def __init__(self, host='localhost', port=9090, validate=True, ca_certs=None, keyfile=None, certfile=None, unix_socket=None, ciphers=None):
+ client = TSSLSocket('localhost', TEST_PORT, True, SERVER_CERT, CLIENT_KEY, CLIENT_CERT, None, TEST_CIPHERS)
+ self.assertEqual(len(w), 7)
- with warnings.catch_warnings(record=True) as w:
- warnings.filterwarnings('always', category=DeprecationWarning, module='thrift.*SSL.*')
- # Deprecated signature
- # def __init__(self, host=None, port=9090, certfile='cert.pem', unix_socket=None, ciphers=None):
- server = TSSLServerSocket(None, TEST_PORT, SERVER_PEM, None, TEST_CIPHERS)
- self.assertEqual(len(w), 3)
+ with warnings.catch_warnings(record=True) as w:
+ warnings.filterwarnings('always', category=DeprecationWarning, module='thrift.*SSL.*')
+ # Deprecated signature
+ # def __init__(self, host=None, port=9090, certfile='cert.pem', unix_socket=None, ciphers=None):
+ server = TSSLServerSocket(None, TEST_PORT, SERVER_PEM, None, TEST_CIPHERS)
+ self.assertEqual(len(w), 3)
- self._assert_connection_success(server, client)
+ self._assert_connection_success(server, client)
- # deprecated feature
- def test_set_cert_reqs_by_validate(self):
- c1 = TSSLSocket('localhost', TEST_PORT, validate=True, ca_certs=SERVER_CERT)
- self.assertEqual(c1.cert_reqs, ssl.CERT_REQUIRED)
+ # deprecated feature
+ def test_set_cert_reqs_by_validate(self):
+ c1 = TSSLSocket('localhost', TEST_PORT, validate=True, ca_certs=SERVER_CERT)
+ self.assertEqual(c1.cert_reqs, ssl.CERT_REQUIRED)
- c1 = TSSLSocket('localhost', TEST_PORT, validate=False)
- self.assertEqual(c1.cert_reqs, ssl.CERT_NONE)
+ c1 = TSSLSocket('localhost', TEST_PORT, validate=False)
+ self.assertEqual(c1.cert_reqs, ssl.CERT_NONE)
- # deprecated feature
- def test_set_validate_by_cert_reqs(self):
- c1 = TSSLSocket('localhost', TEST_PORT, cert_reqs=ssl.CERT_NONE)
- self.assertFalse(c1.validate)
+ # deprecated feature
+ def test_set_validate_by_cert_reqs(self):
+ c1 = TSSLSocket('localhost', TEST_PORT, cert_reqs=ssl.CERT_NONE)
+ self.assertFalse(c1.validate)
- c2 = TSSLSocket('localhost', TEST_PORT, cert_reqs=ssl.CERT_REQUIRED, ca_certs=SERVER_CERT)
- self.assertTrue(c2.validate)
+ c2 = TSSLSocket('localhost', TEST_PORT, cert_reqs=ssl.CERT_REQUIRED, ca_certs=SERVER_CERT)
+ self.assertTrue(c2.validate)
- c3 = TSSLSocket('localhost', TEST_PORT, cert_reqs=ssl.CERT_OPTIONAL, ca_certs=SERVER_CERT)
- self.assertTrue(c3.validate)
+ c3 = TSSLSocket('localhost', TEST_PORT, cert_reqs=ssl.CERT_OPTIONAL, ca_certs=SERVER_CERT)
+ self.assertTrue(c3.validate)
- def test_unix_domain_socket(self):
- if platform.system() == 'Windows':
- print('skipping test_unix_domain_socket')
- return
- server = TSSLServerSocket(unix_socket=TEST_ADDR, keyfile=SERVER_KEY, certfile=SERVER_CERT)
- client = TSSLSocket(None, None, TEST_ADDR, cert_reqs=ssl.CERT_NONE)
- self._assert_connection_success(server, client)
+ def test_unix_domain_socket(self):
+ if platform.system() == 'Windows':
+ print('skipping test_unix_domain_socket')
+ return
+ server = TSSLServerSocket(unix_socket=TEST_ADDR, keyfile=SERVER_KEY, certfile=SERVER_CERT)
+ client = TSSLSocket(None, None, TEST_ADDR, cert_reqs=ssl.CERT_NONE)
+ self._assert_connection_success(server, client)
- def test_server_cert(self):
- server = TSSLServerSocket(port=TEST_PORT, keyfile=SERVER_KEY, certfile=SERVER_CERT)
- client = TSSLSocket('localhost', TEST_PORT, cert_reqs=ssl.CERT_REQUIRED, ca_certs=SERVER_CERT)
- self._assert_connection_success(server, client)
+ def test_server_cert(self):
+ server = TSSLServerSocket(port=TEST_PORT, keyfile=SERVER_KEY, certfile=SERVER_CERT)
+ client = TSSLSocket('localhost', TEST_PORT, cert_reqs=ssl.CERT_REQUIRED, ca_certs=SERVER_CERT)
+ self._assert_connection_success(server, client)
- server = TSSLServerSocket(port=TEST_PORT, keyfile=SERVER_KEY, certfile=SERVER_CERT)
- # server cert on in ca_certs
- client = TSSLSocket('localhost', TEST_PORT, cert_reqs=ssl.CERT_REQUIRED, ca_certs=CLIENT_CERT)
- self._assert_connection_failure(server, client)
+ server = TSSLServerSocket(port=TEST_PORT, keyfile=SERVER_KEY, certfile=SERVER_CERT)
+ # server cert not in ca_certs
+ client = TSSLSocket('localhost', TEST_PORT, cert_reqs=ssl.CERT_REQUIRED, ca_certs=CLIENT_CERT)
+ self._assert_connection_failure(server, client)
- server = TSSLServerSocket(port=TEST_PORT, keyfile=SERVER_KEY, certfile=SERVER_CERT)
- client = TSSLSocket('localhost', TEST_PORT, cert_reqs=ssl.CERT_NONE)
- self._assert_connection_success(server, client)
+ server = TSSLServerSocket(port=TEST_PORT, keyfile=SERVER_KEY, certfile=SERVER_CERT)
+ client = TSSLSocket('localhost', TEST_PORT, cert_reqs=ssl.CERT_NONE)
+ self._assert_connection_success(server, client)
- def test_set_server_cert(self):
- server = TSSLServerSocket(port=TEST_PORT, keyfile=SERVER_KEY, certfile=CLIENT_CERT)
- with self._assert_raises(Exception):
- server.certfile = 'foo'
- with self._assert_raises(Exception):
- server.certfile = None
- server.certfile = SERVER_CERT
- client = TSSLSocket('localhost', TEST_PORT, cert_reqs=ssl.CERT_REQUIRED, ca_certs=SERVER_CERT)
- self._assert_connection_success(server, client)
+ def test_set_server_cert(self):
+ server = TSSLServerSocket(port=TEST_PORT, keyfile=SERVER_KEY, certfile=CLIENT_CERT)
+ with self._assert_raises(Exception):
+ server.certfile = 'foo'
+ with self._assert_raises(Exception):
+ server.certfile = None
+ server.certfile = SERVER_CERT
+ client = TSSLSocket('localhost', TEST_PORT, cert_reqs=ssl.CERT_REQUIRED, ca_certs=SERVER_CERT)
+ self._assert_connection_success(server, client)
- def test_client_cert(self):
- server = TSSLServerSocket(
- port=TEST_PORT, cert_reqs=ssl.CERT_REQUIRED, keyfile=SERVER_KEY,
- certfile=SERVER_CERT, ca_certs=CLIENT_CERT)
- client = TSSLSocket('localhost', TEST_PORT, cert_reqs=ssl.CERT_NONE, certfile=CLIENT_CERT, keyfile=CLIENT_KEY)
- self._assert_connection_success(server, client)
+ def test_client_cert(self):
+ server = TSSLServerSocket(
+ port=TEST_PORT, cert_reqs=ssl.CERT_REQUIRED, keyfile=SERVER_KEY,
+ certfile=SERVER_CERT, ca_certs=CLIENT_CERT)
+ client = TSSLSocket('localhost', TEST_PORT, cert_reqs=ssl.CERT_NONE, certfile=CLIENT_CERT, keyfile=CLIENT_KEY)
+ self._assert_connection_success(server, client)
- def test_ciphers(self):
- server = TSSLServerSocket(port=TEST_PORT, keyfile=SERVER_KEY, certfile=SERVER_CERT, ciphers=TEST_CIPHERS)
- client = TSSLSocket('localhost', TEST_PORT, ca_certs=SERVER_CERT, ciphers=TEST_CIPHERS)
- self._assert_connection_success(server, client)
+ def test_ciphers(self):
+ server = TSSLServerSocket(port=TEST_PORT, keyfile=SERVER_KEY, certfile=SERVER_CERT, ciphers=TEST_CIPHERS)
+ client = TSSLSocket('localhost', TEST_PORT, ca_certs=SERVER_CERT, ciphers=TEST_CIPHERS)
+ self._assert_connection_success(server, client)
- if not TSSLSocket._has_ciphers:
- # unittest.skip is not available for Python 2.6
- print('skipping test_ciphers')
- return
- server = TSSLServerSocket(port=TEST_PORT, keyfile=SERVER_KEY, certfile=SERVER_CERT)
- client = TSSLSocket('localhost', TEST_PORT, ca_certs=SERVER_CERT, ciphers='NULL')
- self._assert_connection_failure(server, client)
+ if not TSSLSocket._has_ciphers:
+ # unittest.skip is not available for Python 2.6
+ print('skipping test_ciphers')
+ return
+ server = TSSLServerSocket(port=TEST_PORT, keyfile=SERVER_KEY, certfile=SERVER_CERT)
+ client = TSSLSocket('localhost', TEST_PORT, ca_certs=SERVER_CERT, ciphers='NULL')
+ self._assert_connection_failure(server, client)
- server = TSSLServerSocket(port=TEST_PORT, keyfile=SERVER_KEY, certfile=SERVER_CERT, ciphers=TEST_CIPHERS)
- client = TSSLSocket('localhost', TEST_PORT, ca_certs=SERVER_CERT, ciphers='NULL')
- self._assert_connection_failure(server, client)
+ server = TSSLServerSocket(port=TEST_PORT, keyfile=SERVER_KEY, certfile=SERVER_CERT, ciphers=TEST_CIPHERS)
+ client = TSSLSocket('localhost', TEST_PORT, ca_certs=SERVER_CERT, ciphers='NULL')
+ self._assert_connection_failure(server, client)
- def test_ssl2_and_ssl3_disabled(self):
- if not hasattr(ssl, 'PROTOCOL_SSLv3'):
- print('PROTOCOL_SSLv3 is not available')
- else:
- server = TSSLServerSocket(port=TEST_PORT, keyfile=SERVER_KEY, certfile=SERVER_CERT)
- client = TSSLSocket('localhost', TEST_PORT, ca_certs=SERVER_CERT, ssl_version=ssl.PROTOCOL_SSLv3)
- self._assert_connection_failure(server, client)
+ def test_ssl2_and_ssl3_disabled(self):
+ if not hasattr(ssl, 'PROTOCOL_SSLv3'):
+ print('PROTOCOL_SSLv3 is not available')
+ else:
+ server = TSSLServerSocket(port=TEST_PORT, keyfile=SERVER_KEY, certfile=SERVER_CERT)
+ client = TSSLSocket('localhost', TEST_PORT, ca_certs=SERVER_CERT, ssl_version=ssl.PROTOCOL_SSLv3)
+ self._assert_connection_failure(server, client)
- server = TSSLServerSocket(port=TEST_PORT, keyfile=SERVER_KEY, certfile=SERVER_CERT, ssl_version=ssl.PROTOCOL_SSLv3)
- client = TSSLSocket('localhost', TEST_PORT, ca_certs=SERVER_CERT)
- self._assert_connection_failure(server, client)
+ server = TSSLServerSocket(port=TEST_PORT, keyfile=SERVER_KEY, certfile=SERVER_CERT, ssl_version=ssl.PROTOCOL_SSLv3)
+ client = TSSLSocket('localhost', TEST_PORT, ca_certs=SERVER_CERT)
+ self._assert_connection_failure(server, client)
- if not hasattr(ssl, 'PROTOCOL_SSLv2'):
- print('PROTOCOL_SSLv2 is not available')
- else:
- server = TSSLServerSocket(port=TEST_PORT, keyfile=SERVER_KEY, certfile=SERVER_CERT)
- client = TSSLSocket('localhost', TEST_PORT, ca_certs=SERVER_CERT, ssl_version=ssl.PROTOCOL_SSLv2)
- self._assert_connection_failure(server, client)
+ if not hasattr(ssl, 'PROTOCOL_SSLv2'):
+ print('PROTOCOL_SSLv2 is not available')
+ else:
+ server = TSSLServerSocket(port=TEST_PORT, keyfile=SERVER_KEY, certfile=SERVER_CERT)
+ client = TSSLSocket('localhost', TEST_PORT, ca_certs=SERVER_CERT, ssl_version=ssl.PROTOCOL_SSLv2)
+ self._assert_connection_failure(server, client)
- server = TSSLServerSocket(port=TEST_PORT, keyfile=SERVER_KEY, certfile=SERVER_CERT, ssl_version=ssl.PROTOCOL_SSLv2)
- client = TSSLSocket('localhost', TEST_PORT, ca_certs=SERVER_CERT)
- self._assert_connection_failure(server, client)
+ server = TSSLServerSocket(port=TEST_PORT, keyfile=SERVER_KEY, certfile=SERVER_CERT, ssl_version=ssl.PROTOCOL_SSLv2)
+ client = TSSLSocket('localhost', TEST_PORT, ca_certs=SERVER_CERT)
+ self._assert_connection_failure(server, client)
- def test_newer_tls(self):
- if not TSSLSocket._has_ssl_context:
- # unittest.skip is not available for Python 2.6
- print('skipping test_newer_tls')
- return
- if not hasattr(ssl, 'PROTOCOL_TLSv1_2'):
- print('PROTOCOL_TLSv1_2 is not available')
- else:
- server = TSSLServerSocket(port=TEST_PORT, keyfile=SERVER_KEY, certfile=SERVER_CERT, ssl_version=ssl.PROTOCOL_TLSv1_2)
- client = TSSLSocket('localhost', TEST_PORT, ca_certs=SERVER_CERT, ssl_version=ssl.PROTOCOL_TLSv1_2)
- self._assert_connection_success(server, client)
+ def test_newer_tls(self):
+ if not TSSLSocket._has_ssl_context:
+ # unittest.skip is not available for Python 2.6
+ print('skipping test_newer_tls')
+ return
+ if not hasattr(ssl, 'PROTOCOL_TLSv1_2'):
+ print('PROTOCOL_TLSv1_2 is not available')
+ else:
+ server = TSSLServerSocket(port=TEST_PORT, keyfile=SERVER_KEY, certfile=SERVER_CERT, ssl_version=ssl.PROTOCOL_TLSv1_2)
+ client = TSSLSocket('localhost', TEST_PORT, ca_certs=SERVER_CERT, ssl_version=ssl.PROTOCOL_TLSv1_2)
+ self._assert_connection_success(server, client)
- if not hasattr(ssl, 'PROTOCOL_TLSv1_1'):
- print('PROTOCOL_TLSv1_1 is not available')
- else:
- server = TSSLServerSocket(port=TEST_PORT, keyfile=SERVER_KEY, certfile=SERVER_CERT, ssl_version=ssl.PROTOCOL_TLSv1_1)
- client = TSSLSocket('localhost', TEST_PORT, ca_certs=SERVER_CERT, ssl_version=ssl.PROTOCOL_TLSv1_1)
- self._assert_connection_success(server, client)
+ if not hasattr(ssl, 'PROTOCOL_TLSv1_1'):
+ print('PROTOCOL_TLSv1_1 is not available')
+ else:
+ server = TSSLServerSocket(port=TEST_PORT, keyfile=SERVER_KEY, certfile=SERVER_CERT, ssl_version=ssl.PROTOCOL_TLSv1_1)
+ client = TSSLSocket('localhost', TEST_PORT, ca_certs=SERVER_CERT, ssl_version=ssl.PROTOCOL_TLSv1_1)
+ self._assert_connection_success(server, client)
- if not hasattr(ssl, 'PROTOCOL_TLSv1_1') or not hasattr(ssl, 'PROTOCOL_TLSv1_2'):
- print('PROTOCOL_TLSv1_1 and/or PROTOCOL_TLSv1_2 is not available')
- else:
- server = TSSLServerSocket(port=TEST_PORT, keyfile=SERVER_KEY, certfile=SERVER_CERT, ssl_version=ssl.PROTOCOL_TLSv1_2)
- client = TSSLSocket('localhost', TEST_PORT, ca_certs=SERVER_CERT, ssl_version=ssl.PROTOCOL_TLSv1_1)
- self._assert_connection_failure(server, client)
+ if not hasattr(ssl, 'PROTOCOL_TLSv1_1') or not hasattr(ssl, 'PROTOCOL_TLSv1_2'):
+ print('PROTOCOL_TLSv1_1 and/or PROTOCOL_TLSv1_2 is not available')
+ else:
+ server = TSSLServerSocket(port=TEST_PORT, keyfile=SERVER_KEY, certfile=SERVER_CERT, ssl_version=ssl.PROTOCOL_TLSv1_2)
+ client = TSSLSocket('localhost', TEST_PORT, ca_certs=SERVER_CERT, ssl_version=ssl.PROTOCOL_TLSv1_1)
+ self._assert_connection_failure(server, client)
- def test_ssl_context(self):
- if not TSSLSocket._has_ssl_context:
- # unittest.skip is not available for Python 2.6
- print('skipping test_ssl_context')
- return
- server_context = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
- server_context.load_cert_chain(SERVER_CERT, SERVER_KEY)
- server_context.load_verify_locations(CLIENT_CERT)
+ def test_ssl_context(self):
+ if not TSSLSocket._has_ssl_context:
+ # unittest.skip is not available for Python 2.6
+ print('skipping test_ssl_context')
+ return
+ server_context = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
+ server_context.load_cert_chain(SERVER_CERT, SERVER_KEY)
+ server_context.load_verify_locations(CLIENT_CERT)
- client_context = ssl.create_default_context(ssl.Purpose.SERVER_AUTH)
- client_context.load_cert_chain(CLIENT_CERT, CLIENT_KEY)
- client_context.load_verify_locations(SERVER_CERT)
+ client_context = ssl.create_default_context(ssl.Purpose.SERVER_AUTH)
+ client_context.load_cert_chain(CLIENT_CERT, CLIENT_KEY)
+ client_context.load_verify_locations(SERVER_CERT)
- server = TSSLServerSocket(port=TEST_PORT, ssl_context=server_context)
- client = TSSLSocket('localhost', TEST_PORT, ssl_context=client_context)
- self._assert_connection_success(server, client)
+ server = TSSLServerSocket(port=TEST_PORT, ssl_context=server_context)
+ client = TSSLSocket('localhost', TEST_PORT, ssl_context=client_context)
+ self._assert_connection_success(server, client)
if __name__ == '__main__':
- # import logging
- # logging.basicConfig(level=logging.DEBUG)
- unittest.main()
+ # import logging
+ # logging.basicConfig(level=logging.DEBUG)
+ unittest.main()
diff --git a/lib/py/test/thrift_json.py b/lib/py/test/thrift_json.py
index e60aaba..5ba7dd5 100644
--- a/lib/py/test/thrift_json.py
+++ b/lib/py/test/thrift_json.py
@@ -31,20 +31,20 @@
# mklink /D thrift ..\src
#
+
class TestJSONString(unittest.TestCase):
- def test_escaped_unicode_string(self):
- unicode_json = b'"hello \\u0e01\\u0e02\\u0e03\\ud835\\udcab\\udb40\\udc70 unicode"'
- unicode_text = u'hello \u0e01\u0e02\u0e03\U0001D4AB\U000E0070 unicode'
+ def test_escaped_unicode_string(self):
+ unicode_json = b'"hello \\u0e01\\u0e02\\u0e03\\ud835\\udcab\\udb40\\udc70 unicode"'
+ unicode_text = u'hello \u0e01\u0e02\u0e03\U0001D4AB\U000E0070 unicode'
- buf = TTransport.TMemoryBuffer(unicode_json)
- transport = TTransport.TBufferedTransportFactory().getTransport(buf)
- protocol = TJSONProtocol(transport)
+ buf = TTransport.TMemoryBuffer(unicode_json)
+ transport = TTransport.TBufferedTransportFactory().getTransport(buf)
+ protocol = TJSONProtocol(transport)
- if sys.version_info[0] == 2:
- unicode_text = unicode_text.encode('utf8')
- self.assertEqual(protocol.readString(), unicode_text)
+ if sys.version_info[0] == 2:
+ unicode_text = unicode_text.encode('utf8')
+ self.assertEqual(protocol.readString(), unicode_text)
if __name__ == '__main__':
- unittest.main()
-
+ unittest.main()