THRIFT-563 Support for Multiplexing Services on any Transport, Protocol and Server

Patch: Rob Slifka
diff --git a/lib/java/src/org/apache/thrift/ b/lib/java/src/org/apache/thrift/
new file mode 100644
index 0000000..8547cf0
--- /dev/null
+++ b/lib/java/src/org/apache/thrift/
@@ -0,0 +1,124 @@
+package org.apache.thrift;
+import org.apache.thrift.protocol.*;
+import java.util.Map;
+import java.util.HashMap;
+ * <code>TMultiplexedProcessor</code> is a <code>TProcessor</code> allowing
+ * a single <code>TServer</code> to provide multiple services.
+ *
+ * <p>To do so, you instantiate the processor and then register additional
+ * processors with it, as shown in the following example:</p>
+ *
+ * <blockquote><code>
+ *     TMultiplexedProcessor processor = new TMultiplexedProcessor();
+ *
+ *     processor.registerProcessor(
+ *         "Calculator",
+ *         new Calculator.Processor(new CalculatorHandler()));
+ *
+ *     processor.registerProcessor(
+ *         "WeatherReport",
+ *         new WeatherReport.Processor(new WeatherReportHandler()));
+ *
+ *     TServerTransport t = new TServerSocket(9090);
+ *     TSimpleServer server = new TSimpleServer(processor, t);
+ *
+ *     server.serve();
+ * </code></blockquote>
+ */
+public class TMultiplexedProcessor implements TProcessor {
+    private final Map<String,TProcessor> SERVICE_PROCESSOR_MAP
+            = new HashMap<String,TProcessor>();
+    /**
+     * 'Register' a service with this <code>TMultiplexedProcessor</code>.  This
+     * allows us to broker requests to individual services by using the service
+     * name to select them at request time.
+     *
+     * @param serviceName Name of a service, has to be identical to the name
+     * declared in the Thrift IDL, e.g. "WeatherReport".
+     * @param processor Implementation of a service, ususally referred to
+     * as "handlers", e.g. WeatherReportHandler implementing WeatherReport.Iface.
+     */
+    public void registerProcessor(String serviceName, TProcessor processor) {
+        SERVICE_PROCESSOR_MAP.put(serviceName, processor);
+    }
+    /**
+     * This implementation of <code>process</code> performs the following steps:
+     *
+     * <ol>
+     *     <li>Read the beginning of the message.</li>
+     *     <li>Extract the service name from the message.</li>
+     *     <li>Using the service name to locate the appropriate processor.</li>
+     *     <li>Dispatch to the processor, with a decorated instance of TProtocol
+     *         that allows readMessageBegin() to return the original TMessage.</li>
+     * </ol>
+     *  
+     * @throws TException If the message type is not CALL or ONEWAY, if
+     * the service name was not found in the message, or if the service
+     * name was not found in the service map.  You called {@link #registerProcessor(String, TProcessor) registerProcessor}
+     * during initialization, right? :)
+     */
+    public boolean process(TProtocol iprot, TProtocol oprot) throws TException {
+        /*
+            Use the actual underlying protocol (e.g. TBinaryProtocol) to read the
+            message header.  This pulls the message "off the wire", which we'll
+            deal with at the end of this method.
+        */
+        TMessage message = iprot.readMessageBegin();
+        if (message.type != TMessageType.CALL && message.type != TMessageType.ONEWAY) {
+            // TODO Apache Guys - Can the server ever get an EXCEPTION or REPLY?
+            // TODO Should we check for this here?
+            throw new TException("This should not have happened!?");
+        }
+        // Extract the service name
+        int index =;
+        if (index < 0) {
+            throw new TException("Service name not found in message name: " + + ".  Did you " +
+                    "forget to use a TMultiplexProtocol in your client?");
+        }
+        // Create a new TMessage, something that can be consumed by any TProtocol
+        String serviceName =, index);
+        TProcessor actualProcessor = SERVICE_PROCESSOR_MAP.get(serviceName);
+        if (actualProcessor == null) {
+            throw new TException("Service name not found: " + serviceName + ".  Did you forget " +
+                    "to call registerProcessor()?");
+        }
+        // Create a new TMessage, removing the service name
+        TMessage standardMessage = new TMessage(
+      ,
+                message.type,
+                message.seqid
+        );
+        // Dispatch processing to the stored processor
+        return actualProcessor.process(new StoredMessageProtocol(iprot, standardMessage), oprot);
+    }
+    /**
+     *  Our goal was to work with any protocol.  In order to do that, we needed
+     *  to allow them to call readMessageBegin() and get a TMessage in exactly
+     *  the standard format, without the service name prepended to
+     */
+    private class StoredMessageProtocol extends TProtocolDecorator {
+        TMessage messageBegin;
+        public StoredMessageProtocol(TProtocol protocol, TMessage messageBegin) {
+            super(protocol);
+            this.messageBegin = messageBegin;
+        }
+        @Override
+        public TMessage readMessageBegin() throws TException {
+            return messageBegin;
+        }
+    }
diff --git a/lib/java/src/org/apache/thrift/protocol/ b/lib/java/src/org/apache/thrift/protocol/
new file mode 100644
index 0000000..4b5e671
--- /dev/null
+++ b/lib/java/src/org/apache/thrift/protocol/
@@ -0,0 +1,72 @@
+package org.apache.thrift.protocol;
+import org.apache.thrift.TException;
+ * <code>TMultiplexedProtocol</code> is a protocol-independent concrete decorator
+ * that allows a Thrift client to communicate with a multiplexing Thrift server,
+ * by prepending the service name to the function name during function calls.
+ *
+ * <p>NOTE: THIS IS NOT USED BY SERVERS.  On the server, use {@link org.apache.thrift.TMultiplexedProcessor TMultiplexedProcessor} to handle requests
+ * from a multiplexing client.
+ *
+ * <p>This example uses a single socket transport to invoke two services:
+ *
+ * <blockquote><code>
+ *     TSocket transport = new TSocket("localhost", 9090);<br/>
+ *;<br/>
+ *<br/>
+ *     TBinaryProtocol protocol = new TBinaryProtocol(transport);<br/>
+ *<br/>
+ *     TMultiplexedProtocol mp = new TMultiplexedProtocol(protocol, "Calculator");<br/>
+ *     Calculator.Client service = new Calculator.Client(mp);<br/>
+ *<br/>
+ *     TMultiplexedProtocol mp2 = new TMultiplexedProtocol(protocol, "WeatherReport");<br/>
+ *     WeatherReport.Client service2 = new WeatherReport.Client(mp2);<br/>
+ *<br/>
+ *     System.out.println(service.add(2,2));<br/>
+ *     System.out.println(service2.getTemperature());<br/>
+ * </code></blockquote>
+ *
+ * @see org.apache.thrift.protocol.TProtocolDecorator
+ */
+public class TMultiplexedProtocol extends TProtocolDecorator {
+    /** Used to delimit the service name from the function name */
+    public static final String SEPARATOR = ":";
+    private final String SERVICE_NAME;
+    /**
+     * Wrap the specified protocol, allowing it to be used to communicate with a
+     * multiplexing server.  The <code>serviceName</code> is required as it is
+     * prepended to the message header so that the multiplexing server can broker
+     * the function call to the proper service.
+     *
+     * @param protocol Your communication protocol of choice, e.g. <code>TBinaryProtocol</code>.
+     * @param serviceName The service name of the service communicating via this protocol.
+     */
+    public TMultiplexedProtocol(TProtocol protocol, String serviceName) {
+        super(protocol);
+        SERVICE_NAME = serviceName;
+    }
+    /**
+     * Prepends the service name to the function name, separated by TMultiplexedProtocol.SEPARATOR.
+     *
+     * @param tMessage The original message.
+     * @throws TException Passed through from wrapped <code>TProtocol</code> instance.
+     */
+    @Override
+    public void writeMessageBegin(TMessage tMessage) throws TException {
+        if (tMessage.type == TMessageType.CALL || tMessage.type == TMessageType.ONEWAY) {
+            super.writeMessageBegin(new TMessage(
+                    SERVICE_NAME + SEPARATOR +,
+                    tMessage.type,
+                    tMessage.seqid
+            ));
+        } else {
+            super.writeMessageBegin(tMessage);
+        }
+    }
diff --git a/lib/java/src/org/apache/thrift/protocol/ b/lib/java/src/org/apache/thrift/protocol/
new file mode 100644
index 0000000..6190d13
--- /dev/null
+++ b/lib/java/src/org/apache/thrift/protocol/
@@ -0,0 +1,192 @@
+package org.apache.thrift.protocol;
+import org.apache.thrift.TException;
+ * <code>TProtocolDecorator</code> forwards all requests to an enclosed
+ * <code>TProtocol</code> instance, providing a way to author concise
+ * concrete decorator subclasses.  While it has no abstract methods, it
+ * is marked abstract as a reminder that by itself, it does not modify
+ * the behaviour of the enclosed <code>TProtocol</code>.
+ *
+ * <p>See p.175 of Design Patterns (by Gamma et al.)</p>
+ * 
+ * @see org.apache.thrift.protocol.TMultiplexedProtocol
+ */
+public abstract class TProtocolDecorator extends TProtocol {
+    private final TProtocol concreteProtocol;
+    /**
+     * Encloses the specified protocol.
+     * @param protocol All operations will be forward to this protocol.  Must be non-null.
+     */
+    public TProtocolDecorator(TProtocol protocol) {
+        super(protocol.getTransport());
+        concreteProtocol = protocol;
+    }
+    public void writeMessageBegin(TMessage tMessage) throws TException {
+        concreteProtocol.writeMessageBegin(tMessage);
+    }
+    public void writeMessageEnd() throws TException {
+        concreteProtocol.writeMessageEnd();
+    }
+    public void writeStructBegin(TStruct tStruct) throws TException {
+        concreteProtocol.writeStructBegin(tStruct);
+    }
+    public void writeStructEnd() throws TException {
+        concreteProtocol.writeStructEnd();
+    }
+    public void writeFieldBegin(TField tField) throws TException {
+        concreteProtocol.writeFieldBegin(tField);
+    }
+    public void writeFieldEnd() throws TException {
+        concreteProtocol.writeFieldEnd();
+    }
+    public void writeFieldStop() throws TException {
+        concreteProtocol.writeFieldStop();
+    }
+    public void writeMapBegin(TMap tMap) throws TException {
+        concreteProtocol.writeMapBegin(tMap);
+    }
+    public void writeMapEnd() throws TException {
+        concreteProtocol.writeMapEnd();
+    }
+    public void writeListBegin(TList tList) throws TException {
+        concreteProtocol.writeListBegin(tList);
+    }
+    public void writeListEnd() throws TException {
+        concreteProtocol.writeListEnd();
+    }
+    public void writeSetBegin(TSet tSet) throws TException {
+        concreteProtocol.writeSetBegin(tSet);
+    }
+    public void writeSetEnd() throws TException {
+        concreteProtocol.writeSetEnd();
+    }
+    public void writeBool(boolean b) throws TException {
+        concreteProtocol.writeBool(b);
+    }
+    public void writeByte(byte b) throws TException {
+        concreteProtocol.writeByte(b);
+    }
+    public void writeI16(short i) throws TException {
+        concreteProtocol.writeI16(i);
+    }
+    public void writeI32(int i) throws TException {
+        concreteProtocol.writeI32(i);
+    }
+    public void writeI64(long l) throws TException {
+        concreteProtocol.writeI64(l);
+    }
+    public void writeDouble(double v) throws TException {
+        concreteProtocol.writeDouble(v);
+    }
+    public void writeString(String s) throws TException {
+        concreteProtocol.writeString(s);
+    }
+    public void writeBinary(byte[] bytes) throws TException {
+        concreteProtocol.writeBinary(bytes);
+    }
+    public TMessage readMessageBegin() throws TException {
+        return concreteProtocol.readMessageBegin();
+    }
+    public void readMessageEnd() throws TException {
+        concreteProtocol.readMessageEnd();
+    }
+    public TStruct readStructBegin() throws TException {
+        return concreteProtocol.readStructBegin();
+    }
+    public void readStructEnd() throws TException {
+        concreteProtocol.readStructEnd();
+    }
+    public TField readFieldBegin() throws TException {
+        return concreteProtocol.readFieldBegin();
+    }
+    public void readFieldEnd() throws TException {
+        concreteProtocol.readFieldEnd();
+    }
+    public TMap readMapBegin() throws TException {
+        return concreteProtocol.readMapBegin();
+    }
+    public void readMapEnd() throws TException {
+        concreteProtocol.readMapEnd();
+    }
+    public TList readListBegin() throws TException {
+        return concreteProtocol.readListBegin();
+    }
+    public void readListEnd() throws TException {
+        concreteProtocol.readListEnd();
+    }
+    public TSet readSetBegin() throws TException {
+        return concreteProtocol.readSetBegin();
+    }
+    public void readSetEnd() throws TException {
+        concreteProtocol.readSetEnd();
+    }
+    public boolean readBool() throws TException {
+        return concreteProtocol.readBool();
+    }
+    public byte readByte() throws TException {
+        return concreteProtocol.readByte();
+    }
+    public short readI16() throws TException {
+        return concreteProtocol.readI16();
+    }
+    public int readI32() throws TException {
+        return concreteProtocol.readI32();
+    }
+    public long readI64() throws TException {
+        return concreteProtocol.readI64();
+    }
+    public double readDouble() throws TException {
+        return concreteProtocol.readDouble();
+    }
+    public String readString() throws TException {
+        return concreteProtocol.readString();
+    }
+    public byte[] readBinary() throws TException {
+        return concreteProtocol.readBinary();
+    }