THRIFT-1103. py: TZlibTransport for python, a zlib compressed transport
This patch adds a new TZlibTransport to the Python library and extends the test suite to exercise it.
Patch: Will Pierce
git-svn-id: https://svn.apache.org/repos/asf/thrift/trunk@1084276 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/lib/py/src/transport/TSSLSocket.py b/lib/py/src/transport/TSSLSocket.py
index 5eff5e6..be35844 100644
--- a/lib/py/src/transport/TSSLSocket.py
+++ b/lib/py/src/transport/TSSLSocket.py
@@ -36,7 +36,7 @@
"""
SSL_VERSION = ssl.PROTOCOL_TLSv1
- def __init__(self, validate=True, ca_certs=None, *args, **kwargs):
+ def __init__(self, host='localhost', port=9090, validate=True, ca_certs=None, unix_socket=None):
"""
@param validate: Set to False to disable SSL certificate validation entirely.
@type validate: bool
@@ -56,10 +56,10 @@
else:
self.cert_reqs = ssl.CERT_REQUIRED
self.ca_certs = ca_certs
- if validate and ca_certs is not None:
- if not os.access(ca_certs, os.R_OK):
+ if validate:
+ if ca_certs is None or not os.access(ca_certs, os.R_OK):
raise IOError('Certificate Authority ca_certs file "%s" is not readable, cannot validate SSL certificates.' % (ca_certs))
- TSocket.TSocket.__init__(self, *args, **kwargs)
+ TSocket.TSocket.__init__(self, host, port, unix_socket)
def open(self):
try:
@@ -131,7 +131,7 @@
"""
SSL_VERSION = ssl.PROTOCOL_TLSv1
- def __init__(self, certfile='cert.pem', *args, **kwargs):
+ def __init__(self, host=None, port=9090, certfile='cert.pem', unix_socket=None):
"""Initialize a TSSLServerSocket
@param certfile: The filename of the server certificate file, defaults to cert.pem
@@ -143,7 +143,7 @@
@type port: int
"""
self.setCertfile(certfile)
- TSocket.TServerSocket.__init__(self, *args, **kwargs)
+ TSocket.TServerSocket.__init__(self, host, port)
def setCertfile(self, certfile):
"""Set or change the server certificate file used to wrap new connections.
@@ -159,8 +159,18 @@
def accept(self):
plain_client, addr = self.handle.accept()
+ try:
+ client = ssl.wrap_socket(plain_client, certfile=self.certfile,
+ server_side=True, ssl_version=self.SSL_VERSION)
+ except ssl.SSLError, ssl_exc:
+ # failed handshake/ssl wrap, close socket to client
+ plain_client.close()
+ # raise ssl_exc
+ # We can't raise the exception, because it kills most TServer derived serve()
+ # methods.
+ # Instead, return None, and let the TServer instance deal with it in
+ # other exception handling. (but TSimpleServer dies anyway)
+ return None
result = TSocket.TSocket()
- client = ssl.wrap_socket(plain_client, certfile=self.certfile,
- server_side=True, ssl_version=self.SSL_VERSION)
result.setHandle(client)
return result
diff --git a/lib/py/src/transport/TZlibTransport.py b/lib/py/src/transport/TZlibTransport.py
new file mode 100644
index 0000000..784d4e1
--- /dev/null
+++ b/lib/py/src/transport/TZlibTransport.py
@@ -0,0 +1,261 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+'''
+TZlibTransport provides a compressed transport and transport factory
+class, using the python standard library zlib module to implement
+data compression.
+'''
+
+from __future__ import division
+import zlib
+from cStringIO import StringIO
+from TTransport import TTransportBase, CReadableTransport
+
+class TZlibTransportFactory(object):
+ '''
+ Factory transport that builds zlib compressed transports.
+
+ This factory caches the last single client/transport that it was passed
+ and returns the same TZlibTransport object that was created.
+
+ This caching means the TServer class will get the _same_ transport
+ object for both input and output transports from this factory.
+ (For non-threaded scenarios only, since the cache only holds one object)
+
+ The purpose of this caching is to allocate only one TZlibTransport where
+ only one is really needed (since it must have separate read/write buffers),
+ and makes the statistics from getCompSavings() and getCompRatio()
+ easier to understand.
+ '''
+
+ # class scoped cache of last transport given and zlibtransport returned
+ _last_trans = None
+ _last_z = None
+
+ def getTransport(self, trans, compresslevel=9):
+ '''Wrap a transport , trans, with the TZlibTransport
+ compressed transport class, returning a new
+ transport to the caller.
+
+ @param compresslevel: The zlib compression level, ranging
+ from 0 (no compression) to 9 (best compression). Defaults to 9.
+ @type compresslevel: int
+
+ This method returns a TZlibTransport which wraps the
+ passed C{trans} TTransport derived instance.
+ '''
+ if trans == self._last_trans:
+ return self._last_z
+ ztrans = TZlibTransport(trans, compresslevel)
+ self._last_trans = trans
+ self._last_z = ztrans
+ return ztrans
+
+
+class TZlibTransport(TTransportBase, CReadableTransport):
+ '''
+ Class that wraps a transport with zlib, compressing writes
+ and decompresses reads, using the python standard
+ library zlib module.
+ '''
+
+ # Read buffer size for the python fastbinary C extension,
+ # the TBinaryProtocolAccelerated class.
+ DEFAULT_BUFFSIZE = 4096
+
+ def __init__(self, trans, compresslevel=9):
+ '''
+ Create a new TZlibTransport, wrapping C{trans}, another
+ TTransport derived object.
+
+ @param trans: A thrift transport object, i.e. a TSocket() object.
+ @type trans: TTransport
+ @param compresslevel: The zlib compression level, ranging
+ from 0 (no compression) to 9 (best compression). Default is 9.
+ @type compresslevel: int
+ '''
+ self.__trans = trans
+ self.compresslevel = compresslevel
+ self.__rbuf = StringIO()
+ self.__wbuf = StringIO()
+ self._init_zlib()
+ self._init_stats()
+
+ def _reinit_buffers(self):
+ '''
+ Internal method to initialize/reset the internal StringIO objects
+ for read and write buffers.
+ '''
+ self.__rbuf = StringIO()
+ self.__wbuf = StringIO()
+
+ def _init_stats(self):
+ '''
+ Internal method to reset the internal statistics counters
+ for compression ratios and bandwidth savings.
+ '''
+ self.bytes_in = 0
+ self.bytes_out = 0
+ self.bytes_in_comp = 0
+ self.bytes_out_comp = 0
+
+ def _init_zlib(self):
+ '''
+ Internal method for setting up the zlib compression and
+ decompression objects.
+ '''
+ self._zcomp_read = zlib.decompressobj()
+ self._zcomp_write = zlib.compressobj(self.compresslevel)
+
+ def getCompRatio(self):
+ '''
+ Get the current measured compression ratios (in,out) from
+ this transport.
+
+ Returns a tuple of:
+ (inbound_compression_ratio, outbound_compression_ratio)
+
+ The compression ratios are computed as:
+ compressed / uncompressed
+
+ E.g., data that compresses by 10x will have a ratio of: 0.10
+ and data that compresses to half of ts original size will
+ have a ratio of 0.5
+
+ None is returned if no bytes have yet been processed in
+ a particular direction.
+ '''
+ r_percent, w_percent = (None, None)
+ if self.bytes_in > 0:
+ r_percent = self.bytes_in_comp / self.bytes_in
+ if self.bytes_out > 0:
+ w_percent = self.bytes_out_comp / self.bytes_out
+ return (r_percent, w_percent)
+
+ def getCompSavings(self):
+ '''
+ Get the current count of saved bytes due to data
+ compression.
+
+ Returns a tuple of:
+ (inbound_saved_bytes, outbound_saved_bytes)
+
+ Note: if compression is actually expanding your
+ data (only likely with very tiny thrift objects), then
+ the values returned will be negative.
+ '''
+ r_saved = self.bytes_in - self.bytes_in_comp
+ w_saved = self.bytes_out - self.bytes_out_comp
+ return (r_saved, w_saved)
+
+ def isOpen(self):
+ '''Return the underlying transport's open status'''
+ return self.__trans.isOpen()
+
+ def open(self):
+ """Open the underlying transport"""
+ self._init_stats()
+ return self.__trans.open()
+
+ def listen(self):
+ '''Invoke the underlying transport's listen() method'''
+ self.__trans.listen()
+
+ def accept(self):
+ '''Accept connections on the underlying transport'''
+ return self.__trans.accept()
+
+ def close(self):
+ '''Close the underlying transport,'''
+ self._reinit_buffers()
+ self._init_zlib()
+ return self.__trans.close()
+
+ def read(self, sz):
+ '''
+ Read up to sz bytes from the decompressed bytes buffer, and
+ read from the underlying transport if the decompression
+ buffer is empty.
+ '''
+ ret = self.__rbuf.read(sz)
+ if len(ret) > 0:
+ return ret
+ # keep reading from transport until something comes back
+ while True:
+ if self.readComp(sz):
+ break
+ ret = self.__rbuf.read(sz)
+ return ret
+
+ def readComp(self, sz):
+ '''
+ Read compressed data from the underlying transport, then
+ decompress it and append it to the internal StringIO read buffer
+ '''
+ zbuf = self.__trans.read(sz)
+ zbuf = self._zcomp_read.unconsumed_tail + zbuf
+ buf = self._zcomp_read.decompress(zbuf)
+ self.bytes_in += len(zbuf)
+ self.bytes_in_comp += len(buf)
+ old = self.__rbuf.read()
+ self.__rbuf = StringIO(old + buf)
+ if len(old) + len(buf) == 0:
+ return False
+ return True
+
+ def write(self, buf):
+ '''
+ Write some bytes, putting them into the internal write
+ buffer for eventual compression.
+ '''
+ self.__wbuf.write(buf)
+
+ def flush(self):
+ '''
+ Flush any queued up data in the write buffer and ensure the
+ compression buffer is flushed out to the underlying transport
+ '''
+ wout = self.__wbuf.getvalue()
+ if len(wout) > 0:
+ zbuf = self._zcomp_write.compress(wout)
+ self.bytes_out += len(wout)
+ self.bytes_out_comp += len(zbuf)
+ else:
+ zbuf = ''
+ ztail = self._zcomp_write.flush(zlib.Z_SYNC_FLUSH)
+ self.bytes_out_comp += len(ztail)
+ if (len(zbuf) + len(ztail)) > 0:
+ self.__wbuf = StringIO()
+ self.__trans.write(zbuf + ztail)
+ self.__trans.flush()
+
+ @property
+ def cstringio_buf(self):
+ '''Implement the CReadableTransport interface'''
+ return self.__rbuf
+
+ def cstringio_refill(self, partialread, reqlen):
+ '''Implement the CReadableTransport interface for refill'''
+ retstring = partialread
+ if reqlen < self.DEFAULT_BUFFSIZE:
+ retstring += self.read(self.DEFAULT_BUFFSIZE)
+ while len(retstring) < reqlen:
+ retstring += self.read(reqlen - len(retstring))
+ self.__rbuf = StringIO(retstring)
+ return self.__rbuf
diff --git a/lib/py/src/transport/__init__.py b/lib/py/src/transport/__init__.py
index 02c6048..46e54fe 100644
--- a/lib/py/src/transport/__init__.py
+++ b/lib/py/src/transport/__init__.py
@@ -17,4 +17,4 @@
# under the License.
#
-__all__ = ['TTransport', 'TSocket', 'THttpClient']
+__all__ = ['TTransport', 'TSocket', 'THttpClient','TZlibTransport']
diff --git a/test/py/RunClientServer.py b/test/py/RunClientServer.py
index dced91a..633856f 100755
--- a/test/py/RunClientServer.py
+++ b/test/py/RunClientServer.py
@@ -30,9 +30,18 @@
parser = OptionParser()
parser.add_option("--port", type="int", dest="port", default=9090,
help="port number for server to listen on")
+parser.add_option('-v', '--verbose', action="store_const",
+ dest="verbose", const=2,
+ help="verbose output")
+parser.add_option('-q', '--quiet', action="store_const",
+ dest="verbose", const=0,
+ help="minimal output")
+parser.set_defaults(verbose=1)
options, args = parser.parse_args()
FRAMED = ["TNonblockingServer"]
+SKIP_ZLIB = ['TNonblockingServer', 'THttpServer']
+SKIP_SSL = ['TNonblockingServer', 'THttpServer']
EXTRA_DELAY = ['TProcessPoolServer']
EXTRA_SLEEP = 3.5
@@ -58,6 +67,11 @@
print 'Warning: the multiprocessing module is unavailable. Skipping tests for TProcessPoolServer'
SERVERS.remove('TProcessPoolServer')
+try:
+ import ssl
+except:
+ print 'Warning, no ssl module available. Skipping all SSL tests.'
+ SKIP_SSL.extend(SERVERS)
# commandline permits a single class name to be specified to override SERVERS=[...]
if len(args) == 1:
@@ -71,41 +85,68 @@
def relfile(fname):
return os.path.join(os.path.dirname(__file__), fname)
-def runTest(server_class, proto, port):
- server_args = [sys.executable, # /usr/bin/python or similar
- relfile('TestServer.py'), # ./TestServer.py
- '--proto=%s' % proto, # accel, binary or compact
- '--port=%d' % port, # usually 9090, given on cmdline
- server_class] # name of class to test, from SERVERS[] or cmdline
- print "Testing server %s: %s" % (server_class, ' '.join(server_args))
- serverproc = subprocess.Popen(server_args)
- time.sleep(0.25)
- try:
- argv = [sys.executable, relfile("TestClient.py"),
- '--proto=%s' % (proto), '--port=%d' % (port) ]
- if server_class in FRAMED:
- argv.append('--framed')
- if server_class == 'THttpServer':
- argv.append('--http=/')
- print 'Testing client %s: %s' % (server_class, ' '.join(argv))
- ret = subprocess.call(argv)
- if ret != 0:
- raise Exception("subprocess %s failed, args: %s" % (server_class, ' '.join(argv)))
- finally:
- # check that server didn't die
- time.sleep(0.05)
- serverproc.poll()
- if serverproc.returncode is not None:
- print 'Server process (%s) failed with retcode %d' % (' '.join(server_args), serverproc.returncode)
- raise Exception('subprocess %s died, args: %s' % (server_class, ' '.join(server_args)))
- else:
- if server_class in EXTRA_DELAY:
- print 'Giving %s (proto=%s) an extra %d seconds for child processes to terminate via alarm' % (server_class, proto, EXTRA_SLEEP)
- time.sleep(EXTRA_SLEEP)
- os.kill(serverproc.pid, signal.SIGKILL)
- # wait for shutdown
- time.sleep(0.5)
+def runTest(server_class, proto, port, use_zlib, use_ssl):
+ # Build command line arguments
+ server_args = [sys.executable, relfile('TestServer.py') ]
+ cli_args = [sys.executable, relfile('TestClient.py') ]
+ for which in (server_args, cli_args):
+ which.append('--proto=%s' % proto) # accel, binary or compact
+ which.append('--port=%d' % port) # default to 9090
+ if use_zlib:
+ which.append('--zlib')
+ if use_ssl:
+ which.append('--ssl')
+ if options.verbose == 0:
+ which.append('-q')
+ if options.verbose == 2:
+ which.append('-v')
+ # server-specific option to select server class
+ server_args.append(server_class)
+ # client-specific cmdline options
+ if server_class in FRAMED:
+ cli_args.append('--framed')
+ if server_class == 'THttpServer':
+ cli_args.append('--http=/')
+ if options.verbose > 0:
+ print 'Testing server %s: %s' % (server_class, ' '.join(server_args))
+ serverproc = subprocess.Popen(server_args)
+ time.sleep(0.2)
+ try:
+ if options.verbose > 0:
+ print 'Testing client: %s' % (' '.join(cli_args))
+ ret = subprocess.call(cli_args)
+ if ret != 0:
+ raise Exception("Client subprocess failed, retcode=%d, args: %s" % (ret, ' '.join(cli_args)))
+ finally:
+ # check that server didn't die
+ serverproc.poll()
+ if serverproc.returncode is not None:
+ print 'FAIL: Server process (%s) failed with retcode %d' % (' '.join(server_args), serverproc.returncode)
+ raise Exception('Server subprocess %s died, args: %s' % (server_class, ' '.join(server_args)))
+ else:
+ if server_class in EXTRA_DELAY:
+ if options.verbose > 0:
+ print 'Giving %s (proto=%s,zlib=%s,ssl=%s) an extra %d seconds for child processes to terminate via alarm' % (server_class,
+ proto, use_zlib, use_ssl, EXTRA_SLEEP)
+ time.sleep(EXTRA_SLEEP)
+ os.kill(serverproc.pid, signal.SIGKILL)
+ # wait for shutdown
+ time.sleep(0.1)
+test_count = 0
for try_server in SERVERS:
for try_proto in PROTOS:
- runTest(try_server, try_proto, options.port)
+ for with_zlib in (False, True):
+ # skip any servers that don't work with the Zlib transport
+ if with_zlib and try_server in SKIP_ZLIB:
+ continue
+ for with_ssl in (False, True):
+ # skip any servers that don't work with SSL
+ if with_ssl and try_server in SKIP_SSL:
+ continue
+ test_count += 1
+ if options.verbose > 0:
+ print '\nTest run #%d: Server=%s, Proto=%s, zlib=%s, SSL=%s' % (test_count, try_server, try_proto, with_zlib, with_ssl)
+ runTest(try_server, try_proto, options.port, with_zlib, with_ssl)
+ if options.verbose > 0:
+ print 'OK: Finished %s / %s proto / zlib=%s / SSL=%s. %d combinations tested.' % (try_server, try_proto, with_zlib, with_ssl, test_count)
diff --git a/test/py/TestClient.py b/test/py/TestClient.py
index eecb850..6429ec3 100755
--- a/test/py/TestClient.py
+++ b/test/py/TestClient.py
@@ -28,6 +28,7 @@
from thrift.transport import TTransport
from thrift.transport import TSocket
from thrift.transport import THttpClient
+from thrift.transport import TZlibTransport
from thrift.protocol import TBinaryProtocol
from thrift.protocol import TCompactProtocol
import unittest
@@ -40,6 +41,10 @@
help="connect to server at port")
parser.add_option("--host", type="string", dest="host",
help="connect to server")
+parser.add_option("--zlib", action="store_true", dest="zlib",
+ help="use zlib wrapper for compressed transport")
+parser.add_option("--ssl", action="store_true", dest="ssl",
+ help="use SSL for encrypted transport")
parser.add_option("--framed", action="store_true", dest="framed",
help="use framed transport")
parser.add_option("--http", dest="http_path",
@@ -58,19 +63,21 @@
class AbstractTest(unittest.TestCase):
def setUp(self):
if options.http_path:
- self.transport = THttpClient.THttpClient(
- options.host, options.port, options.http_path)
+ self.transport = THttpClient.THttpClient(options.host, port=options.port, path=options.http_path)
else:
- socket = TSocket.TSocket(options.host, options.port)
-
+ if options.ssl:
+ from thrift.transport import TSSLSocket
+ socket = TSSLSocket.TSSLSocket(options.host, options.port, validate=False)
+ else:
+ socket = TSocket.TSocket(options.host, options.port)
# frame or buffer depending upon args
if options.framed:
self.transport = TTransport.TFramedTransport(socket)
else:
self.transport = TTransport.TBufferedTransport(socket)
-
+ if options.zlib:
+ self.transport = TZlibTransport.TZlibTransport(self.transport, 9)
self.transport.open()
-
protocol = self.protocol_factory.getProtocol(self.transport)
self.client = ThriftTest.Client(protocol)
@@ -82,7 +89,7 @@
self.client.testVoid()
def testString(self):
- self.assertEqual(self.client.testString('Python'), 'Python')
+ self.assertEqual(self.client.testString('Python' * 20), 'Python' * 20)
self.assertEqual(self.client.testString(''), '')
def testByte(self):
diff --git a/test/py/TestServer.py b/test/py/TestServer.py
index 99d925a..fa62765 100755
--- a/test/py/TestServer.py
+++ b/test/py/TestServer.py
@@ -28,64 +28,76 @@
from ThriftTest.ttypes import *
from thrift.transport import TTransport
from thrift.transport import TSocket
+from thrift.transport import TZlibTransport
from thrift.protocol import TBinaryProtocol
from thrift.protocol import TCompactProtocol
from thrift.server import TServer, TNonblockingServer, THttpServer
+PROT_FACTORIES = {'binary': TBinaryProtocol.TBinaryProtocolFactory,
+ 'accel': TBinaryProtocol.TBinaryProtocolAcceleratedFactory,
+ 'compact': TCompactProtocol.TCompactProtocolFactory}
+
parser = OptionParser()
-parser.set_defaults(port=9090, verbose=1, proto='binary')
parser.add_option("--port", type="int", dest="port",
help="port number for server to listen on")
+parser.add_option("--zlib", action="store_true", dest="zlib",
+ help="use zlib wrapper for compressed transport")
+parser.add_option("--ssl", action="store_true", dest="ssl",
+ help="use SSL for encrypted transport")
parser.add_option('-v', '--verbose', action="store_const",
dest="verbose", const=2,
help="verbose output")
+parser.add_option('-q', '--quiet', action="store_const",
+ dest="verbose", const=0,
+ help="minimal output")
parser.add_option('--proto', dest="proto", type="string",
help="protocol to use, one of: accel, binary, compact")
+parser.set_defaults(port=9090, verbose=1, proto='binary')
options, args = parser.parse_args()
class TestHandler:
def testVoid(self):
- if options.verbose:
+ if options.verbose > 1:
print 'testVoid()'
def testString(self, str):
- if options.verbose:
+ if options.verbose > 1:
print 'testString(%s)' % str
return str
def testByte(self, byte):
- if options.verbose:
+ if options.verbose > 1:
print 'testByte(%d)' % byte
return byte
def testI16(self, i16):
- if options.verbose:
+ if options.verbose > 1:
print 'testI16(%d)' % i16
return i16
def testI32(self, i32):
- if options.verbose:
+ if options.verbose > 1:
print 'testI32(%d)' % i32
return i32
def testI64(self, i64):
- if options.verbose:
+ if options.verbose > 1:
print 'testI64(%d)' % i64
return i64
def testDouble(self, dub):
- if options.verbose:
+ if options.verbose > 1:
print 'testDouble(%f)' % dub
return dub
def testStruct(self, thing):
- if options.verbose:
+ if options.verbose > 1:
print 'testStruct({%s, %d, %d, %d})' % (thing.string_thing, thing.byte_thing, thing.i32_thing, thing.i64_thing)
return thing
def testException(self, str):
- if options.verbose:
+ if options.verbose > 1:
print 'testException(%s)' % str
if str == 'Xception':
x = Xception()
@@ -96,90 +108,111 @@
raise ValueError("foo")
def testOneway(self, seconds):
- if options.verbose:
+ if options.verbose > 1:
print 'testOneway(%d) => sleeping...' % seconds
time.sleep(seconds / 3) # be quick
- if options.verbose:
+ if options.verbose > 1:
print 'done sleeping'
def testNest(self, thing):
- if options.verbose:
+ if options.verbose > 1:
print 'testNest(%s)' % thing
return thing
def testMap(self, thing):
- if options.verbose:
+ if options.verbose > 1:
print 'testMap(%s)' % thing
return thing
def testSet(self, thing):
- if options.verbose:
+ if options.verbose > 1:
print 'testSet(%s)' % thing
return thing
def testList(self, thing):
- if options.verbose:
+ if options.verbose > 1:
print 'testList(%s)' % thing
return thing
def testEnum(self, thing):
- if options.verbose:
+ if options.verbose > 1:
print 'testEnum(%s)' % thing
return thing
def testTypedef(self, thing):
- if options.verbose:
+ if options.verbose > 1:
print 'testTypedef(%s)' % thing
return thing
def testMapMap(self, thing):
- if options.verbose:
+ if options.verbose > 1:
print 'testMapMap(%s)' % thing
return thing
def testMulti(self, arg0, arg1, arg2, arg3, arg4, arg5):
- if options.verbose:
+ if options.verbose > 1:
print 'testMulti(%s)' % [arg0, arg1, arg2, arg3, arg4, arg5]
x = Xtruct(byte_thing=arg0, i32_thing=arg1, i64_thing=arg2)
return x
-if options.proto == 'binary':
- pfactory = TBinaryProtocol.TBinaryProtocolFactory()
-elif options.proto == 'accel':
- pfactory = TBinaryProtocol.TBinaryProtocolAcceleratedFactory()
-elif options.proto == 'compact':
- pfactory = TCompactProtocol.TCompactProtocolFactory()
-else:
+# set up the protocol factory form the --proto option
+pfactory_cls = PROT_FACTORIES.get(options.proto, None)
+if pfactory_cls is None:
raise AssertionError('Unknown --proto option: %s' % options.proto)
+pfactory = pfactory_cls()
+
+# get the server type (TSimpleServer, TNonblockingServer, etc...)
+if len(args) > 1:
+ raise AssertionError('Only one server type may be specified, not multiple types.')
+server_type = args[0]
+
+# Set up the handler and processor objects
handler = TestHandler()
processor = ThriftTest.Processor(handler)
-if args[0] == "THttpServer":
- server = THttpServer.THttpServer(processor, ('', options.port), pfactory)
+# Handle THttpServer as a special case
+if server_type == 'THttpServer':
+ server =THttpServer.THttpServer(processor, ('', options.port), pfactory)
+ server.serve()
+ sys.exit(0)
+
+# set up server transport and transport factory
+host = None
+if options.ssl:
+ from thrift.transport import TSSLSocket
+ transport = TSSLSocket.TSSLServerSocket(host, options.port, certfile='test_cert.pem')
else:
- host = None
transport = TSocket.TServerSocket(host, options.port)
- tfactory = TTransport.TBufferedTransportFactory()
+tfactory = TTransport.TBufferedTransportFactory()
- if args[0] == "TNonblockingServer":
- server = TNonblockingServer.TNonblockingServer(processor, transport, inputProtocolFactory=pfactory)
- elif args[0] == "TProcessPoolServer":
- import signal
- def set_alarm():
- def clean_shutdown(signum, frame):
- for worker in server.workers:
+# if --zlib, then wrap server transport, and use a different transport factory
+if options.zlib:
+ transport = TZlibTransport.TZlibTransport(transport) # wrap with zlib
+ tfactory = TZlibTransport.TZlibTransportFactory()
+
+# do server-specific setup here:
+if server_type == "TNonblockingServer":
+ server = TNonblockingServer.TNonblockingServer(processor, transport, inputProtocolFactory=pfactory)
+elif server_type == "TProcessPoolServer":
+ import signal
+ from thrift.server import TProcessPoolServer
+ server = TProcessPoolServer.TProcessPoolServer(processor, transport, tfactory, pfactory)
+ server.setNumWorkers(5)
+ def set_alarm():
+ def clean_shutdown(signum, frame):
+ for worker in server.workers:
+ if options.verbose > 0:
print 'Terminating worker: %s' % worker
- worker.terminate()
+ worker.terminate()
+ if options.verbose > 0:
print 'Requesting server to stop()'
- server.stop()
- signal.signal(signal.SIGALRM, clean_shutdown)
- signal.alarm(2)
- from thrift.server import TProcessPoolServer
- server = TProcessPoolServer.TProcessPoolServer(processor, transport, tfactory, pfactory)
- server.setNumWorkers(5)
- set_alarm()
- else:
- ServerClass = getattr(TServer, args[0])
- server = ServerClass(processor, transport, tfactory, pfactory)
-
+ server.stop()
+ signal.signal(signal.SIGALRM, clean_shutdown)
+ signal.alarm(2)
+ set_alarm()
+else:
+ # look up server class dynamically to instantiate server
+ ServerClass = getattr(TServer, server_type)
+ server = ServerClass(processor, transport, tfactory, pfactory)
+# enter server main loop
server.serve()
diff --git a/test/py/test_cert.pem b/test/py/test_cert.pem
new file mode 100644
index 0000000..9b1a51f
--- /dev/null
+++ b/test/py/test_cert.pem
@@ -0,0 +1,28 @@
+-----BEGIN CERTIFICATE-----
+MIIB+zCCAWQCCQDyq++o7K0rpTANBgkqhkiG9w0BAQUFADBCMQswCQYDVQQGEwJV
+UzEVMBMGA1UEBwwMRGVmYXVsdCBDaXR5MRwwGgYDVQQKDBNEZWZhdWx0IENvbXBh
+bnkgTHRkMB4XDTExMDMxNjEzMTQ1NVoXDTIxMDMxMzEzMTQ1NVowQjELMAkGA1UE
+BhMCVVMxFTATBgNVBAcMDERlZmF1bHQgQ2l0eTEcMBoGA1UECgwTRGVmYXVsdCBD
+b21wYW55IEx0ZDCBnzANBgkqhkiG9w0BAQEFAAOBjQAwgYkCgYEA9lmCuVQRqRBR
+OYVH+FMChSoF8IjMwfrpnC65J9RR88dUIZbjC2b+JPT5qiUVQft2NzPPwiBnXI2s
+j6AmHYVKoWGB24hNX8bj2cjtxdPpT2rvfAlIK0pat1C+kCxgRHIg++S7o6GEJOkw
+OQiopsUroAsIbSRT/Ird/A0+KeSqQ0sCAwEAATANBgkqhkiG9w0BAQUFAAOBgQDf
+WseEh6/3gWl/G44MyjljBvgRAa0c+eqFL/cVl7Zfh03/KOXMlPV5/snVUYBOJCCI
+qPuQwWToT+Q36kNQyMnG4e4gh+DmsiIhgQgA3lVSNDhPPfRrG1vDeeXXtybpEoke
+fI6o9a9olzrKWNvW+/8P9xIDlP0SRZxL66464LAQnw==
+-----END CERTIFICATE-----
+-----BEGIN RSA PRIVATE KEY-----
+MIICXwIBAAKBgQD2WYK5VBGpEFE5hUf4UwKFKgXwiMzB+umcLrkn1FHzx1QhluML
+Zv4k9PmqJRVB+3Y3M8/CIGdcjayPoCYdhUqhYYHbiE1fxuPZyO3F0+lPau98CUgr
+Slq3UL6QLGBEciD75LujoYQk6TA5CKimxSugCwhtJFP8it38DT4p5KpDSwIDAQAB
+AoGBAMcnA7Q5T3GifFeI9O6+hLoMh/K1VPq4kmStrQeS8JGoIc5pwbC1GV3dIXy4
+L+BAnofv/dQNCCJdchRGPqn82J/aOA/sMsJJ6VzTSr9NNVl9lgQHdLjEDoZ15yxT
+vVSc4nG2xBs7uZ/24fN/SJZVFO3+EdphOvrp7uEXLiXlqvopAkEA/h7XGlrULBIN
+ekjAzEJEchlZb4xJdPrH3P4LZs92ZlcO88GFr5wfOz1ytafLiZA9EzYwLIQTPdsk
+HHynJeZWtwJBAPgr9PYUJOdkhUeWVSN2PyqvWKrdQVKvM1VwNgRFaSPXgBd0WGIN
+Eym1b7wt6ngwNtfLx9FUOR6nl7MklsFLBA0CQQDnSiibqynLxs6PiyI3huUHOH1H
+YtcE6q/4Ox0jgRYRhZFtWKkVsbJXV9FM9yDw3uBH2R01lyxwM0GF0ArOGvy3AkEA
+7eEcjh/i+9Wzl1n3Q+WdSKoJAMbSTZJYT0Ye0NtDm7J+On0wFtRXkPw0HRmaDRiS
+CSlw4CquEb8tPu8Mfj0MpQJBAKwTLSdHsy0vxQQJXm0lTI+Ck9KJUM9vJzFuCL/x
+G6fmsqEttxjhyLnze+iIIRAu/IV+A5UrWnI1h728y/wRejw=
+-----END RSA PRIVATE KEY-----
diff --git a/test/py/test_cert.readme b/test/py/test_cert.readme
new file mode 100644
index 0000000..08bbbc9
--- /dev/null
+++ b/test/py/test_cert.readme
@@ -0,0 +1,7 @@
+NOTE:
+The test_cert.pem file in this directory must never be used on production systems.
+
+The key it represents is a self-signed key intended only for use by the unit
+testing framework. It is a publicly exposed private key.
+
+Do not use test_cert.pem in production environments under any circumstances.