THRIFT-563 Support for Multiplexing Services on any Transport, Protocol and Server
Patch: Rob Slifka
diff --git a/lib/java/src/org/apache/thrift/TMultiplexedProcessor.java b/lib/java/src/org/apache/thrift/TMultiplexedProcessor.java
new file mode 100644
index 0000000..8547cf0
--- /dev/null
+++ b/lib/java/src/org/apache/thrift/TMultiplexedProcessor.java
@@ -0,0 +1,124 @@
+package org.apache.thrift;
+
+import org.apache.thrift.protocol.*;
+
+import java.util.Map;
+import java.util.HashMap;
+
+/**
+ * <code>TMultiplexedProcessor</code> is a <code>TProcessor</code> allowing
+ * a single <code>TServer</code> to provide multiple services.
+ *
+ * <p>To do so, you instantiate the processor and then register additional
+ * processors with it, as shown in the following example:</p>
+ *
+ * <blockquote><code>
+ * TMultiplexedProcessor processor = new TMultiplexedProcessor();
+ *
+ * processor.registerProcessor(
+ * "Calculator",
+ * new Calculator.Processor(new CalculatorHandler()));
+ *
+ * processor.registerProcessor(
+ * "WeatherReport",
+ * new WeatherReport.Processor(new WeatherReportHandler()));
+ *
+ * TServerTransport t = new TServerSocket(9090);
+ * TSimpleServer server = new TSimpleServer(processor, t);
+ *
+ * server.serve();
+ * </code></blockquote>
+ */
+public class TMultiplexedProcessor implements TProcessor {
+
+ private final Map<String,TProcessor> SERVICE_PROCESSOR_MAP
+ = new HashMap<String,TProcessor>();
+
+ /**
+ * 'Register' a service with this <code>TMultiplexedProcessor</code>. This
+ * allows us to broker requests to individual services by using the service
+ * name to select them at request time.
+ *
+ * @param serviceName Name of a service, has to be identical to the name
+ * declared in the Thrift IDL, e.g. "WeatherReport".
+ * @param processor Implementation of a service, ususally referred to
+ * as "handlers", e.g. WeatherReportHandler implementing WeatherReport.Iface.
+ */
+ public void registerProcessor(String serviceName, TProcessor processor) {
+ SERVICE_PROCESSOR_MAP.put(serviceName, processor);
+ }
+
+ /**
+ * This implementation of <code>process</code> performs the following steps:
+ *
+ * <ol>
+ * <li>Read the beginning of the message.</li>
+ * <li>Extract the service name from the message.</li>
+ * <li>Using the service name to locate the appropriate processor.</li>
+ * <li>Dispatch to the processor, with a decorated instance of TProtocol
+ * that allows readMessageBegin() to return the original TMessage.</li>
+ * </ol>
+ *
+ * @throws TException If the message type is not CALL or ONEWAY, if
+ * the service name was not found in the message, or if the service
+ * name was not found in the service map. You called {@link #registerProcessor(String, TProcessor) registerProcessor}
+ * during initialization, right? :)
+ */
+ public boolean process(TProtocol iprot, TProtocol oprot) throws TException {
+ /*
+ Use the actual underlying protocol (e.g. TBinaryProtocol) to read the
+ message header. This pulls the message "off the wire", which we'll
+ deal with at the end of this method.
+ */
+ TMessage message = iprot.readMessageBegin();
+
+ if (message.type != TMessageType.CALL && message.type != TMessageType.ONEWAY) {
+ // TODO Apache Guys - Can the server ever get an EXCEPTION or REPLY?
+ // TODO Should we check for this here?
+ throw new TException("This should not have happened!?");
+ }
+
+ // Extract the service name
+ int index = message.name.indexOf(TMultiplexedProtocol.SEPARATOR);
+ if (index < 0) {
+ throw new TException("Service name not found in message name: " + message.name + ". Did you " +
+ "forget to use a TMultiplexProtocol in your client?");
+ }
+
+ // Create a new TMessage, something that can be consumed by any TProtocol
+ String serviceName = message.name.substring(0, index);
+ TProcessor actualProcessor = SERVICE_PROCESSOR_MAP.get(serviceName);
+ if (actualProcessor == null) {
+ throw new TException("Service name not found: " + serviceName + ". Did you forget " +
+ "to call registerProcessor()?");
+ }
+
+ // Create a new TMessage, removing the service name
+ TMessage standardMessage = new TMessage(
+ message.name.substring(serviceName.length()+TMultiplexedProtocol.SEPARATOR.length()),
+ message.type,
+ message.seqid
+ );
+
+ // Dispatch processing to the stored processor
+ return actualProcessor.process(new StoredMessageProtocol(iprot, standardMessage), oprot);
+ }
+
+ /**
+ * Our goal was to work with any protocol. In order to do that, we needed
+ * to allow them to call readMessageBegin() and get a TMessage in exactly
+ * the standard format, without the service name prepended to TMessage.name.
+ */
+ private class StoredMessageProtocol extends TProtocolDecorator {
+ TMessage messageBegin;
+ public StoredMessageProtocol(TProtocol protocol, TMessage messageBegin) {
+ super(protocol);
+ this.messageBegin = messageBegin;
+ }
+ @Override
+ public TMessage readMessageBegin() throws TException {
+ return messageBegin;
+ }
+ }
+
+}
diff --git a/lib/java/src/org/apache/thrift/protocol/TMultiplexedProtocol.java b/lib/java/src/org/apache/thrift/protocol/TMultiplexedProtocol.java
new file mode 100644
index 0000000..4b5e671
--- /dev/null
+++ b/lib/java/src/org/apache/thrift/protocol/TMultiplexedProtocol.java
@@ -0,0 +1,72 @@
+package org.apache.thrift.protocol;
+
+import org.apache.thrift.TException;
+
+/**
+ * <code>TMultiplexedProtocol</code> is a protocol-independent concrete decorator
+ * that allows a Thrift client to communicate with a multiplexing Thrift server,
+ * by prepending the service name to the function name during function calls.
+ *
+ * <p>NOTE: THIS IS NOT USED BY SERVERS. On the server, use {@link org.apache.thrift.TMultiplexedProcessor TMultiplexedProcessor} to handle requests
+ * from a multiplexing client.
+ *
+ * <p>This example uses a single socket transport to invoke two services:
+ *
+ * <blockquote><code>
+ * TSocket transport = new TSocket("localhost", 9090);<br/>
+ * transport.open();<br/>
+ *<br/>
+ * TBinaryProtocol protocol = new TBinaryProtocol(transport);<br/>
+ *<br/>
+ * TMultiplexedProtocol mp = new TMultiplexedProtocol(protocol, "Calculator");<br/>
+ * Calculator.Client service = new Calculator.Client(mp);<br/>
+ *<br/>
+ * TMultiplexedProtocol mp2 = new TMultiplexedProtocol(protocol, "WeatherReport");<br/>
+ * WeatherReport.Client service2 = new WeatherReport.Client(mp2);<br/>
+ *<br/>
+ * System.out.println(service.add(2,2));<br/>
+ * System.out.println(service2.getTemperature());<br/>
+ * </code></blockquote>
+ *
+ * @see org.apache.thrift.protocol.TProtocolDecorator
+ */
+public class TMultiplexedProtocol extends TProtocolDecorator {
+
+ /** Used to delimit the service name from the function name */
+ public static final String SEPARATOR = ":";
+
+ private final String SERVICE_NAME;
+
+ /**
+ * Wrap the specified protocol, allowing it to be used to communicate with a
+ * multiplexing server. The <code>serviceName</code> is required as it is
+ * prepended to the message header so that the multiplexing server can broker
+ * the function call to the proper service.
+ *
+ * @param protocol Your communication protocol of choice, e.g. <code>TBinaryProtocol</code>.
+ * @param serviceName The service name of the service communicating via this protocol.
+ */
+ public TMultiplexedProtocol(TProtocol protocol, String serviceName) {
+ super(protocol);
+ SERVICE_NAME = serviceName;
+ }
+
+ /**
+ * Prepends the service name to the function name, separated by TMultiplexedProtocol.SEPARATOR.
+ *
+ * @param tMessage The original message.
+ * @throws TException Passed through from wrapped <code>TProtocol</code> instance.
+ */
+ @Override
+ public void writeMessageBegin(TMessage tMessage) throws TException {
+ if (tMessage.type == TMessageType.CALL || tMessage.type == TMessageType.ONEWAY) {
+ super.writeMessageBegin(new TMessage(
+ SERVICE_NAME + SEPARATOR + tMessage.name,
+ tMessage.type,
+ tMessage.seqid
+ ));
+ } else {
+ super.writeMessageBegin(tMessage);
+ }
+ }
+}
diff --git a/lib/java/src/org/apache/thrift/protocol/TProtocolDecorator.java b/lib/java/src/org/apache/thrift/protocol/TProtocolDecorator.java
new file mode 100644
index 0000000..6190d13
--- /dev/null
+++ b/lib/java/src/org/apache/thrift/protocol/TProtocolDecorator.java
@@ -0,0 +1,192 @@
+package org.apache.thrift.protocol;
+
+import org.apache.thrift.TException;
+
+/**
+ * <code>TProtocolDecorator</code> forwards all requests to an enclosed
+ * <code>TProtocol</code> instance, providing a way to author concise
+ * concrete decorator subclasses. While it has no abstract methods, it
+ * is marked abstract as a reminder that by itself, it does not modify
+ * the behaviour of the enclosed <code>TProtocol</code>.
+ *
+ * <p>See p.175 of Design Patterns (by Gamma et al.)</p>
+ *
+ * @see org.apache.thrift.protocol.TMultiplexedProtocol
+ */
+public abstract class TProtocolDecorator extends TProtocol {
+
+ private final TProtocol concreteProtocol;
+
+ /**
+ * Encloses the specified protocol.
+ * @param protocol All operations will be forward to this protocol. Must be non-null.
+ */
+ public TProtocolDecorator(TProtocol protocol) {
+ super(protocol.getTransport());
+ concreteProtocol = protocol;
+ }
+
+ public void writeMessageBegin(TMessage tMessage) throws TException {
+ concreteProtocol.writeMessageBegin(tMessage);
+ }
+
+ public void writeMessageEnd() throws TException {
+ concreteProtocol.writeMessageEnd();
+ }
+
+ public void writeStructBegin(TStruct tStruct) throws TException {
+ concreteProtocol.writeStructBegin(tStruct);
+ }
+
+ public void writeStructEnd() throws TException {
+ concreteProtocol.writeStructEnd();
+ }
+
+ public void writeFieldBegin(TField tField) throws TException {
+ concreteProtocol.writeFieldBegin(tField);
+ }
+
+ public void writeFieldEnd() throws TException {
+ concreteProtocol.writeFieldEnd();
+ }
+
+ public void writeFieldStop() throws TException {
+ concreteProtocol.writeFieldStop();
+ }
+
+ public void writeMapBegin(TMap tMap) throws TException {
+ concreteProtocol.writeMapBegin(tMap);
+ }
+
+ public void writeMapEnd() throws TException {
+ concreteProtocol.writeMapEnd();
+ }
+
+ public void writeListBegin(TList tList) throws TException {
+ concreteProtocol.writeListBegin(tList);
+ }
+
+ public void writeListEnd() throws TException {
+ concreteProtocol.writeListEnd();
+ }
+
+ public void writeSetBegin(TSet tSet) throws TException {
+ concreteProtocol.writeSetBegin(tSet);
+ }
+
+ public void writeSetEnd() throws TException {
+ concreteProtocol.writeSetEnd();
+ }
+
+ public void writeBool(boolean b) throws TException {
+ concreteProtocol.writeBool(b);
+ }
+
+ public void writeByte(byte b) throws TException {
+ concreteProtocol.writeByte(b);
+ }
+
+ public void writeI16(short i) throws TException {
+ concreteProtocol.writeI16(i);
+ }
+
+ public void writeI32(int i) throws TException {
+ concreteProtocol.writeI32(i);
+ }
+
+ public void writeI64(long l) throws TException {
+ concreteProtocol.writeI64(l);
+ }
+
+ public void writeDouble(double v) throws TException {
+ concreteProtocol.writeDouble(v);
+ }
+
+ public void writeString(String s) throws TException {
+ concreteProtocol.writeString(s);
+ }
+
+ public void writeBinary(byte[] bytes) throws TException {
+ concreteProtocol.writeBinary(bytes);
+ }
+
+ public TMessage readMessageBegin() throws TException {
+ return concreteProtocol.readMessageBegin();
+ }
+
+ public void readMessageEnd() throws TException {
+ concreteProtocol.readMessageEnd();
+ }
+
+ public TStruct readStructBegin() throws TException {
+ return concreteProtocol.readStructBegin();
+ }
+
+ public void readStructEnd() throws TException {
+ concreteProtocol.readStructEnd();
+ }
+
+ public TField readFieldBegin() throws TException {
+ return concreteProtocol.readFieldBegin();
+ }
+
+ public void readFieldEnd() throws TException {
+ concreteProtocol.readFieldEnd();
+ }
+
+ public TMap readMapBegin() throws TException {
+ return concreteProtocol.readMapBegin();
+ }
+
+ public void readMapEnd() throws TException {
+ concreteProtocol.readMapEnd();
+ }
+
+ public TList readListBegin() throws TException {
+ return concreteProtocol.readListBegin();
+ }
+
+ public void readListEnd() throws TException {
+ concreteProtocol.readListEnd();
+ }
+
+ public TSet readSetBegin() throws TException {
+ return concreteProtocol.readSetBegin();
+ }
+
+ public void readSetEnd() throws TException {
+ concreteProtocol.readSetEnd();
+ }
+
+ public boolean readBool() throws TException {
+ return concreteProtocol.readBool();
+ }
+
+ public byte readByte() throws TException {
+ return concreteProtocol.readByte();
+ }
+
+ public short readI16() throws TException {
+ return concreteProtocol.readI16();
+ }
+
+ public int readI32() throws TException {
+ return concreteProtocol.readI32();
+ }
+
+ public long readI64() throws TException {
+ return concreteProtocol.readI64();
+ }
+
+ public double readDouble() throws TException {
+ return concreteProtocol.readDouble();
+ }
+
+ public String readString() throws TException {
+ return concreteProtocol.readString();
+ }
+
+ public byte[] readBinary() throws TException {
+ return concreteProtocol.readBinary();
+ }
+}