THRIFT-2504: Add default processor to java multiplexed processor to handle older clients
Client: Java
This closes #114
This closes #1195
diff --git a/lib/java/build.xml b/lib/java/build.xml
index 40cd4f6..6af9f27 100644
--- a/lib/java/build.xml
+++ b/lib/java/build.xml
@@ -372,6 +372,7 @@
<artifact:dependencies filesetId="test-dependency-jars" useScope="runtime">
<dependency groupId="org.slf4j" artifactId="slf4j-log4j12" version="${slf4j.version}"/>
<dependency groupId="junit" artifactId="junit" version="4.4"/>
+ <dependency groupId="org.mockito" artifactId="mockito-all" version="1.9.5"/>
</artifact:dependencies>
<!-- Copy the test dependencies to the build/lib dir -->
diff --git a/lib/java/src/org/apache/thrift/TMultiplexedProcessor.java b/lib/java/src/org/apache/thrift/TMultiplexedProcessor.java
index f6547ac..d0c5603 100644
--- a/lib/java/src/org/apache/thrift/TMultiplexedProcessor.java
+++ b/lib/java/src/org/apache/thrift/TMultiplexedProcessor.java
@@ -52,6 +52,7 @@
private final Map<String,TProcessor> SERVICE_PROCESSOR_MAP
= new HashMap<String,TProcessor>();
+ private TProcessor defaultProcessor;
/**
* 'Register' a service with this <code>TMultiplexedProcessor</code>. This
@@ -68,6 +69,14 @@
}
/**
+ * Register a service to be called to process queries without service name
+ * @param processor
+ */
+ public void registerDefault(TProcessor processor) {
+ defaultProcessor = processor;
+ }
+
+ /**
* This implementation of <code>process</code> performs the following steps:
*
* <ol>
@@ -77,7 +86,7 @@
* <li>Dispatch to the processor, with a decorated instance of TProtocol
* that allows readMessageBegin() to return the original TMessage.</li>
* </ol>
- *
+ *
* @throws TException If the message type is not CALL or ONEWAY, if
* the service name was not found in the message, or if the service
* name was not found in the service map. You called {@link #registerProcessor(String, TProcessor) registerProcessor}
@@ -92,14 +101,16 @@
TMessage message = iprot.readMessageBegin();
if (message.type != TMessageType.CALL && message.type != TMessageType.ONEWAY) {
- // TODO Apache Guys - Can the server ever get an EXCEPTION or REPLY?
- // TODO Should we check for this here?
throw new TException("This should not have happened!?");
}
// Extract the service name
int index = message.name.indexOf(TMultiplexedProtocol.SEPARATOR);
if (index < 0) {
+ if (defaultProcessor != null) {
+ // Dispatch processing to the stored processor
+ return defaultProcessor.process(new StoredMessageProtocol(iprot, message), oprot);
+ }
throw new TException("Service name not found in message name: " + message.name + ". Did you " +
"forget to use a TMultiplexProtocol in your client?");
}
diff --git a/lib/java/test/org/apache/thrift/TestMultiplexedProcessor.java b/lib/java/test/org/apache/thrift/TestMultiplexedProcessor.java
new file mode 100644
index 0000000..01776ca
--- /dev/null
+++ b/lib/java/test/org/apache/thrift/TestMultiplexedProcessor.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.thrift;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import org.apache.thrift.protocol.TMessage;
+import org.apache.thrift.protocol.TMessageType;
+import org.apache.thrift.protocol.TProtocol;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestMultiplexedProcessor {
+ private TMultiplexedProcessor mp;
+ private TProtocol iprot;
+ private TProtocol oprot;
+
+ @Before
+ public void setUp() throws Exception {
+ mp = new TMultiplexedProcessor();
+ iprot = mock(TProtocol.class);
+ oprot = mock(TProtocol.class);
+ }
+
+ @Test(expected = TException.class)
+ public void testWrongMessageType() throws TException {
+ when (iprot.readMessageBegin()).thenReturn(new TMessage("service:func", TMessageType.REPLY, 42));
+ mp.process(iprot, oprot);
+ }
+
+ @Test(expected = TException.class)
+ public void testNoSuchService() throws TException {
+ when(iprot.readMessageBegin()).thenReturn(new TMessage("service:func", TMessageType.CALL, 42));
+
+ mp.process(iprot, oprot);
+ }
+
+ static class StubProcessor implements TProcessor {
+ @Override
+ public boolean process(TProtocol in, TProtocol out) throws TException {
+ TMessage msg = in.readMessageBegin();
+ if (!"func".equals(msg.name) || msg.type!=TMessageType.CALL || msg.seqid!=42) {
+ throw new TException("incorrect parameters");
+ }
+ out.writeMessageBegin(new TMessage("func", TMessageType.REPLY, 42));
+ return true;
+ }
+ }
+
+ @Test
+ public void testExistingService() throws TException {
+ when(iprot.readMessageBegin()).thenReturn(new TMessage("service:func", TMessageType.CALL, 42));
+ mp.registerProcessor("service", new StubProcessor());
+ mp.process(iprot, oprot);
+ verify(oprot).writeMessageBegin(any(TMessage.class));
+ }
+
+ @Test
+ public void testDefaultService() throws TException {
+ when(iprot.readMessageBegin()).thenReturn(new TMessage("func", TMessageType.CALL, 42));
+ mp.registerDefault(new StubProcessor());
+ mp.process(iprot, oprot);
+ verify(oprot).writeMessageBegin(any(TMessage.class));
+ }
+
+}