THRIFT-3680 Java async processor fails to notify errors to clients

This closes #903
diff --git a/compiler/cpp/src/generate/t_java_generator.cc b/compiler/cpp/src/generate/t_java_generator.cc
index 2613a66..0fa51cd 100644
--- a/compiler/cpp/src/generate/t_java_generator.cc
+++ b/compiler/cpp/src/generate/t_java_generator.cc
@@ -438,8 +438,6 @@
          + "import org.apache.thrift.protocol.TProtocolException;\n"
          + "import org.apache.thrift.EncodingUtils;\n"
          + option
-         + "import org.apache.thrift.TException;\n"
-         + "import org.apache.thrift.async.AsyncMethodCallback;\n"
          + "import org.apache.thrift.server.AbstractNonblockingServer.*;\n"
          + "import java.util.List;\n" + "import java.util.ArrayList;\n" + "import java.util.Map;\n"
          + "import java.util.HashMap;\n" + "import java.util.EnumMap;\n" + "import java.util.Set;\n"
@@ -1366,7 +1364,7 @@
               << tstruct->get_name() << " ";
 
   if (is_exception) {
-    out << "extends TException ";
+    out << "extends org.apache.thrift.TException ";
   }
   out << "implements org.apache.thrift.TBase<" << tstruct->get_name() << ", " << tstruct->get_name()
       << "._Fields>, java.io.Serializable, Cloneable, Comparable<" << tstruct->get_name() << ">";
@@ -3303,11 +3301,12 @@
   indent(f_service_) << "  return new " << argsname << "();" << endl;
   indent(f_service_) << "}" << endl << endl;
 
-  indent(f_service_) << "public AsyncMethodCallback<" << resulttype
+  indent(f_service_) << "public org.apache.thrift.async.AsyncMethodCallback<" << resulttype
                      << "> getResultHandler(final AsyncFrameBuffer fb, final int seqid) {" << endl;
   indent_up();
   indent(f_service_) << "final org.apache.thrift.AsyncProcessFunction fcall = this;" << endl;
-  indent(f_service_) << "return new AsyncMethodCallback<" << resulttype << ">() { " << endl;
+  indent(f_service_) << "return new org.apache.thrift.async.AsyncMethodCallback<" << resulttype
+                     << ">() { " << endl;
   indent_up();
   indent(f_service_) << "public void onComplete(" << resulttype << " o) {" << endl;
 
@@ -3341,48 +3340,65 @@
   indent(f_service_) << "public void onError(Exception e) {" << endl;
   indent_up();
 
-  if (!tfunction->is_oneway()) {
+  if (tfunction->is_oneway()) {
+    f_service_ << indent() << "LOGGER.error(\"Exception inside oneway handler\", e);" << endl;
+  } else {
     indent(f_service_) << "byte msgType = org.apache.thrift.protocol.TMessageType.REPLY;" << endl;
-    indent(f_service_) << "org.apache.thrift.TBase msg;" << endl;
+    indent(f_service_) << "org.apache.thrift.TSerializable msg;" << endl;
     indent(f_service_) << resultname << " result = new " << resultname << "();" << endl;
 
     t_struct* xs = tfunction->get_xceptions();
     const std::vector<t_field*>& xceptions = xs->get_members();
+
     vector<t_field*>::const_iterator x_iter;
     if (xceptions.size() > 0) {
       for (x_iter = xceptions.begin(); x_iter != xceptions.end(); ++x_iter) {
-        if (x_iter != xceptions.begin())
-          indent(f_service_) << "else ";
-        indent(f_service_) << "if (e instanceof " << type_name((*x_iter)->get_type(), false, false)
-                           << ") {" << endl;
-        indent(f_service_) << indent() << "result." << (*x_iter)->get_name() << " = ("
-                           << type_name((*x_iter)->get_type(), false, false) << ") e;" << endl;
-        indent(f_service_) << indent() << "result.set" << get_cap_name((*x_iter)->get_name())
-                           << get_cap_name("isSet") << "(true);" << endl;
-        indent(f_service_) << indent() << "msg = result;" << endl;
-
-        indent(f_service_) << "}" << endl;
+        if (x_iter == xceptions.begin())
+          f_service_ << indent();
+        string type = type_name((*x_iter)->get_type(), false, false);
+        string name = (*x_iter)->get_name();
+        f_service_ << "if (e instanceof " << type << ") {" << endl;
+        indent_up();
+        f_service_ << indent() << "result." << name << " = (" << type << ") e;" << endl
+                   << indent() << "result.set" << get_cap_name(name) << get_cap_name("isSet")
+                   << "(true);" << endl
+                   << indent() << "msg = result;" << endl;
+        indent_down();
+        indent(f_service_) << "} else ";
       }
-      indent(f_service_) << " else " << endl;
+    } else {
+      indent(f_service_);
     }
-
-    indent(f_service_) << "{" << endl;
+    f_service_ << "if (e instanceof org.apache.thrift.transport.TTransportException) {" << endl;
     indent_up();
-    indent(f_service_) << "msgType = org.apache.thrift.protocol.TMessageType.EXCEPTION;" << endl;
-    indent(f_service_) << "msg = (org.apache.thrift.TBase)new "
-                          "org.apache.thrift.TApplicationException(org.apache.thrift."
-                          "TApplicationException.INTERNAL_ERROR, e.getMessage());" << endl;
+    f_service_ << indent() << "LOGGER.error(\"TTransportException inside handler\", e);" << endl
+               << indent() << "fb.close();" << endl
+               << indent() << "return;" << endl;
     indent_down();
-    indent(f_service_) << "}" << endl;
-
-    indent(f_service_) << "try {" << endl;
-    indent(f_service_) << "  fcall.sendResponse(fb,msg,msgType,seqid);" << endl;
-    indent(f_service_) << "  return;" << endl;
-    indent(f_service_) << "} catch (Exception ex) {" << endl;
-    indent(f_service_) << "  LOGGER.error(\"Exception writing to internal frame buffer\", ex);"
+    indent(f_service_) << "} else if (e instanceof org.apache.thrift.TApplicationException) {"
                        << endl;
-    indent(f_service_) << "}" << endl;
-    indent(f_service_) << "fb.close();" << endl;
+    indent_up();
+    f_service_ << indent() << "LOGGER.error(\"TApplicationException inside handler\", e);" << endl
+               << indent() << "msgType = org.apache.thrift.protocol.TMessageType.EXCEPTION;" << endl
+               << indent() << "msg = (org.apache.thrift.TApplicationException)e;" << endl;
+    indent_down();
+    indent(f_service_) << "} else {" << endl;
+    indent_up();
+    f_service_ << indent() << "LOGGER.error(\"Exception inside handler\", e);" << endl
+               << indent() << "msgType = org.apache.thrift.protocol.TMessageType.EXCEPTION;" << endl
+               << indent() << "msg = new "
+                              "org.apache.thrift.TApplicationException(org.apache.thrift."
+                              "TApplicationException.INTERNAL_ERROR, e.getMessage());"
+               << endl;
+    indent_down();
+    f_service_ << indent() << "}" << endl
+               << indent() << "try {" << endl
+               << indent() << "  fcall.sendResponse(fb,msg,msgType,seqid);" << endl
+               << indent() << "} catch (Exception ex) {" << endl
+               << indent() << "  LOGGER.error(\"Exception writing to internal frame buffer\", ex);"
+               << endl
+               << indent() << "  fb.close();" << endl
+               << indent() << "}" << endl;
   }
   indent_down();
   indent(f_service_) << "}" << endl;
@@ -3397,7 +3413,7 @@
 
   indent(f_service_) << "public void start(I iface, " << argsname
                      << " args, org.apache.thrift.async.AsyncMethodCallback<" << resulttype
-                     << "> resultHandler) throws TException {" << endl;
+                     << "> resultHandler) throws org.apache.thrift.TException {" << endl;
   indent_up();
 
   // Generate the function call
diff --git a/lib/java/src/org/apache/thrift/AsyncProcessFunction.java b/lib/java/src/org/apache/thrift/AsyncProcessFunction.java
index 799e02d..550ebd5 100644
--- a/lib/java/src/org/apache/thrift/AsyncProcessFunction.java
+++ b/lib/java/src/org/apache/thrift/AsyncProcessFunction.java
@@ -24,7 +24,7 @@
 import org.apache.thrift.protocol.TProtocol;
 import org.apache.thrift.server.AbstractNonblockingServer;
 
-public abstract class AsyncProcessFunction<I, T, R> {
+public abstract class AsyncProcessFunction<I, T extends TBase, R> {
     final String methodName;
 
     public AsyncProcessFunction(String methodName) {
@@ -37,13 +37,13 @@
 
     public abstract T getEmptyArgsInstance();
 
-    public abstract AsyncMethodCallback getResultHandler(final AbstractNonblockingServer.AsyncFrameBuffer fb, int seqid);
+    public abstract AsyncMethodCallback<R> getResultHandler(final AbstractNonblockingServer.AsyncFrameBuffer fb, int seqid);
 
     public String getMethodName() {
         return methodName;
     }
 
-    public void sendResponse(final AbstractNonblockingServer.AsyncFrameBuffer fb, final TBase result, final byte type, final int seqid) throws TException {
+    public void sendResponse(final AbstractNonblockingServer.AsyncFrameBuffer fb, final TSerializable result, final byte type, final int seqid) throws TException {
         TProtocol oprot = fb.getOutputProtocol();
 
         oprot.writeMessageBegin(new TMessage(getMethodName(), type, seqid));
diff --git a/lib/java/src/org/apache/thrift/TBaseAsyncProcessor.java b/lib/java/src/org/apache/thrift/TBaseAsyncProcessor.java
index ed6c323..d0257db 100644
--- a/lib/java/src/org/apache/thrift/TBaseAsyncProcessor.java
+++ b/lib/java/src/org/apache/thrift/TBaseAsyncProcessor.java
@@ -19,6 +19,7 @@
 package org.apache.thrift;
 
 import org.apache.thrift.protocol.*;
+import org.apache.thrift.async.AsyncMethodCallback;
 
 import org.apache.thrift.server.AbstractNonblockingServer.*;
 import org.slf4j.Logger;
@@ -63,7 +64,7 @@
         }
 
         //Get Args
-        TBase args = (TBase)fn.getEmptyArgsInstance();
+        TBase args = fn.getEmptyArgsInstance();
 
         try {
             args.read(in);
@@ -81,7 +82,12 @@
 
 
         //start off processing function
-        fn.start(iface, args,fn.getResultHandler(fb,msg.seqid));
+        AsyncMethodCallback resultHandler = fn.getResultHandler(fb,msg.seqid);
+        try {
+          fn.start(iface, args, resultHandler);
+        } catch (Exception e) {
+          resultHandler.onError(e);
+        }
         return true;
     }
 
diff --git a/lib/java/src/org/apache/thrift/TNonblockingMultiFetchClient.java b/lib/java/src/org/apache/thrift/TNonblockingMultiFetchClient.java
old mode 100755
new mode 100644
diff --git a/lib/java/src/org/apache/thrift/TNonblockingMultiFetchStats.java b/lib/java/src/org/apache/thrift/TNonblockingMultiFetchStats.java
old mode 100755
new mode 100644
diff --git a/lib/java/src/org/apache/thrift/TServiceClient.java b/lib/java/src/org/apache/thrift/TServiceClient.java
index 31153ec..6619b9c 100644
--- a/lib/java/src/org/apache/thrift/TServiceClient.java
+++ b/lib/java/src/org/apache/thrift/TServiceClient.java
@@ -81,8 +81,10 @@
       iprot_.readMessageEnd();
       throw x;
     }
+    System.out.format("Received %d%n", msg.seqid);
     if (msg.seqid != seqid_) {
-      throw new TApplicationException(TApplicationException.BAD_SEQUENCE_ID, methodName + " failed: out of sequence response");
+      throw new TApplicationException(TApplicationException.BAD_SEQUENCE_ID,
+          String.format("%s failed: out of sequence response: expected %d but got %d", methodName, seqid_, msg.seqid));
     }
     result.read(iprot_);
     iprot_.readMessageEnd();
diff --git a/lib/java/src/org/apache/thrift/async/TAsyncMethodCall.java b/lib/java/src/org/apache/thrift/async/TAsyncMethodCall.java
index 50489a8..3bf1747 100644
--- a/lib/java/src/org/apache/thrift/async/TAsyncMethodCall.java
+++ b/lib/java/src/org/apache/thrift/async/TAsyncMethodCall.java
@@ -173,7 +173,7 @@
    * select interests without worrying about concurrency.
    * @param key
    */
-  protected void transition(SelectionKey key) {
+  void transition(SelectionKey key) {
     // Ensure key is valid
     if (!key.isValid()) {
       key.cancel();
diff --git a/lib/java/src/org/apache/thrift/server/TThreadPoolServer.java b/lib/java/src/org/apache/thrift/server/TThreadPoolServer.java
old mode 100755
new mode 100644
diff --git a/lib/java/src/org/apache/thrift/transport/TSSLTransportFactory.java b/lib/java/src/org/apache/thrift/transport/TSSLTransportFactory.java
old mode 100755
new mode 100644
diff --git a/lib/java/test/org/apache/thrift/server/ServerTestBase.java b/lib/java/test/org/apache/thrift/server/ServerTestBase.java
old mode 100755
new mode 100644
index 4107801..0e7af5c
--- a/lib/java/test/org/apache/thrift/server/ServerTestBase.java
+++ b/lib/java/test/org/apache/thrift/server/ServerTestBase.java
@@ -228,12 +228,12 @@
   
     public void testException(String arg) throws Xception, TException {
       System.out.print("testException("+arg+")\n");
-      if (arg.equals("Xception")) {
+      if ("Xception".equals(arg)) {
         Xception x = new Xception();
         x.errorCode = 1001;
         x.message = arg;
         throw x;
-      } else if (arg.equals("TException")) {
+      } else if ("TException".equals(arg)) {
         throw new TException(arg);
       } else {
         Xtruct result = new Xtruct();
@@ -416,8 +416,10 @@
       testTypedef(testClient);
       testNestedMap(testClient);
       testInsanity(testClient);
-      testOneway(testClient);
       testException(testClient);
+      testOneway(testClient);
+      // FIXME: a call after oneway does not work for async client
+      // testI32(testClient);
       transport.close();
 
       stopServer();
@@ -553,18 +555,19 @@
   }
 
   private void testException(ThriftTest.Client testClient) throws TException, Xception {
-    //@TODO testException
-    //testClient.testException("no Exception");
-    /*try {
-        testClient.testException("Xception");
+    try {
+      testClient.testException("Xception");
+      assert false;
     } catch(Xception e) {
-    	assertEquals(e.message, "Xception");
-    }*/
-    /*try {
-        testClient.testException("ApplicationException");
+      assertEquals(e.message, "Xception");
+      assertEquals(e.errorCode, 1001);
+    }
+    try {
+      testClient.testException("TException");
+      assert false;
     } catch(TException e) {
-    	assertEquals(e.message, "ApplicationException");
-    }*/
+    }
+    testClient.testException("no Exception");
   }
 
 
@@ -669,11 +672,21 @@
 
     @Override
     public void testException(String arg, AsyncMethodCallback<Void> resultHandler) throws TException {
-      try {
-        // handler.testException();
-      } catch (Exception e) {
-
+      System.out.print("testException("+arg+")\n");
+      if ("Xception".equals(arg)) {
+        Xception x = new Xception();
+        x.errorCode = 1001;
+        x.message = arg;
+        // throw and onError yield the same result.
+        // resultHandler.onError(x);
+        // return;
+        throw x;
+      } else if ("TException".equals(arg)) {
+        // throw new TException(arg);
+        resultHandler.onError(new TException(arg));
+        return;
       }
+      resultHandler.onComplete(null);
     }
 
     @Override