THRIFT-3449 TBaseAsyncProcessor fb.responseReady() never called for oneway functions
diff --git a/compiler/cpp/src/generate/t_java_generator.cc b/compiler/cpp/src/generate/t_java_generator.cc
index 0fa51cd..afcc832 100644
--- a/compiler/cpp/src/generate/t_java_generator.cc
+++ b/compiler/cpp/src/generate/t_java_generator.cc
@@ -3325,14 +3325,22 @@
indent(f_service_) << "try {" << endl;
indent(f_service_)
- << " fcall.sendResponse(fb,result, org.apache.thrift.protocol.TMessageType.REPLY,seqid);"
+ << " fcall.sendResponse(fb, result, org.apache.thrift.protocol.TMessageType.REPLY,seqid);"
<< endl;
- indent(f_service_) << " return;" << endl;
+ indent(f_service_) << "} catch (org.apache.thrift.transport.TTransportException e) {" << endl;
+ indent_up();
+ f_service_ << indent()
+ << "LOGGER.error(\"TTransportException writing to internal frame buffer\", e);"
+ << endl
+ << indent() << "fb.close();" << endl;
+ indent_down();
indent(f_service_) << "} catch (Exception e) {" << endl;
- indent(f_service_) << " LOGGER.error(\"Exception writing to internal frame buffer\", e);"
- << endl;
+ indent_up();
+ f_service_ << indent() << "LOGGER.error(\"Exception writing to internal frame buffer\", e);"
+ << endl
+ << indent() << "onError(e);" << endl;
+ indent_down();
indent(f_service_) << "}" << endl;
- indent(f_service_) << "fb.close();" << endl;
}
indent_down();
indent(f_service_) << "}" << endl;
@@ -3341,7 +3349,21 @@
indent_up();
if (tfunction->is_oneway()) {
+ indent(f_service_) << "if (e instanceof org.apache.thrift.transport.TTransportException) {"
+ << endl;
+ indent_up();
+
+ f_service_ << indent() << "LOGGER.error(\"TTransportException inside handler\", e);" << endl
+ << indent() << "fb.close();" << endl;
+
+ indent_down();
+ indent(f_service_) << "} else {" << endl;
+ indent_up();
+
f_service_ << indent() << "LOGGER.error(\"Exception inside oneway handler\", e);" << endl;
+
+ indent_down();
+ indent(f_service_) << "}" << endl;
} else {
indent(f_service_) << "byte msgType = org.apache.thrift.protocol.TMessageType.REPLY;" << endl;
indent(f_service_) << "org.apache.thrift.TSerializable msg;" << endl;
diff --git a/lib/java/src/org/apache/thrift/TBaseAsyncProcessor.java b/lib/java/src/org/apache/thrift/TBaseAsyncProcessor.java
index d0257db..9459c1a 100644
--- a/lib/java/src/org/apache/thrift/TBaseAsyncProcessor.java
+++ b/lib/java/src/org/apache/thrift/TBaseAsyncProcessor.java
@@ -54,11 +54,13 @@
if (fn == null) {
TProtocolUtil.skip(in, TType.STRUCT);
in.readMessageEnd();
- TApplicationException x = new TApplicationException(TApplicationException.UNKNOWN_METHOD, "Invalid method name: '"+msg.name+"'");
- out.writeMessageBegin(new TMessage(msg.name, TMessageType.EXCEPTION, msg.seqid));
- x.write(out);
- out.writeMessageEnd();
- out.getTransport().flush();
+ if (!fn.isOneway()) {
+ TApplicationException x = new TApplicationException(TApplicationException.UNKNOWN_METHOD, "Invalid method name: '"+msg.name+"'");
+ out.writeMessageBegin(new TMessage(msg.name, TMessageType.EXCEPTION, msg.seqid));
+ x.write(out);
+ out.writeMessageEnd();
+ out.getTransport().flush();
+ }
fb.responseReady();
return true;
}
@@ -70,19 +72,24 @@
args.read(in);
} catch (TProtocolException e) {
in.readMessageEnd();
- TApplicationException x = new TApplicationException(TApplicationException.PROTOCOL_ERROR, e.getMessage());
- out.writeMessageBegin(new TMessage(msg.name, TMessageType.EXCEPTION, msg.seqid));
- x.write(out);
- out.writeMessageEnd();
- out.getTransport().flush();
+ if (!fn.isOneway()) {
+ TApplicationException x = new TApplicationException(TApplicationException.PROTOCOL_ERROR, e.getMessage());
+ out.writeMessageBegin(new TMessage(msg.name, TMessageType.EXCEPTION, msg.seqid));
+ x.write(out);
+ out.writeMessageEnd();
+ out.getTransport().flush();
+ }
fb.responseReady();
return true;
}
in.readMessageEnd();
+ if (fn.isOneway()) {
+ fb.responseReady();
+ }
//start off processing function
- AsyncMethodCallback resultHandler = fn.getResultHandler(fb,msg.seqid);
+ AsyncMethodCallback resultHandler = fn.getResultHandler(fb, msg.seqid);
try {
fn.start(iface, args, resultHandler);
} catch (Exception e) {
diff --git a/lib/java/test/org/apache/thrift/server/ServerTestBase.java b/lib/java/test/org/apache/thrift/server/ServerTestBase.java
index 0e7af5c..c2d2952 100644
--- a/lib/java/test/org/apache/thrift/server/ServerTestBase.java
+++ b/lib/java/test/org/apache/thrift/server/ServerTestBase.java
@@ -281,7 +281,7 @@
public static final String HOST = "localhost";
public static final int PORT = Integer.valueOf(
System.getProperty("test.port", "9090"));
- protected static final int SOCKET_TIMEOUT = 1000;
+ protected static final int SOCKET_TIMEOUT = 1500;
private static final Xtruct XSTRUCT = new Xtruct("Zero", (byte) 1, -3, -5);
private static final Xtruct2 XSTRUCT2 = new Xtruct2((byte)1, XSTRUCT, 5);
@@ -418,8 +418,7 @@
testInsanity(testClient);
testException(testClient);
testOneway(testClient);
- // FIXME: a call after oneway does not work for async client
- // testI32(testClient);
+ testI32(testClient);
transport.close();
stopServer();
@@ -486,7 +485,10 @@
}
private void testOneway(ThriftTest.Client testClient) throws Exception {
- testClient.testOneway(3);
+ long begin = System.currentTimeMillis();
+ testClient.testOneway(1);
+ long elapsed = System.currentTimeMillis() - begin;
+ assertTrue(elapsed < 500);
}
private void testSet(ThriftTest.Client testClient) throws TException {
@@ -531,21 +533,20 @@
}
public void testTransportFactory() throws Exception {
-
for (TProtocolFactory protoFactory : getProtocols()) {
TestHandler handler = new TestHandler();
ThriftTest.Processor processor = new ThriftTest.Processor(handler);
-
+
final CallCountingTransportFactory factory = new CallCountingTransportFactory(new TFramedTransport.Factory());
-
+
startServer(processor, protoFactory, factory);
assertEquals(0, factory.count);
-
+
TSocket socket = new TSocket(HOST, PORT);
socket.setTimeout(SOCKET_TIMEOUT);
TTransport transport = getClientTransport(socket);
open(transport);
-
+
TProtocol protocol = protoFactory.getProtocol(transport);
ThriftTest.Client testClient = new ThriftTest.Client(protocol);
assertEquals(0, testClient.testByte((byte) 0));