THRIFT-1914 Python: Support for Multiplexing Services on any
 Transport, Protocol and Server

Patch: smallfish & djwatson
 & haijunz & Roger Meier

This closes #103 and #82

From 7aaea7ef4e6f44097b02543fa2e62597eae9d61e Mon Sep 17 00:00:00 2001
From: smallfish <smallfish.xy@gmail.com>
Date: Tue, 22 Apr 2014 11:26:52 +0800
Subject: [PATCH]  THRIFT-1914 Python: Support for Multiplexing Services on any
 Transport
diff --git a/lib/py/src/TMultiplexedProcessor.py b/lib/py/src/TMultiplexedProcessor.py
new file mode 100644
index 0000000..a8d5565
--- /dev/null
+++ b/lib/py/src/TMultiplexedProcessor.py
@@ -0,0 +1,58 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+from thrift.Thrift import TProcessor, TMessageType, TException
+from thrift.protocol import TProtocolDecorator, TMultiplexedProtocol
+
+class TMultiplexedProcessor(TProcessor):
+  def __init__(self):
+    self.services = {}
+
+  def registerProcessor(self, serviceName, processor):
+    self.services[serviceName] = processor
+
+  def process(self, iprot, oprot):
+    (name, type, seqid) = iprot.readMessageBegin();
+    if type != TMessageType.CALL & type != TMessageType.ONEWAY:
+      raise TException("TMultiplex protocol only supports CALL & ONEWAY")
+
+    index = name.find(TMultiplexedProtocol.SEPARATOR)
+    if index < 0:
+      raise TException("Service name not found in message name: " + name + ". Did you forget to use TMultiplexProtocol in your client?")
+
+    serviceName = name[0:index]
+    call = name[index+len(TMultiplexedProtocol.SEPARATOR):]
+    if not serviceName in self.services:
+      raise TException("Service name not found: " + serviceName + ". Did you forget to call registerProcessor()?")
+
+    standardMessage = (
+      call,
+      type,
+      seqid
+    )
+    return self.services[serviceName].process(StoredMessageProtocol(iprot, standardMessage), oprot)
+
+
+class StoredMessageProtocol(TProtocolDecorator.TProtocolDecorator):
+  def __init__(self, protocol, messageBegin):
+    TProtocolDecorator.TProtocolDecorator.__init__(self, protocol)
+    self.messageBegin = messageBegin
+
+  def readMessageBegin(self):
+    return self.messageBegin
diff --git a/lib/py/src/protocol/TMultiplexedProtocol.py b/lib/py/src/protocol/TMultiplexedProtocol.py
new file mode 100644
index 0000000..d25f367
--- /dev/null
+++ b/lib/py/src/protocol/TMultiplexedProtocol.py
@@ -0,0 +1,39 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+from thrift.Thrift import TMessageType
+from thrift.protocol import TProtocolDecorator
+
+SEPARATOR = ":"
+
+class TMultiplexedProtocol(TProtocolDecorator.TProtocolDecorator):
+  def __init__(self, protocol, serviceName):
+    TProtocolDecorator.TProtocolDecorator.__init__(self, protocol)
+    self.serviceName = serviceName
+
+  def writeMessageBegin(self, name, type, seqid):
+    if (type == TMessageType.CALL or
+        type == TMessageType.ONEWAY):
+      self.protocol.writeMessageBegin(
+        self.serviceName + SEPARATOR + name,
+        type,
+        seqid
+      )
+    else:
+      self.protocol.writeMessageBegin(name, type, seqid)
diff --git a/lib/py/src/protocol/TProtocolDecorator.py b/lib/py/src/protocol/TProtocolDecorator.py
new file mode 100644
index 0000000..3e9e500
--- /dev/null
+++ b/lib/py/src/protocol/TProtocolDecorator.py
@@ -0,0 +1,42 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+from thrift.protocol.TProtocol import TProtocolBase
+from types import *
+
+class TProtocolDecorator():
+  def __init__(self, protocol):
+    TProtocolBase(protocol)
+    self.protocol = protocol
+
+  def __getattr__(self, name):
+    if hasattr(self.protocol, name):
+      member = getattr(self.protocol, name)
+      if type(member) in [MethodType, UnboundMethodType, FunctionType, LambdaType, BuiltinFunctionType, BuiltinMethodType]:
+        return lambda *args, **kwargs: self._wrap(member, args, kwargs)
+      else:
+        return member
+    raise AttributeError(name)
+
+  def _wrap(self, func, args, kwargs):
+    if type(func) == MethodType:
+      result = func(*args, **kwargs)
+    else:
+      result = func(self.protocol, *args, **kwargs)
+    return result
diff --git a/test/py/TestClient.py b/test/py/TestClient.py
index 471e030..18aea86 100755
--- a/test/py/TestClient.py
+++ b/test/py/TestClient.py
@@ -38,6 +38,8 @@
     help="use zlib wrapper for compressed transport")
 parser.add_option("--ssl", action="store_true", dest="ssl",
     help="use SSL for encrypted transport")
+parser.add_option("--multiple", action="store_true", dest="multiple",
+    help="use Multiple service")
 parser.add_option("--framed", action="store_true", dest="framed",
     help="use framed transport")
 parser.add_option("--http", dest="http_path",
@@ -55,8 +57,9 @@
 
 sys.path.insert(0, options.genpydir)
 
-from ThriftTest import ThriftTest
+from ThriftTest import ThriftTest, SecondService
 from ThriftTest.ttypes import *
+from thrift.protocol import TMultiplexedProtocol
 from thrift.transport import TTransport
 from thrift.transport import TSocket
 from thrift.transport import THttpClient
@@ -84,7 +87,14 @@
         self.transport = TZlibTransport.TZlibTransport(self.transport, 9)
     self.transport.open()
     protocol = self.protocol_factory.getProtocol(self.transport)
-    self.client = ThriftTest.Client(protocol)
+    if options.multiple:
+        p = TMultiplexedProtocol.TMultiplexedProtocol(protocol, "ThriftTest")
+        self.client = ThriftTest.Client(p)
+        p = TMultiplexedProtocol.TMultiplexedProtocol(protocol, "SecondService")
+        self.client2 = SecondService.Client(p)
+    else:
+        self.client = ThriftTest.Client(protocol)
+        self.client2 = None
 
   def tearDown(self):
     # Close!
@@ -203,6 +213,10 @@
     self.client.testOneway(1) # type is int, not float
     self.assertEqual(self.client.testString('Python'), 'Python')
 
+  def testblahBlah(self):
+    if self.client2:
+       self.assertEqual(self.client2.blahBlah(), None)
+
 class NormalBinaryTest(AbstractTest):
   protocol_factory = TBinaryProtocol.TBinaryProtocolFactory()
 
diff --git a/test/py/TestServer.py b/test/py/TestServer.py
index 28241cc..8022341 100755
--- a/test/py/TestServer.py
+++ b/test/py/TestServer.py
@@ -33,6 +33,8 @@
     help="use zlib wrapper for compressed transport")
 parser.add_option("--ssl", action="store_true", dest="ssl",
     help="use SSL for encrypted transport")
+parser.add_option("--multiple", action="store_true", dest="multiple",
+    help="use multiple service")
 parser.add_option('-v', '--verbose', action="store_const", 
     dest="verbose", const=2,
     help="verbose output")
@@ -46,9 +48,10 @@
 
 sys.path.insert(0, options.genpydir)
 
-from ThriftTest import ThriftTest
+from ThriftTest import ThriftTest, SecondService
 from ThriftTest.ttypes import *
 from thrift.Thrift import TException
+from thrift import TMultiplexedProcessor
 from thrift.transport import TTransport
 from thrift.transport import TSocket
 from thrift.transport import TZlibTransport
@@ -62,6 +65,12 @@
     'compact': TCompactProtocol.TCompactProtocolFactory,
     'json': TJSONProtocol.TJSONProtocolFactory}
 
+class SecondHandler:
+
+  def blahBlah(self):
+    if options.verbose > 1:
+      print 'blahBlah()'
+
 class TestHandler:
 
   def testVoid(self):
@@ -188,8 +197,15 @@
 server_type = args[0]
 
 # Set up the handler and processor objects
-handler = TestHandler()
-processor = ThriftTest.Processor(handler)
+if not options.multiple:
+    handler   = TestHandler()
+    processor = ThriftTest.Processor(handler)
+else:
+    processor = TMultiplexedProcessor.TMultiplexedProcessor()
+    handler   = TestHandler()
+    processor.registerProcessor("ThriftTest", ThriftTest.Processor(handler))
+    handler   = SecondHandler()
+    processor.registerProcessor("SecondService", SecondService.Processor(handler))
 
 # Handle THttpServer as a special case
 if server_type == 'THttpServer':