THRIFT-1103. py: TZlibTransport for python, a zlib compressed transport

This patch adds a new TZlibTransport to the Python library and extends the test suite to exercise it.

Patch: Will Pierce

git-svn-id: https://svn.apache.org/repos/asf/thrift/trunk@1084276 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/lib/py/src/transport/TSSLSocket.py b/lib/py/src/transport/TSSLSocket.py
index 5eff5e6..be35844 100644
--- a/lib/py/src/transport/TSSLSocket.py
+++ b/lib/py/src/transport/TSSLSocket.py
@@ -36,7 +36,7 @@
   """
   SSL_VERSION = ssl.PROTOCOL_TLSv1
 
-  def __init__(self, validate=True, ca_certs=None, *args, **kwargs):
+  def __init__(self, host='localhost', port=9090, validate=True, ca_certs=None, unix_socket=None):
     """
     @param validate: Set to False to disable SSL certificate validation entirely.
     @type validate: bool
@@ -56,10 +56,10 @@
     else:
       self.cert_reqs = ssl.CERT_REQUIRED
     self.ca_certs = ca_certs
-    if validate and ca_certs is not None:
-      if not os.access(ca_certs, os.R_OK):
+    if validate:
+      if ca_certs is None or not os.access(ca_certs, os.R_OK):
         raise IOError('Certificate Authority ca_certs file "%s" is not readable, cannot validate SSL certificates.' % (ca_certs))
-    TSocket.TSocket.__init__(self, *args, **kwargs)
+    TSocket.TSocket.__init__(self, host, port, unix_socket)
 
   def open(self):
     try:
@@ -131,7 +131,7 @@
   """
   SSL_VERSION = ssl.PROTOCOL_TLSv1
 
-  def __init__(self, certfile='cert.pem', *args, **kwargs):
+  def __init__(self, host=None, port=9090, certfile='cert.pem', unix_socket=None):
     """Initialize a TSSLServerSocket
     
     @param certfile: The filename of the server certificate file, defaults to cert.pem
@@ -143,7 +143,7 @@
     @type port: int
     """
     self.setCertfile(certfile)
-    TSocket.TServerSocket.__init__(self, *args, **kwargs)
+    TSocket.TServerSocket.__init__(self, host, port)
 
   def setCertfile(self, certfile):
     """Set or change the server certificate file used to wrap new connections.
@@ -159,8 +159,18 @@
 
   def accept(self):
     plain_client, addr = self.handle.accept()
+    try:
+      client = ssl.wrap_socket(plain_client, certfile=self.certfile,
+                      server_side=True, ssl_version=self.SSL_VERSION)
+    except ssl.SSLError, ssl_exc:
+      # failed handshake/ssl wrap, close socket to client
+      plain_client.close()
+      # raise ssl_exc
+      # We can't raise the exception, because it kills most TServer derived serve()
+      # methods.
+      # Instead, return None, and let the TServer instance deal with it in
+      # other exception handling.  (but TSimpleServer dies anyway)
+      return None 
     result = TSocket.TSocket()
-    client = ssl.wrap_socket(plain_client, certfile=self.certfile,
-                    server_side=True, ssl_version=self.SSL_VERSION)
     result.setHandle(client)
     return result
diff --git a/lib/py/src/transport/TZlibTransport.py b/lib/py/src/transport/TZlibTransport.py
new file mode 100644
index 0000000..784d4e1
--- /dev/null
+++ b/lib/py/src/transport/TZlibTransport.py
@@ -0,0 +1,261 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+'''
+TZlibTransport provides a compressed transport and transport factory
+class, using the python standard library zlib module to implement
+data compression.
+'''
+
+from __future__ import division
+import zlib
+from cStringIO import StringIO
+from TTransport import TTransportBase, CReadableTransport
+
+class TZlibTransportFactory(object):
+  '''
+  Factory transport that builds zlib compressed transports.
+  
+  This factory caches the last single client/transport that it was passed
+  and returns the same TZlibTransport object that was created.
+  
+  This caching means the TServer class will get the _same_ transport
+  object for both input and output transports from this factory.
+  (For non-threaded scenarios only, since the cache only holds one object)
+  
+  The purpose of this caching is to allocate only one TZlibTransport where
+  only one is really needed (since it must have separate read/write buffers),
+  and makes the statistics from getCompSavings() and getCompRatio()
+  easier to understand.
+  '''
+
+  # class scoped cache of last transport given and zlibtransport returned
+  _last_trans = None
+  _last_z = None
+
+  def getTransport(self, trans, compresslevel=9):
+    '''Wrap a transport , trans, with the TZlibTransport
+    compressed transport class, returning a new
+    transport to the caller.
+    
+    @param compresslevel: The zlib compression level, ranging
+    from 0 (no compression) to 9 (best compression).  Defaults to 9.
+    @type compresslevel: int
+    
+    This method returns a TZlibTransport which wraps the
+    passed C{trans} TTransport derived instance.
+    '''
+    if trans == self._last_trans:
+      return self._last_z
+    ztrans = TZlibTransport(trans, compresslevel)
+    self._last_trans = trans
+    self._last_z = ztrans
+    return ztrans
+
+
+class TZlibTransport(TTransportBase, CReadableTransport):
+  '''
+  Class that wraps a transport with zlib, compressing writes
+  and decompresses reads, using the python standard
+  library zlib module.
+  '''
+
+  # Read buffer size for the python fastbinary C extension,
+  # the TBinaryProtocolAccelerated class.
+  DEFAULT_BUFFSIZE = 4096
+
+  def __init__(self, trans, compresslevel=9):
+    '''
+    Create a new TZlibTransport, wrapping C{trans}, another
+    TTransport derived object.
+    
+    @param trans: A thrift transport object, i.e. a TSocket() object.
+    @type trans: TTransport
+    @param compresslevel: The zlib compression level, ranging
+    from 0 (no compression) to 9 (best compression).  Default is 9.
+    @type compresslevel: int
+    '''
+    self.__trans = trans
+    self.compresslevel = compresslevel
+    self.__rbuf = StringIO()
+    self.__wbuf = StringIO()
+    self._init_zlib()
+    self._init_stats()
+
+  def _reinit_buffers(self):
+    '''
+    Internal method to initialize/reset the internal StringIO objects
+    for read and write buffers.
+    '''
+    self.__rbuf = StringIO()
+    self.__wbuf = StringIO()
+
+  def _init_stats(self):
+    '''
+    Internal method to reset the internal statistics counters
+    for compression ratios and bandwidth savings.
+    '''
+    self.bytes_in = 0
+    self.bytes_out = 0
+    self.bytes_in_comp = 0
+    self.bytes_out_comp = 0
+
+  def _init_zlib(self):
+    '''
+    Internal method for setting up the zlib compression and
+    decompression objects.
+    '''
+    self._zcomp_read = zlib.decompressobj()
+    self._zcomp_write = zlib.compressobj(self.compresslevel)
+
+  def getCompRatio(self):
+    '''
+    Get the current measured compression ratios (in,out) from
+    this transport.
+    
+    Returns a tuple of: 
+    (inbound_compression_ratio, outbound_compression_ratio)
+    
+    The compression ratios are computed as:
+        compressed / uncompressed
+
+    E.g., data that compresses by 10x will have a ratio of: 0.10
+    and data that compresses to half of ts original size will
+    have a ratio of 0.5
+    
+    None is returned if no bytes have yet been processed in
+    a particular direction.
+    '''
+    r_percent, w_percent = (None, None)
+    if self.bytes_in > 0:
+      r_percent = self.bytes_in_comp / self.bytes_in
+    if self.bytes_out > 0:
+      w_percent = self.bytes_out_comp / self.bytes_out
+    return (r_percent, w_percent)
+
+  def getCompSavings(self):
+    '''
+    Get the current count of saved bytes due to data
+    compression.
+    
+    Returns a tuple of:
+    (inbound_saved_bytes, outbound_saved_bytes)
+    
+    Note: if compression is actually expanding your
+    data (only likely with very tiny thrift objects), then
+    the values returned will be negative.
+    '''
+    r_saved = self.bytes_in - self.bytes_in_comp
+    w_saved = self.bytes_out - self.bytes_out_comp
+    return (r_saved, w_saved)
+
+  def isOpen(self):
+    '''Return the underlying transport's open status'''
+    return self.__trans.isOpen()
+
+  def open(self):
+    """Open the underlying transport"""
+    self._init_stats()
+    return self.__trans.open()
+
+  def listen(self):
+    '''Invoke the underlying transport's listen() method'''
+    self.__trans.listen()
+
+  def accept(self):
+    '''Accept connections on the underlying transport'''
+    return self.__trans.accept()
+
+  def close(self):
+    '''Close the underlying transport,'''
+    self._reinit_buffers()
+    self._init_zlib()
+    return self.__trans.close()
+
+  def read(self, sz):
+    '''
+    Read up to sz bytes from the decompressed bytes buffer, and
+    read from the underlying transport if the decompression
+    buffer is empty.
+    '''
+    ret = self.__rbuf.read(sz)
+    if len(ret) > 0:
+      return ret
+    # keep reading from transport until something comes back
+    while True:
+      if self.readComp(sz):
+        break
+    ret = self.__rbuf.read(sz)
+    return ret
+
+  def readComp(self, sz):
+    '''
+    Read compressed data from the underlying transport, then
+    decompress it and append it to the internal StringIO read buffer
+    '''
+    zbuf = self.__trans.read(sz)
+    zbuf = self._zcomp_read.unconsumed_tail + zbuf
+    buf = self._zcomp_read.decompress(zbuf)
+    self.bytes_in += len(zbuf)
+    self.bytes_in_comp += len(buf)
+    old = self.__rbuf.read()
+    self.__rbuf = StringIO(old + buf)
+    if len(old) + len(buf) == 0:
+      return False
+    return True
+
+  def write(self, buf):
+    '''
+    Write some bytes, putting them into the internal write
+    buffer for eventual compression.
+    '''
+    self.__wbuf.write(buf)
+
+  def flush(self):
+    '''
+    Flush any queued up data in the write buffer and ensure the
+    compression buffer is flushed out to the underlying transport
+    '''
+    wout = self.__wbuf.getvalue()
+    if len(wout) > 0:
+      zbuf = self._zcomp_write.compress(wout)
+      self.bytes_out += len(wout)
+      self.bytes_out_comp += len(zbuf)
+    else:
+      zbuf = ''
+    ztail = self._zcomp_write.flush(zlib.Z_SYNC_FLUSH)
+    self.bytes_out_comp += len(ztail)
+    if (len(zbuf) + len(ztail)) > 0:
+      self.__wbuf = StringIO()
+      self.__trans.write(zbuf + ztail)
+    self.__trans.flush()
+
+  @property
+  def cstringio_buf(self):
+    '''Implement the CReadableTransport interface'''
+    return self.__rbuf
+
+  def cstringio_refill(self, partialread, reqlen):
+    '''Implement the CReadableTransport interface for refill'''
+    retstring = partialread
+    if reqlen < self.DEFAULT_BUFFSIZE:
+      retstring += self.read(self.DEFAULT_BUFFSIZE)
+    while len(retstring) < reqlen:
+      retstring += self.read(reqlen - len(retstring))
+    self.__rbuf = StringIO(retstring)
+    return self.__rbuf
diff --git a/lib/py/src/transport/__init__.py b/lib/py/src/transport/__init__.py
index 02c6048..46e54fe 100644
--- a/lib/py/src/transport/__init__.py
+++ b/lib/py/src/transport/__init__.py
@@ -17,4 +17,4 @@
 # under the License.
 #
 
-__all__ = ['TTransport', 'TSocket', 'THttpClient']
+__all__ = ['TTransport', 'TSocket', 'THttpClient','TZlibTransport']
diff --git a/test/py/RunClientServer.py b/test/py/RunClientServer.py
index dced91a..633856f 100755
--- a/test/py/RunClientServer.py
+++ b/test/py/RunClientServer.py
@@ -30,9 +30,18 @@
 parser = OptionParser()
 parser.add_option("--port", type="int", dest="port", default=9090,
     help="port number for server to listen on")
+parser.add_option('-v', '--verbose', action="store_const", 
+    dest="verbose", const=2,
+    help="verbose output")
+parser.add_option('-q', '--quiet', action="store_const", 
+    dest="verbose", const=0,
+    help="minimal output")
+parser.set_defaults(verbose=1)
 options, args = parser.parse_args()
 
 FRAMED = ["TNonblockingServer"]
+SKIP_ZLIB = ['TNonblockingServer', 'THttpServer']
+SKIP_SSL = ['TNonblockingServer', 'THttpServer']
 EXTRA_DELAY = ['TProcessPoolServer']
 EXTRA_SLEEP = 3.5
 
@@ -58,6 +67,11 @@
   print 'Warning: the multiprocessing module is unavailable. Skipping tests for TProcessPoolServer'
   SERVERS.remove('TProcessPoolServer')
 
+try:
+  import ssl
+except:
+  print 'Warning, no ssl module available. Skipping all SSL tests.'
+  SKIP_SSL.extend(SERVERS)
 
 # commandline permits a single class name to be specified to override SERVERS=[...]
 if len(args) == 1:
@@ -71,41 +85,68 @@
 def relfile(fname):
     return os.path.join(os.path.dirname(__file__), fname)
 
-def runTest(server_class, proto, port):
-    server_args = [sys.executable, # /usr/bin/python or similar
-      relfile('TestServer.py'), # ./TestServer.py
-      '--proto=%s' % proto, # accel, binary or compact
-      '--port=%d' % port, # usually 9090, given on cmdline
-      server_class] # name of class to test, from SERVERS[] or cmdline
-    print "Testing server %s: %s" % (server_class, ' '.join(server_args))
-    serverproc = subprocess.Popen(server_args)
-    time.sleep(0.25)
-    try:
-        argv = [sys.executable, relfile("TestClient.py"),
-           '--proto=%s' % (proto), '--port=%d' % (port) ]
-        if server_class in FRAMED:
-            argv.append('--framed')
-        if server_class == 'THttpServer':
-            argv.append('--http=/')
-        print 'Testing client %s: %s' % (server_class, ' '.join(argv))
-        ret = subprocess.call(argv)
-        if ret != 0:
-            raise Exception("subprocess %s failed, args: %s" % (server_class, ' '.join(argv)))
-    finally:
-        # check that server didn't die
-        time.sleep(0.05)
-        serverproc.poll()
-        if serverproc.returncode is not None:
-          print 'Server process (%s) failed with retcode %d' % (' '.join(server_args), serverproc.returncode)
-          raise Exception('subprocess %s died, args: %s' % (server_class, ' '.join(server_args)))
-        else:
-          if server_class in EXTRA_DELAY:
-            print 'Giving %s (proto=%s) an extra %d seconds for child processes to terminate via alarm' % (server_class, proto, EXTRA_SLEEP)
-            time.sleep(EXTRA_SLEEP)
-          os.kill(serverproc.pid, signal.SIGKILL)
-    # wait for shutdown
-    time.sleep(0.5)
+def runTest(server_class, proto, port, use_zlib, use_ssl):
+  # Build command line arguments
+  server_args = [sys.executable, relfile('TestServer.py') ]
+  cli_args = [sys.executable, relfile('TestClient.py') ]
+  for which in (server_args, cli_args):
+    which.append('--proto=%s' % proto) # accel, binary or compact
+    which.append('--port=%d' % port) # default to 9090
+    if use_zlib:
+      which.append('--zlib')
+    if use_ssl:
+      which.append('--ssl')
+    if options.verbose == 0:
+      which.append('-q')
+    if options.verbose == 2:
+      which.append('-v')
+  # server-specific option to select server class
+  server_args.append(server_class)
+  # client-specific cmdline options
+  if server_class in FRAMED:
+    cli_args.append('--framed')
+  if server_class == 'THttpServer':
+    cli_args.append('--http=/')
+  if options.verbose > 0:
+    print 'Testing server %s: %s' % (server_class, ' '.join(server_args))
+  serverproc = subprocess.Popen(server_args)
+  time.sleep(0.2)
+  try:
+    if options.verbose > 0:
+      print 'Testing client: %s' % (' '.join(cli_args))
+    ret = subprocess.call(cli_args)
+    if ret != 0:
+      raise Exception("Client subprocess failed, retcode=%d, args: %s" % (ret, ' '.join(cli_args)))
+  finally:
+    # check that server didn't die
+    serverproc.poll()
+    if serverproc.returncode is not None:
+      print 'FAIL: Server process (%s) failed with retcode %d' % (' '.join(server_args), serverproc.returncode)
+      raise Exception('Server subprocess %s died, args: %s' % (server_class, ' '.join(server_args)))
+    else:
+      if server_class in EXTRA_DELAY:
+        if options.verbose > 0:
+          print 'Giving %s (proto=%s,zlib=%s,ssl=%s) an extra %d seconds for child processes to terminate via alarm' % (server_class,
+                proto, use_zlib, use_ssl, EXTRA_SLEEP)
+        time.sleep(EXTRA_SLEEP)
+      os.kill(serverproc.pid, signal.SIGKILL)
+  # wait for shutdown
+  time.sleep(0.1)
 
+test_count = 0
 for try_server in SERVERS:
   for try_proto in PROTOS:
-    runTest(try_server, try_proto, options.port)
+    for with_zlib in (False, True):
+      # skip any servers that don't work with the Zlib transport
+      if with_zlib and try_server in SKIP_ZLIB:
+        continue
+      for with_ssl in (False, True):
+        # skip any servers that don't work with SSL
+        if with_ssl and try_server in SKIP_SSL:
+          continue
+        test_count += 1
+        if options.verbose > 0:
+          print '\nTest run #%d:  Server=%s,  Proto=%s,  zlib=%s,  SSL=%s' % (test_count, try_server, try_proto, with_zlib, with_ssl)
+        runTest(try_server, try_proto, options.port, with_zlib, with_ssl)
+        if options.verbose > 0:
+          print 'OK: Finished  %s / %s proto / zlib=%s / SSL=%s.   %d combinations tested.' % (try_server, try_proto, with_zlib, with_ssl, test_count)
diff --git a/test/py/TestClient.py b/test/py/TestClient.py
index eecb850..6429ec3 100755
--- a/test/py/TestClient.py
+++ b/test/py/TestClient.py
@@ -28,6 +28,7 @@
 from thrift.transport import TTransport
 from thrift.transport import TSocket
 from thrift.transport import THttpClient
+from thrift.transport import TZlibTransport
 from thrift.protocol import TBinaryProtocol
 from thrift.protocol import TCompactProtocol
 import unittest
@@ -40,6 +41,10 @@
     help="connect to server at port")
 parser.add_option("--host", type="string", dest="host",
     help="connect to server")
+parser.add_option("--zlib", action="store_true", dest="zlib",
+    help="use zlib wrapper for compressed transport")
+parser.add_option("--ssl", action="store_true", dest="ssl",
+    help="use SSL for encrypted transport")
 parser.add_option("--framed", action="store_true", dest="framed",
     help="use framed transport")
 parser.add_option("--http", dest="http_path",
@@ -58,19 +63,21 @@
 class AbstractTest(unittest.TestCase):
   def setUp(self):
     if options.http_path:
-      self.transport = THttpClient.THttpClient(
-          options.host, options.port, options.http_path)
+      self.transport = THttpClient.THttpClient(options.host, port=options.port, path=options.http_path)
     else:
-      socket = TSocket.TSocket(options.host, options.port)
-
+      if options.ssl:
+        from thrift.transport import TSSLSocket
+        socket = TSSLSocket.TSSLSocket(options.host, options.port, validate=False)
+      else:
+        socket = TSocket.TSocket(options.host, options.port)
       # frame or buffer depending upon args
       if options.framed:
         self.transport = TTransport.TFramedTransport(socket)
       else:
         self.transport = TTransport.TBufferedTransport(socket)
-
+      if options.zlib:
+        self.transport = TZlibTransport.TZlibTransport(self.transport, 9)
     self.transport.open()
-
     protocol = self.protocol_factory.getProtocol(self.transport)
     self.client = ThriftTest.Client(protocol)
 
@@ -82,7 +89,7 @@
     self.client.testVoid()
 
   def testString(self):
-    self.assertEqual(self.client.testString('Python'), 'Python')
+    self.assertEqual(self.client.testString('Python' * 20), 'Python' * 20)
     self.assertEqual(self.client.testString(''), '')
 
   def testByte(self):
diff --git a/test/py/TestServer.py b/test/py/TestServer.py
index 99d925a..fa62765 100755
--- a/test/py/TestServer.py
+++ b/test/py/TestServer.py
@@ -28,64 +28,76 @@
 from ThriftTest.ttypes import *
 from thrift.transport import TTransport
 from thrift.transport import TSocket
+from thrift.transport import TZlibTransport
 from thrift.protocol import TBinaryProtocol
 from thrift.protocol import TCompactProtocol
 from thrift.server import TServer, TNonblockingServer, THttpServer
 
+PROT_FACTORIES = {'binary': TBinaryProtocol.TBinaryProtocolFactory,
+    'accel': TBinaryProtocol.TBinaryProtocolAcceleratedFactory,
+    'compact': TCompactProtocol.TCompactProtocolFactory}
+
 parser = OptionParser()
-parser.set_defaults(port=9090, verbose=1, proto='binary')
 parser.add_option("--port", type="int", dest="port",
     help="port number for server to listen on")
+parser.add_option("--zlib", action="store_true", dest="zlib",
+    help="use zlib wrapper for compressed transport")
+parser.add_option("--ssl", action="store_true", dest="ssl",
+    help="use SSL for encrypted transport")
 parser.add_option('-v', '--verbose', action="store_const", 
     dest="verbose", const=2,
     help="verbose output")
+parser.add_option('-q', '--quiet', action="store_const", 
+    dest="verbose", const=0,
+    help="minimal output")
 parser.add_option('--proto',  dest="proto", type="string",
     help="protocol to use, one of: accel, binary, compact")
+parser.set_defaults(port=9090, verbose=1, proto='binary')
 options, args = parser.parse_args()
 
 class TestHandler:
 
   def testVoid(self):
-    if options.verbose:
+    if options.verbose > 1:
       print 'testVoid()'
 
   def testString(self, str):
-    if options.verbose:
+    if options.verbose > 1:
       print 'testString(%s)' % str
     return str
 
   def testByte(self, byte):
-    if options.verbose:
+    if options.verbose > 1:
       print 'testByte(%d)' % byte
     return byte
 
   def testI16(self, i16):
-    if options.verbose:
+    if options.verbose > 1:
       print 'testI16(%d)' % i16
     return i16
 
   def testI32(self, i32):
-    if options.verbose:
+    if options.verbose > 1:
       print 'testI32(%d)' % i32
     return i32
 
   def testI64(self, i64):
-    if options.verbose:
+    if options.verbose > 1:
       print 'testI64(%d)' % i64
     return i64
 
   def testDouble(self, dub):
-    if options.verbose:
+    if options.verbose > 1:
       print 'testDouble(%f)' % dub
     return dub
 
   def testStruct(self, thing):
-    if options.verbose:
+    if options.verbose > 1:
       print 'testStruct({%s, %d, %d, %d})' % (thing.string_thing, thing.byte_thing, thing.i32_thing, thing.i64_thing)
     return thing
 
   def testException(self, str):
-    if options.verbose:
+    if options.verbose > 1:
       print 'testException(%s)' % str
     if str == 'Xception':
       x = Xception()
@@ -96,90 +108,111 @@
       raise ValueError("foo")
 
   def testOneway(self, seconds):
-    if options.verbose:
+    if options.verbose > 1:
       print 'testOneway(%d) => sleeping...' % seconds
     time.sleep(seconds / 3) # be quick
-    if options.verbose:
+    if options.verbose > 1:
       print 'done sleeping'
 
   def testNest(self, thing):
-    if options.verbose:
+    if options.verbose > 1:
       print 'testNest(%s)' % thing
     return thing
 
   def testMap(self, thing):
-    if options.verbose:
+    if options.verbose > 1:
       print 'testMap(%s)' % thing
     return thing
 
   def testSet(self, thing):
-    if options.verbose:
+    if options.verbose > 1:
       print 'testSet(%s)' % thing
     return thing
 
   def testList(self, thing):
-    if options.verbose:
+    if options.verbose > 1:
       print 'testList(%s)' % thing
     return thing
 
   def testEnum(self, thing):
-    if options.verbose:
+    if options.verbose > 1:
       print 'testEnum(%s)' % thing
     return thing
 
   def testTypedef(self, thing):
-    if options.verbose:
+    if options.verbose > 1:
       print 'testTypedef(%s)' % thing
     return thing
 
   def testMapMap(self, thing):
-    if options.verbose:
+    if options.verbose > 1:
       print 'testMapMap(%s)' % thing
     return thing
 
   def testMulti(self, arg0, arg1, arg2, arg3, arg4, arg5):
-    if options.verbose:
+    if options.verbose > 1:
       print 'testMulti(%s)' % [arg0, arg1, arg2, arg3, arg4, arg5]
     x = Xtruct(byte_thing=arg0, i32_thing=arg1, i64_thing=arg2)
     return x
 
-if options.proto == 'binary':
-  pfactory = TBinaryProtocol.TBinaryProtocolFactory()
-elif options.proto == 'accel':
-  pfactory = TBinaryProtocol.TBinaryProtocolAcceleratedFactory()
-elif options.proto == 'compact':
-  pfactory = TCompactProtocol.TCompactProtocolFactory()
-else:
+# set up the protocol factory form the --proto option
+pfactory_cls = PROT_FACTORIES.get(options.proto, None)
+if pfactory_cls is None:
   raise AssertionError('Unknown --proto option: %s' % options.proto)
+pfactory = pfactory_cls()
+
+# get the server type (TSimpleServer, TNonblockingServer, etc...)
+if len(args) > 1:
+  raise AssertionError('Only one server type may be specified, not multiple types.')
+server_type = args[0]
+
+# Set up the handler and processor objects
 handler = TestHandler()
 processor = ThriftTest.Processor(handler)
 
-if args[0] == "THttpServer":
-  server = THttpServer.THttpServer(processor, ('', options.port), pfactory)
+# Handle THttpServer as a special case
+if server_type == 'THttpServer':
+  server =THttpServer.THttpServer(processor, ('', options.port), pfactory)
+  server.serve()
+  sys.exit(0)
+
+# set up server transport and transport factory
+host = None
+if options.ssl:
+  from thrift.transport import TSSLSocket
+  transport = TSSLSocket.TSSLServerSocket(host, options.port, certfile='test_cert.pem')
 else:
-  host = None
   transport = TSocket.TServerSocket(host, options.port)
-  tfactory = TTransport.TBufferedTransportFactory()
+tfactory = TTransport.TBufferedTransportFactory()
 
-  if args[0] == "TNonblockingServer":
-    server = TNonblockingServer.TNonblockingServer(processor, transport, inputProtocolFactory=pfactory)
-  elif args[0] == "TProcessPoolServer":
-    import signal
-    def set_alarm():
-      def clean_shutdown(signum, frame):
-        for worker in server.workers:
+# if --zlib, then wrap server transport, and use a different transport factory
+if options.zlib:
+  transport = TZlibTransport.TZlibTransport(transport) # wrap  with zlib
+  tfactory = TZlibTransport.TZlibTransportFactory()
+
+# do server-specific setup here:
+if server_type == "TNonblockingServer":
+  server = TNonblockingServer.TNonblockingServer(processor, transport, inputProtocolFactory=pfactory)
+elif server_type == "TProcessPoolServer":
+  import signal
+  from thrift.server import TProcessPoolServer
+  server = TProcessPoolServer.TProcessPoolServer(processor, transport, tfactory, pfactory)
+  server.setNumWorkers(5)
+  def set_alarm():
+    def clean_shutdown(signum, frame):
+      for worker in server.workers:
+        if options.verbose > 0:
           print 'Terminating worker: %s' % worker
-          worker.terminate()
+        worker.terminate()
+      if options.verbose > 0:
         print 'Requesting server to stop()'
-        server.stop()
-      signal.signal(signal.SIGALRM, clean_shutdown)
-      signal.alarm(2)
-    from thrift.server import TProcessPoolServer
-    server = TProcessPoolServer.TProcessPoolServer(processor, transport, tfactory, pfactory)
-    server.setNumWorkers(5)
-    set_alarm()
-  else:
-    ServerClass = getattr(TServer, args[0])
-    server = ServerClass(processor, transport, tfactory, pfactory)
-
+      server.stop()
+    signal.signal(signal.SIGALRM, clean_shutdown)
+    signal.alarm(2)
+  set_alarm()
+else:
+  # look up server class dynamically to instantiate server
+  ServerClass = getattr(TServer, server_type)
+  server = ServerClass(processor, transport, tfactory, pfactory)
+# enter server main loop
 server.serve()
diff --git a/test/py/test_cert.pem b/test/py/test_cert.pem
new file mode 100644
index 0000000..9b1a51f
--- /dev/null
+++ b/test/py/test_cert.pem
@@ -0,0 +1,28 @@
+-----BEGIN CERTIFICATE-----
+MIIB+zCCAWQCCQDyq++o7K0rpTANBgkqhkiG9w0BAQUFADBCMQswCQYDVQQGEwJV
+UzEVMBMGA1UEBwwMRGVmYXVsdCBDaXR5MRwwGgYDVQQKDBNEZWZhdWx0IENvbXBh
+bnkgTHRkMB4XDTExMDMxNjEzMTQ1NVoXDTIxMDMxMzEzMTQ1NVowQjELMAkGA1UE
+BhMCVVMxFTATBgNVBAcMDERlZmF1bHQgQ2l0eTEcMBoGA1UECgwTRGVmYXVsdCBD
+b21wYW55IEx0ZDCBnzANBgkqhkiG9w0BAQEFAAOBjQAwgYkCgYEA9lmCuVQRqRBR
+OYVH+FMChSoF8IjMwfrpnC65J9RR88dUIZbjC2b+JPT5qiUVQft2NzPPwiBnXI2s
+j6AmHYVKoWGB24hNX8bj2cjtxdPpT2rvfAlIK0pat1C+kCxgRHIg++S7o6GEJOkw
+OQiopsUroAsIbSRT/Ird/A0+KeSqQ0sCAwEAATANBgkqhkiG9w0BAQUFAAOBgQDf
+WseEh6/3gWl/G44MyjljBvgRAa0c+eqFL/cVl7Zfh03/KOXMlPV5/snVUYBOJCCI
+qPuQwWToT+Q36kNQyMnG4e4gh+DmsiIhgQgA3lVSNDhPPfRrG1vDeeXXtybpEoke
+fI6o9a9olzrKWNvW+/8P9xIDlP0SRZxL66464LAQnw==
+-----END CERTIFICATE-----
+-----BEGIN RSA PRIVATE KEY-----
+MIICXwIBAAKBgQD2WYK5VBGpEFE5hUf4UwKFKgXwiMzB+umcLrkn1FHzx1QhluML
+Zv4k9PmqJRVB+3Y3M8/CIGdcjayPoCYdhUqhYYHbiE1fxuPZyO3F0+lPau98CUgr
+Slq3UL6QLGBEciD75LujoYQk6TA5CKimxSugCwhtJFP8it38DT4p5KpDSwIDAQAB
+AoGBAMcnA7Q5T3GifFeI9O6+hLoMh/K1VPq4kmStrQeS8JGoIc5pwbC1GV3dIXy4
+L+BAnofv/dQNCCJdchRGPqn82J/aOA/sMsJJ6VzTSr9NNVl9lgQHdLjEDoZ15yxT
+vVSc4nG2xBs7uZ/24fN/SJZVFO3+EdphOvrp7uEXLiXlqvopAkEA/h7XGlrULBIN
+ekjAzEJEchlZb4xJdPrH3P4LZs92ZlcO88GFr5wfOz1ytafLiZA9EzYwLIQTPdsk
+HHynJeZWtwJBAPgr9PYUJOdkhUeWVSN2PyqvWKrdQVKvM1VwNgRFaSPXgBd0WGIN
+Eym1b7wt6ngwNtfLx9FUOR6nl7MklsFLBA0CQQDnSiibqynLxs6PiyI3huUHOH1H
+YtcE6q/4Ox0jgRYRhZFtWKkVsbJXV9FM9yDw3uBH2R01lyxwM0GF0ArOGvy3AkEA
+7eEcjh/i+9Wzl1n3Q+WdSKoJAMbSTZJYT0Ye0NtDm7J+On0wFtRXkPw0HRmaDRiS
+CSlw4CquEb8tPu8Mfj0MpQJBAKwTLSdHsy0vxQQJXm0lTI+Ck9KJUM9vJzFuCL/x
+G6fmsqEttxjhyLnze+iIIRAu/IV+A5UrWnI1h728y/wRejw=
+-----END RSA PRIVATE KEY-----
diff --git a/test/py/test_cert.readme b/test/py/test_cert.readme
new file mode 100644
index 0000000..08bbbc9
--- /dev/null
+++ b/test/py/test_cert.readme
@@ -0,0 +1,7 @@
+NOTE:
+The test_cert.pem file in this directory must never be used on production systems.
+
+The key it represents is a self-signed key intended only for use by the unit
+testing framework.  It is a publicly exposed private key.
+
+Do not use test_cert.pem in production environments under any circumstances.