THRIFT-3680 Java async processor fails to notify errors to clients
This closes #903
diff --git a/compiler/cpp/src/generate/t_java_generator.cc b/compiler/cpp/src/generate/t_java_generator.cc
index 2613a66..0fa51cd 100644
--- a/compiler/cpp/src/generate/t_java_generator.cc
+++ b/compiler/cpp/src/generate/t_java_generator.cc
@@ -438,8 +438,6 @@
+ "import org.apache.thrift.protocol.TProtocolException;\n"
+ "import org.apache.thrift.EncodingUtils;\n"
+ option
- + "import org.apache.thrift.TException;\n"
- + "import org.apache.thrift.async.AsyncMethodCallback;\n"
+ "import org.apache.thrift.server.AbstractNonblockingServer.*;\n"
+ "import java.util.List;\n" + "import java.util.ArrayList;\n" + "import java.util.Map;\n"
+ "import java.util.HashMap;\n" + "import java.util.EnumMap;\n" + "import java.util.Set;\n"
@@ -1366,7 +1364,7 @@
<< tstruct->get_name() << " ";
if (is_exception) {
- out << "extends TException ";
+ out << "extends org.apache.thrift.TException ";
}
out << "implements org.apache.thrift.TBase<" << tstruct->get_name() << ", " << tstruct->get_name()
<< "._Fields>, java.io.Serializable, Cloneable, Comparable<" << tstruct->get_name() << ">";
@@ -3303,11 +3301,12 @@
indent(f_service_) << " return new " << argsname << "();" << endl;
indent(f_service_) << "}" << endl << endl;
- indent(f_service_) << "public AsyncMethodCallback<" << resulttype
+ indent(f_service_) << "public org.apache.thrift.async.AsyncMethodCallback<" << resulttype
<< "> getResultHandler(final AsyncFrameBuffer fb, final int seqid) {" << endl;
indent_up();
indent(f_service_) << "final org.apache.thrift.AsyncProcessFunction fcall = this;" << endl;
- indent(f_service_) << "return new AsyncMethodCallback<" << resulttype << ">() { " << endl;
+ indent(f_service_) << "return new org.apache.thrift.async.AsyncMethodCallback<" << resulttype
+ << ">() { " << endl;
indent_up();
indent(f_service_) << "public void onComplete(" << resulttype << " o) {" << endl;
@@ -3341,48 +3340,65 @@
indent(f_service_) << "public void onError(Exception e) {" << endl;
indent_up();
- if (!tfunction->is_oneway()) {
+ if (tfunction->is_oneway()) {
+ f_service_ << indent() << "LOGGER.error(\"Exception inside oneway handler\", e);" << endl;
+ } else {
indent(f_service_) << "byte msgType = org.apache.thrift.protocol.TMessageType.REPLY;" << endl;
- indent(f_service_) << "org.apache.thrift.TBase msg;" << endl;
+ indent(f_service_) << "org.apache.thrift.TSerializable msg;" << endl;
indent(f_service_) << resultname << " result = new " << resultname << "();" << endl;
t_struct* xs = tfunction->get_xceptions();
const std::vector<t_field*>& xceptions = xs->get_members();
+
vector<t_field*>::const_iterator x_iter;
if (xceptions.size() > 0) {
for (x_iter = xceptions.begin(); x_iter != xceptions.end(); ++x_iter) {
- if (x_iter != xceptions.begin())
- indent(f_service_) << "else ";
- indent(f_service_) << "if (e instanceof " << type_name((*x_iter)->get_type(), false, false)
- << ") {" << endl;
- indent(f_service_) << indent() << "result." << (*x_iter)->get_name() << " = ("
- << type_name((*x_iter)->get_type(), false, false) << ") e;" << endl;
- indent(f_service_) << indent() << "result.set" << get_cap_name((*x_iter)->get_name())
- << get_cap_name("isSet") << "(true);" << endl;
- indent(f_service_) << indent() << "msg = result;" << endl;
-
- indent(f_service_) << "}" << endl;
+ if (x_iter == xceptions.begin())
+ f_service_ << indent();
+ string type = type_name((*x_iter)->get_type(), false, false);
+ string name = (*x_iter)->get_name();
+ f_service_ << "if (e instanceof " << type << ") {" << endl;
+ indent_up();
+ f_service_ << indent() << "result." << name << " = (" << type << ") e;" << endl
+ << indent() << "result.set" << get_cap_name(name) << get_cap_name("isSet")
+ << "(true);" << endl
+ << indent() << "msg = result;" << endl;
+ indent_down();
+ indent(f_service_) << "} else ";
}
- indent(f_service_) << " else " << endl;
+ } else {
+ indent(f_service_);
}
-
- indent(f_service_) << "{" << endl;
+ f_service_ << "if (e instanceof org.apache.thrift.transport.TTransportException) {" << endl;
indent_up();
- indent(f_service_) << "msgType = org.apache.thrift.protocol.TMessageType.EXCEPTION;" << endl;
- indent(f_service_) << "msg = (org.apache.thrift.TBase)new "
- "org.apache.thrift.TApplicationException(org.apache.thrift."
- "TApplicationException.INTERNAL_ERROR, e.getMessage());" << endl;
+ f_service_ << indent() << "LOGGER.error(\"TTransportException inside handler\", e);" << endl
+ << indent() << "fb.close();" << endl
+ << indent() << "return;" << endl;
indent_down();
- indent(f_service_) << "}" << endl;
-
- indent(f_service_) << "try {" << endl;
- indent(f_service_) << " fcall.sendResponse(fb,msg,msgType,seqid);" << endl;
- indent(f_service_) << " return;" << endl;
- indent(f_service_) << "} catch (Exception ex) {" << endl;
- indent(f_service_) << " LOGGER.error(\"Exception writing to internal frame buffer\", ex);"
+ indent(f_service_) << "} else if (e instanceof org.apache.thrift.TApplicationException) {"
<< endl;
- indent(f_service_) << "}" << endl;
- indent(f_service_) << "fb.close();" << endl;
+ indent_up();
+ f_service_ << indent() << "LOGGER.error(\"TApplicationException inside handler\", e);" << endl
+ << indent() << "msgType = org.apache.thrift.protocol.TMessageType.EXCEPTION;" << endl
+ << indent() << "msg = (org.apache.thrift.TApplicationException)e;" << endl;
+ indent_down();
+ indent(f_service_) << "} else {" << endl;
+ indent_up();
+ f_service_ << indent() << "LOGGER.error(\"Exception inside handler\", e);" << endl
+ << indent() << "msgType = org.apache.thrift.protocol.TMessageType.EXCEPTION;" << endl
+ << indent() << "msg = new "
+ "org.apache.thrift.TApplicationException(org.apache.thrift."
+ "TApplicationException.INTERNAL_ERROR, e.getMessage());"
+ << endl;
+ indent_down();
+ f_service_ << indent() << "}" << endl
+ << indent() << "try {" << endl
+ << indent() << " fcall.sendResponse(fb,msg,msgType,seqid);" << endl
+ << indent() << "} catch (Exception ex) {" << endl
+ << indent() << " LOGGER.error(\"Exception writing to internal frame buffer\", ex);"
+ << endl
+ << indent() << " fb.close();" << endl
+ << indent() << "}" << endl;
}
indent_down();
indent(f_service_) << "}" << endl;
@@ -3397,7 +3413,7 @@
indent(f_service_) << "public void start(I iface, " << argsname
<< " args, org.apache.thrift.async.AsyncMethodCallback<" << resulttype
- << "> resultHandler) throws TException {" << endl;
+ << "> resultHandler) throws org.apache.thrift.TException {" << endl;
indent_up();
// Generate the function call
diff --git a/lib/java/src/org/apache/thrift/AsyncProcessFunction.java b/lib/java/src/org/apache/thrift/AsyncProcessFunction.java
index 799e02d..550ebd5 100644
--- a/lib/java/src/org/apache/thrift/AsyncProcessFunction.java
+++ b/lib/java/src/org/apache/thrift/AsyncProcessFunction.java
@@ -24,7 +24,7 @@
import org.apache.thrift.protocol.TProtocol;
import org.apache.thrift.server.AbstractNonblockingServer;
-public abstract class AsyncProcessFunction<I, T, R> {
+public abstract class AsyncProcessFunction<I, T extends TBase, R> {
final String methodName;
public AsyncProcessFunction(String methodName) {
@@ -37,13 +37,13 @@
public abstract T getEmptyArgsInstance();
- public abstract AsyncMethodCallback getResultHandler(final AbstractNonblockingServer.AsyncFrameBuffer fb, int seqid);
+ public abstract AsyncMethodCallback<R> getResultHandler(final AbstractNonblockingServer.AsyncFrameBuffer fb, int seqid);
public String getMethodName() {
return methodName;
}
- public void sendResponse(final AbstractNonblockingServer.AsyncFrameBuffer fb, final TBase result, final byte type, final int seqid) throws TException {
+ public void sendResponse(final AbstractNonblockingServer.AsyncFrameBuffer fb, final TSerializable result, final byte type, final int seqid) throws TException {
TProtocol oprot = fb.getOutputProtocol();
oprot.writeMessageBegin(new TMessage(getMethodName(), type, seqid));
diff --git a/lib/java/src/org/apache/thrift/TBaseAsyncProcessor.java b/lib/java/src/org/apache/thrift/TBaseAsyncProcessor.java
index ed6c323..d0257db 100644
--- a/lib/java/src/org/apache/thrift/TBaseAsyncProcessor.java
+++ b/lib/java/src/org/apache/thrift/TBaseAsyncProcessor.java
@@ -19,6 +19,7 @@
package org.apache.thrift;
import org.apache.thrift.protocol.*;
+import org.apache.thrift.async.AsyncMethodCallback;
import org.apache.thrift.server.AbstractNonblockingServer.*;
import org.slf4j.Logger;
@@ -63,7 +64,7 @@
}
//Get Args
- TBase args = (TBase)fn.getEmptyArgsInstance();
+ TBase args = fn.getEmptyArgsInstance();
try {
args.read(in);
@@ -81,7 +82,12 @@
//start off processing function
- fn.start(iface, args,fn.getResultHandler(fb,msg.seqid));
+ AsyncMethodCallback resultHandler = fn.getResultHandler(fb,msg.seqid);
+ try {
+ fn.start(iface, args, resultHandler);
+ } catch (Exception e) {
+ resultHandler.onError(e);
+ }
return true;
}
diff --git a/lib/java/src/org/apache/thrift/TNonblockingMultiFetchClient.java b/lib/java/src/org/apache/thrift/TNonblockingMultiFetchClient.java
old mode 100755
new mode 100644
diff --git a/lib/java/src/org/apache/thrift/TNonblockingMultiFetchStats.java b/lib/java/src/org/apache/thrift/TNonblockingMultiFetchStats.java
old mode 100755
new mode 100644
diff --git a/lib/java/src/org/apache/thrift/TServiceClient.java b/lib/java/src/org/apache/thrift/TServiceClient.java
index 31153ec..6619b9c 100644
--- a/lib/java/src/org/apache/thrift/TServiceClient.java
+++ b/lib/java/src/org/apache/thrift/TServiceClient.java
@@ -81,8 +81,10 @@
iprot_.readMessageEnd();
throw x;
}
+ System.out.format("Received %d%n", msg.seqid);
if (msg.seqid != seqid_) {
- throw new TApplicationException(TApplicationException.BAD_SEQUENCE_ID, methodName + " failed: out of sequence response");
+ throw new TApplicationException(TApplicationException.BAD_SEQUENCE_ID,
+ String.format("%s failed: out of sequence response: expected %d but got %d", methodName, seqid_, msg.seqid));
}
result.read(iprot_);
iprot_.readMessageEnd();
diff --git a/lib/java/src/org/apache/thrift/async/TAsyncMethodCall.java b/lib/java/src/org/apache/thrift/async/TAsyncMethodCall.java
index 50489a8..3bf1747 100644
--- a/lib/java/src/org/apache/thrift/async/TAsyncMethodCall.java
+++ b/lib/java/src/org/apache/thrift/async/TAsyncMethodCall.java
@@ -173,7 +173,7 @@
* select interests without worrying about concurrency.
* @param key
*/
- protected void transition(SelectionKey key) {
+ void transition(SelectionKey key) {
// Ensure key is valid
if (!key.isValid()) {
key.cancel();
diff --git a/lib/java/src/org/apache/thrift/server/TThreadPoolServer.java b/lib/java/src/org/apache/thrift/server/TThreadPoolServer.java
old mode 100755
new mode 100644
diff --git a/lib/java/src/org/apache/thrift/transport/TSSLTransportFactory.java b/lib/java/src/org/apache/thrift/transport/TSSLTransportFactory.java
old mode 100755
new mode 100644
diff --git a/lib/java/test/org/apache/thrift/server/ServerTestBase.java b/lib/java/test/org/apache/thrift/server/ServerTestBase.java
old mode 100755
new mode 100644
index 4107801..0e7af5c
--- a/lib/java/test/org/apache/thrift/server/ServerTestBase.java
+++ b/lib/java/test/org/apache/thrift/server/ServerTestBase.java
@@ -228,12 +228,12 @@
public void testException(String arg) throws Xception, TException {
System.out.print("testException("+arg+")\n");
- if (arg.equals("Xception")) {
+ if ("Xception".equals(arg)) {
Xception x = new Xception();
x.errorCode = 1001;
x.message = arg;
throw x;
- } else if (arg.equals("TException")) {
+ } else if ("TException".equals(arg)) {
throw new TException(arg);
} else {
Xtruct result = new Xtruct();
@@ -416,8 +416,10 @@
testTypedef(testClient);
testNestedMap(testClient);
testInsanity(testClient);
- testOneway(testClient);
testException(testClient);
+ testOneway(testClient);
+ // FIXME: a call after oneway does not work for async client
+ // testI32(testClient);
transport.close();
stopServer();
@@ -553,18 +555,19 @@
}
private void testException(ThriftTest.Client testClient) throws TException, Xception {
- //@TODO testException
- //testClient.testException("no Exception");
- /*try {
- testClient.testException("Xception");
+ try {
+ testClient.testException("Xception");
+ assert false;
} catch(Xception e) {
- assertEquals(e.message, "Xception");
- }*/
- /*try {
- testClient.testException("ApplicationException");
+ assertEquals(e.message, "Xception");
+ assertEquals(e.errorCode, 1001);
+ }
+ try {
+ testClient.testException("TException");
+ assert false;
} catch(TException e) {
- assertEquals(e.message, "ApplicationException");
- }*/
+ }
+ testClient.testException("no Exception");
}
@@ -669,11 +672,21 @@
@Override
public void testException(String arg, AsyncMethodCallback<Void> resultHandler) throws TException {
- try {
- // handler.testException();
- } catch (Exception e) {
-
+ System.out.print("testException("+arg+")\n");
+ if ("Xception".equals(arg)) {
+ Xception x = new Xception();
+ x.errorCode = 1001;
+ x.message = arg;
+ // throw and onError yield the same result.
+ // resultHandler.onError(x);
+ // return;
+ throw x;
+ } else if ("TException".equals(arg)) {
+ // throw new TException(arg);
+ resultHandler.onError(new TException(arg));
+ return;
}
+ resultHandler.onComplete(null);
}
@Override