THRIFT-1901 C#: Support for Multiplexing Services on any Transport, Protocol and Server

Patch: Jens Geyer
diff --git a/lib/csharp/src/Protocol/TMultiplexedProcessor.cs b/lib/csharp/src/Protocol/TMultiplexedProcessor.cs
new file mode 100644
index 0000000..29fac9e
--- /dev/null
+++ b/lib/csharp/src/Protocol/TMultiplexedProcessor.cs
@@ -0,0 +1,169 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ * Contains some contributions under the Thrift Software License.
+ * Please see doc/old-thrift-license.txt in the Thrift distribution for
+ * details.
+ */
+
+using System;
+using System.Text;
+using Thrift.Transport;
+using System.Collections.Generic;
+
+namespace Thrift.Protocol 
+{
+
+    /**
+     * TMultiplexedProcessor is a TProcessor allowing a single TServer to provide multiple services.
+     * To do so, you instantiate the processor and then register additional processors with it, 
+     * as shown in the following example:
+     *
+     *     TMultiplexedProcessor processor = new TMultiplexedProcessor();
+     *
+     *     processor.RegisterProcessor(
+     *         "Calculator",
+     *         new Calculator.Processor(new CalculatorHandler()));
+     *
+     *     processor.RegisterProcessor(
+     *         "WeatherReport",
+     *         new WeatherReport.Processor(new WeatherReportHandler()));
+     *
+     *     TServerTransport t = new TServerSocket(9090);
+     *     TSimpleServer server = new TSimpleServer(processor, t);
+     *
+     *     server.Serve();
+     */
+    public class TMultiplexedProcessor : TProcessor 
+    {
+        private Dictionary<String,TProcessor> ServiceProcessorMap = new Dictionary<String,TProcessor>();
+
+        /**
+         * 'Register' a service with this TMultiplexedProcessor. This allows us to broker 
+         * requests to individual services by using the service name to select them at request time.
+         * Registering the same service name twice makes Dictionary.Add throw an ArgumentException.
+         * Args: 
+         * - serviceName    Name of a service, has to be identical to the name
+         *                  declared in the Thrift IDL, e.g. "WeatherReport".
+         * - processor      Processor for the service, wrapping its implementation (the "handler"),
+         *                  e.g. new WeatherReport.Processor(new WeatherReportHandler()).
+         */
+        public void RegisterProcessor(String serviceName, TProcessor processor) 
+        {
+            ServiceProcessorMap.Add(serviceName, processor);
+        }
+
+        
+        // Writes a TApplicationException (extype, etxt) to oprot as an Exception message that
+        // reuses the name and sequence id of 'message', then flushes the transport.
+        private void Fail(TProtocol oprot, TMessage message, TApplicationException.ExceptionType extype, string etxt)
+        {
+            TApplicationException error = new TApplicationException(extype, etxt);
+            TMessage reply = new TMessage(message.Name, TMessageType.Exception, message.SeqID);
+            oprot.WriteMessageBegin(reply);
+            error.Write(oprot);
+            oprot.WriteMessageEnd();
+            oprot.Transport.Flush();
+        }
+            
+        
+        /**
+         * This implementation of Process performs the following steps:
+         *
+         * - Read the beginning of the message.
+         * - Extract the service name from the message name.
+         * - Use the service name to locate the registered processor.
+         * - Dispatch to that processor, with a decorated instance of TProtocol whose
+         *    ReadMessageBegin() returns the message with the service name removed.
+         *
+         * Writes a TApplicationException to oprot and returns false if
+         * - the message type is not CALL or ONEWAY,
+         * - the service name was not found in the message name, or
+         * - the service name has not been RegisterProcessor()ed.
+         */
+        public bool Process(TProtocol iprot, TProtocol oprot)
+        {
+            /*  Use the actual underlying protocol (e.g. TBinaryProtocol) to read the
+                message header.  This pulls the header "off the wire"; it is handed to
+                the selected processor again via StoredMessageProtocol below. */
+
+            TMessage message = iprot.ReadMessageBegin();
+
+            if ((message.Type != TMessageType.Call) && (message.Type != TMessageType.Oneway))
+            {
+                Fail( oprot, message,
+                      TApplicationException.ExceptionType.InvalidMessageType,
+                      "Message type CALL or ONEWAY expected");
+                return false;
+            }
+
+            // Extract the service name; ordinal search, message names are identifiers, not linguistic text
+            int index = message.Name.IndexOf(TMultiplexedProtocol.SEPARATOR, StringComparison.Ordinal);
+            if (index < 0) {
+                Fail( oprot, message,
+                      TApplicationException.ExceptionType.InvalidProtocol,
+                      "Service name not found in message name: " + message.Name + ". "+
+                      "Did you forget to use a TMultiplexedProtocol in your client?");
+                return false;
+            }
+
+            // Look up the processor registered under that service name
+            string serviceName = message.Name.Substring(0, index);
+            TProcessor actualProcessor;
+            if( ! ServiceProcessorMap.TryGetValue(serviceName, out actualProcessor))
+            {
+                Fail( oprot, message,
+                      TApplicationException.ExceptionType.InternalError,
+                      "Service name not found: " + serviceName + ". "+
+                      "Did you forget to call RegisterProcessor()?");
+                return false;
+            }
+
+            // Create a new TMessage, removing the service name
+            TMessage newMessage = new TMessage(
+                    message.Name.Substring(serviceName.Length + TMultiplexedProtocol.SEPARATOR.Length),
+                    message.Type,
+                    message.SeqID);
+
+            // Dispatch processing to the stored processor
+            return actualProcessor.Process(new StoredMessageProtocol(iprot, newMessage), oprot);
+        }
+
+        /**
+         *  Decorator handed to the selected processor: every call is forwarded to the
+         *  wrapped protocol, except ReadMessageBegin(), which returns the TMessage given
+         *  to the constructor instead of reading another header from the wire.
+         */
+        private class StoredMessageProtocol : TProtocolDecorator
+        {
+            private TMessage storedHeader;
+
+            public StoredMessageProtocol(TProtocol protocol, TMessage messageBegin)
+                : base(protocol)
+            {
+                storedHeader = messageBegin;
+            }
+
+            public override TMessage ReadMessageBegin()
+            {
+                return this.storedHeader;
+            }
+        }
+
+    }
+}
diff --git a/lib/csharp/src/Protocol/TMultiplexedProtocol.cs b/lib/csharp/src/Protocol/TMultiplexedProtocol.cs
new file mode 100644
index 0000000..ccd7fcf
--- /dev/null
+++ b/lib/csharp/src/Protocol/TMultiplexedProtocol.cs
@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ * Contains some contributions under the Thrift Software License.
+ * Please see doc/old-thrift-license.txt in the Thrift distribution for
+ * details.
+ */
+
+using System;
+using System.Text;
+using Thrift.Transport;
+using System.Collections.Generic;
+
+namespace Thrift.Protocol 
+{
+
+    /**
+     * TMultiplexedProtocol is a protocol-independent concrete decorator that allows a Thrift 
+     * client to communicate with a multiplexing Thrift server, by prepending the service name 
+     * to the function name during function calls.
+     *
+     * NOTE: THIS IS NOT TO BE USED BY SERVERS.  
+     * On the server, use TMultiplexedProcessor to handle requests from a multiplexing client.
+     *
+     * This example uses a single socket transport to invoke two services:
+     *
+     *     TSocket transport = new TSocket("localhost", 9090);
+     *     transport.Open();
+     *     
+     *     TBinaryProtocol protocol = new TBinaryProtocol(transport);
+     *
+     *     TMultiplexedProtocol mp = new TMultiplexedProtocol(protocol, "Calculator");
+     *     Calculator.Client service = new Calculator.Client(mp);
+     *
+     *     TMultiplexedProtocol mp2 = new TMultiplexedProtocol(protocol, "WeatherReport");
+     *     WeatherReport.Client service2 = new WeatherReport.Client(mp2);
+     *
+     *     Console.WriteLine(service.add(2,2));
+     *     Console.WriteLine(service2.getTemperature());
+     *
+     */
+    public class TMultiplexedProtocol : TProtocolDecorator 
+    {
+
+        /** Used to delimit the service name from the function name */
+        public static String SEPARATOR = ":";
+
+        private String ServiceName;
+
+        /**
+         * Wrap the specified protocol, allowing it to be used to communicate with a
+         * multiplexing server.  The serviceName is required as it is prepended to
+         * the name of every Call and Oneway message, so that the multiplexing server
+         * can broker the function call to the proper service.
+         *
+         * Args:
+         *  protocol        Your communication protocol of choice, e.g. TBinaryProtocol
+         *  serviceName     The service name of the service communicating via this protocol.
+         */
+        public TMultiplexedProtocol(TProtocol protocol, String serviceName) 
+            : base(protocol)
+        {
+            ServiceName = serviceName;
+        }
+
+        /**
+         * Writes the message header through the wrapped protocol. For Call and Oneway
+         * messages the name is sent as ServiceName + SEPARATOR + original name; every
+         * other message type is passed on unchanged.
+         *
+         * Args:
+         *   tMessage     The original message.
+         */
+        public override void WriteMessageBegin(TMessage tMessage)
+        {
+            bool isRequest = (tMessage.Type == TMessageType.Call)
+                          || (tMessage.Type == TMessageType.Oneway);
+
+            TMessage outgoing = tMessage;
+            if (isRequest)
+            {
+                string prefixedName = ServiceName + SEPARATOR + tMessage.Name;
+                outgoing = new TMessage(prefixedName, tMessage.Type, tMessage.SeqID);
+            }
+
+            base.WriteMessageBegin(outgoing);
+        }
+    }
+
+}
diff --git a/lib/csharp/src/Protocol/TProtocolDecorator.cs b/lib/csharp/src/Protocol/TProtocolDecorator.cs
new file mode 100644
index 0000000..e1977f5
--- /dev/null
+++ b/lib/csharp/src/Protocol/TProtocolDecorator.cs
@@ -0,0 +1,262 @@
+/**

+ * Licensed to the Apache Software Foundation (ASF) under one

+ * or more contributor license agreements. See the NOTICE file

+ * distributed with this work for additional information

+ * regarding copyright ownership. The ASF licenses this file

+ * to you under the Apache License, Version 2.0 (the

+ * "License"); you may not use this file except in compliance

+ * with the License. You may obtain a copy of the License at

+ *

+ *   http://www.apache.org/licenses/LICENSE-2.0

+ *

+ * Unless required by applicable law or agreed to in writing,

+ * software distributed under the License is distributed on an

+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY

+ * KIND, either express or implied. See the License for the

+ * specific language governing permissions and limitations

+ * under the License.

+ *

+ * Contains some contributions under the Thrift Software License.

+ * Please see doc/old-thrift-license.txt in the Thrift distribution for

+ * details.

+ */

+

+using System;

+using System.Text;

+using Thrift.Transport;

+using System.Collections.Generic;

+

+namespace Thrift.Protocol 

+{

+

+    /**

+     * TProtocolDecorator forwards all requests to an enclosed TProtocol instance, 

+     * providing a way to author concise concrete decorator subclasses.  While it has 

+     * no abstract methods, it is marked abstract as a reminder that by itself, 

+     * it does not modify the behaviour of the enclosed TProtocol.

+     *

+     * See p.175 of Design Patterns (by Gamma et al.)

+     * See TMultiplexedProtocol

+     */

+    public abstract class TProtocolDecorator : TProtocol 

+    {

+        private TProtocol WrappedProtocol;

+

+        /**

+         * Encloses the specified protocol.

+         * @param protocol All operations will be forwarded to this protocol.  Must be non-null.

+         */

+        public TProtocolDecorator(TProtocol protocol) 

+            : base( protocol.Transport)

+        {

+            

+            WrappedProtocol = protocol;

+        }

+

+        public override void WriteMessageBegin(TMessage tMessage) 

+        {

+            WrappedProtocol.WriteMessageBegin(tMessage);

+        }

+

+        public override void WriteMessageEnd()

+        {

+            WrappedProtocol.WriteMessageEnd();

+        }

+

+        public override void WriteStructBegin(TStruct tStruct)

+        {

+            WrappedProtocol.WriteStructBegin(tStruct);

+        }

+

+        public override void WriteStructEnd()

+        {

+            WrappedProtocol.WriteStructEnd();

+        }

+

+        public override void WriteFieldBegin(TField tField)

+        {

+            WrappedProtocol.WriteFieldBegin(tField);

+        }

+

+        public override void WriteFieldEnd()

+        {

+            WrappedProtocol.WriteFieldEnd();

+        }

+

+        public override void WriteFieldStop()

+        {

+            WrappedProtocol.WriteFieldStop();

+        }

+

+        public override void WriteMapBegin(TMap tMap) 

+        {

+            WrappedProtocol.WriteMapBegin(tMap);

+        }

+

+        public override void WriteMapEnd()

+        {

+            WrappedProtocol.WriteMapEnd();

+        }

+

+        public override void WriteListBegin(TList tList)  

+        {

+            WrappedProtocol.WriteListBegin(tList);

+        }

+

+        public override void WriteListEnd()

+        {

+            WrappedProtocol.WriteListEnd();

+        }

+

+        public override void WriteSetBegin(TSet tSet)  

+        {

+            WrappedProtocol.WriteSetBegin(tSet);

+        }

+

+        public override void WriteSetEnd()

+        {

+            WrappedProtocol.WriteSetEnd();

+        }

+

+        public override void WriteBool(bool b)  

+        {

+            WrappedProtocol.WriteBool(b);

+        }

+

+        public override void WriteByte(sbyte b)

+        {

+            WrappedProtocol.WriteByte(b);

+        }

+

+        public override void WriteI16(short i)

+        {

+            WrappedProtocol.WriteI16(i);

+        }

+

+        public override void WriteI32(int i)

+        {

+            WrappedProtocol.WriteI32(i);

+        }

+

+        public override void WriteI64(long l)

+        {

+            WrappedProtocol.WriteI64(l);

+        }

+

+        public override void WriteDouble(double v)

+        {

+            WrappedProtocol.WriteDouble(v);

+        }

+

+        public override void WriteString(String s)

+        {

+            WrappedProtocol.WriteString(s);

+        }

+

+        public override void WriteBinary(byte[] bytes)

+        {

+            WrappedProtocol.WriteBinary(bytes);

+        }

+

+        public override TMessage ReadMessageBegin()

+        {

+            return WrappedProtocol.ReadMessageBegin();

+        }

+

+        public override void ReadMessageEnd()

+        {

+            WrappedProtocol.ReadMessageEnd();

+        }

+

+        public override TStruct ReadStructBegin()

+        {

+            return WrappedProtocol.ReadStructBegin();

+        }

+

+        public override void ReadStructEnd()

+        {

+            WrappedProtocol.ReadStructEnd();

+        }

+

+        public override TField ReadFieldBegin()

+        {

+            return WrappedProtocol.ReadFieldBegin();

+        }

+

+        public override void ReadFieldEnd()

+        {

+            WrappedProtocol.ReadFieldEnd();

+        }

+

+        public override TMap ReadMapBegin()

+        {

+            return WrappedProtocol.ReadMapBegin();

+        }

+

+        public override void ReadMapEnd()

+        {

+            WrappedProtocol.ReadMapEnd();

+        }

+

+        public override TList ReadListBegin()

+        {

+            return WrappedProtocol.ReadListBegin();

+        }

+

+        public override void ReadListEnd()

+        {

+            WrappedProtocol.ReadListEnd();

+        }

+

+        public override TSet ReadSetBegin()

+        {

+            return WrappedProtocol.ReadSetBegin();

+        }

+

+        public override void ReadSetEnd()

+        {

+            WrappedProtocol.ReadSetEnd();

+        }

+

+        public override bool ReadBool()

+        {

+            return WrappedProtocol.ReadBool();

+        }

+

+        public override sbyte ReadByte()

+        {

+            return WrappedProtocol.ReadByte();

+        }

+

+        public override short ReadI16()

+        {

+            return WrappedProtocol.ReadI16();

+        }

+

+        public override int ReadI32()

+        {

+            return WrappedProtocol.ReadI32();

+        }

+

+        public override long ReadI64()

+        {

+            return WrappedProtocol.ReadI64();

+        }

+

+        public override double ReadDouble()

+        {

+            return WrappedProtocol.ReadDouble();

+        }

+

+        public override String ReadString()

+        {

+            return WrappedProtocol.ReadString();

+        }

+

+        public override byte[] ReadBinary()

+        {

+            return WrappedProtocol.ReadBinary();

+        }

+    }

+

+}