Merge branch 'master' of https://git-wip-us.apache.org/repos/asf/thrift
diff --git a/compiler/cpp/src/generate/t_java_generator.cc b/compiler/cpp/src/generate/t_java_generator.cc
index 2985a78..c9a3e56 100644
--- a/compiler/cpp/src/generate/t_java_generator.cc
+++ b/compiler/cpp/src/generate/t_java_generator.cc
@@ -144,7 +144,10 @@
   void generate_service_client    (t_service* tservice);
   void generate_service_async_client(t_service* tservice);
   void generate_service_server    (t_service* tservice);
+  void generate_service_async_server    (t_service* tservice);
   void generate_process_function  (t_service* tservice, t_function* tfunction);
+  void generate_process_async_function  (t_service* tservice, t_function* tfunction);
+
 
   void generate_java_union(t_struct* tstruct);
   void generate_union_constructor(ofstream& out, t_struct* tstruct);
@@ -373,6 +376,8 @@
     "import org.apache.thrift.protocol.TProtocolException;\n" +
     "import org.apache.thrift.EncodingUtils;\n" +
     "import org.apache.thrift.TException;\n" +
+    "import org.apache.thrift.async.AsyncMethodCallback;\n"+
+    "import org.apache.thrift.server.AbstractNonblockingServer.*;\n"+
     "import java.util.List;\n" +
     "import java.util.ArrayList;\n" +
     "import java.util.Map;\n" +
@@ -2225,6 +2230,7 @@
   generate_service_client(tservice);
   generate_service_async_client(tservice);
   generate_service_server(tservice);
+  generate_service_async_server(tservice);
   generate_service_helpers(tservice);
 
   indent_down();
@@ -2279,6 +2285,7 @@
   f_service_ << indent() << "}" << endl << endl;
 }
 
+
 /**
  * Generates structs for all the service args and return types
  *
@@ -2641,6 +2648,61 @@
 }
 
 /**
+ * Emits the nested Java class AsyncProcessor for a service: a map from
+ * function name to handler object plus one handler class per function.
+ *
+ * @param tservice The service to generate an asynchronous processor for.
+ */
+void t_java_generator::generate_service_async_server(t_service* tservice) {
+  const vector<t_function*> fns = tservice->get_functions();
+  vector<t_function*>::const_iterator fn_it;
+
+  // A service without a parent extends TBaseAsyncProcessor directly; a derived
+  // service extends the AsyncProcessor generated for its parent.
+  const string base_processor = (tservice->get_extends() == NULL)
+    ? string("org.apache.thrift.TBaseAsyncProcessor<I>")
+    : type_name(tservice->get_extends()) + ".AsyncProcessor<I>";
+
+  // Class header
+  f_service_ << indent() <<
+    "public static class AsyncProcessor<I extends AsyncIface> extends " << base_processor << " {" << endl;
+  indent_up();
+
+  f_service_ << indent() << "private static final Logger LOGGER = LoggerFactory.getLogger(AsyncProcessor.class.getName());" << endl;
+
+  // Public constructor: fills a fresh HashMap via getProcessMap()
+  f_service_ << indent() << "public AsyncProcessor(I iface) {" << endl
+             << indent() << "  super(iface, getProcessMap(new HashMap<String, org.apache.thrift.AsyncProcessFunction<I, ? extends org.apache.thrift.TBase, ?>>()));" << endl
+             << indent() << "}" << endl << endl;
+
+  // Protected constructor: fills a caller-supplied map via getProcessMap()
+  f_service_ << indent() << "protected AsyncProcessor(I iface, Map<String,  org.apache.thrift.AsyncProcessFunction<I, ? extends  org.apache.thrift.TBase, ?>> processMap) {" << endl
+             << indent() << "  super(iface, getProcessMap(processMap));" << endl
+             << indent() << "}" << endl << endl;
+
+  // getProcessMap(): one put() per service function, keyed by the function name
+  f_service_ << indent() << "private static <I extends AsyncIface> Map<String,  org.apache.thrift.AsyncProcessFunction<I, ? extends  org.apache.thrift.TBase,?>> getProcessMap(Map<String,  org.apache.thrift.AsyncProcessFunction<I, ? extends  org.apache.thrift.TBase, ?>> processMap) {" << endl;
+  indent_up();
+  for (fn_it = fns.begin(); fn_it != fns.end(); ++fn_it) {
+    const string& fn_name = (*fn_it)->get_name();
+    f_service_ << indent() << "processMap.put(\"" << fn_name << "\", new " << fn_name << "());" << endl;
+  }
+  f_service_ << indent() << "return processMap;" << endl;
+  indent_down();
+  f_service_ << indent() << "}" << endl << endl;
+
+  // One nested handler class per service function
+  for (fn_it = fns.begin(); fn_it != fns.end(); ++fn_it) {
+    generate_process_async_function(tservice, *fn_it);
+  }
+
+  // Close the AsyncProcessor class
+  indent_down();
+  f_service_ << indent() << "}" << endl << endl;
+}
+
+
+/**
  * Generates a struct and helpers for a function.
  *
  * @param tfunction The function
@@ -2666,6 +2728,159 @@
   generate_java_struct_definition(f_service_, &result, false, true, true);
 }
 
+
+
+/**
+ * Generates the nested AsyncProcessFunction subclass that dispatches one
+ * service function to an AsyncIface implementation and reports the outcome.
+ *
+ * @param tservice  The enclosing service (not used)
+ * @param tfunction The function to write a dispatcher for
+ */
+void t_java_generator::generate_process_async_function(t_service* tservice,
+                                                 t_function* tfunction) {
+  (void) tservice;
+
+  // For oneway functions both callbacks are emitted with empty bodies.
+  const bool oneway = tfunction->is_oneway();
+  string argsname = tfunction->get_name() + "_args";
+  // The result struct is only referenced when the function is not oneway.
+  string resultname = tfunction->get_name() + "_result";
+  // Passing true presumably selects the in-container spelling ("Void", not "void").
+  string resulttype = type_name(tfunction->get_returntype(), true);
+
+  // Open class
+  indent(f_service_) <<
+    "public static class " << tfunction->get_name() << "<I extends AsyncIface> extends org.apache.thrift.AsyncProcessFunction<I, " << argsname << ", "<<resulttype<<"> {" << endl;
+  indent_up();
+
+  indent(f_service_) << "public " << tfunction->get_name() << "() {" << endl;
+  indent(f_service_) << "  super(\"" << tfunction->get_name() << "\");" << endl;
+  indent(f_service_) << "}" << endl << endl;
+
+  indent(f_service_) << "public " << argsname << " getEmptyArgsInstance() {" << endl;
+  indent(f_service_) << "  return new " << argsname << "();" << endl;
+  indent(f_service_) << "}" << endl << endl;
+
+  // getResultHandler(): anonymous callback bound to the frame buffer and seqid
+  indent(f_service_) << "public AsyncMethodCallback<"<<resulttype<<"> getResultHandler(final AsyncFrameBuffer fb, final int seqid) {" << endl;
+  indent_up();
+  indent(f_service_) << "final org.apache.thrift.AsyncProcessFunction fcall = this;"<<endl;
+  indent(f_service_) << "return new AsyncMethodCallback<"<<resulttype<<">() { " << endl;
+  indent_up();
+
+  // onComplete(): wrap the value in the result struct and send it as a REPLY
+  indent(f_service_) << "public void onComplete(" << resulttype <<" o) {" << endl;
+  indent_up();
+  if (!oneway) {
+    indent(f_service_) <<resultname<<" result = new "<<resultname<<"();"<<endl;
+
+    if (!tfunction->get_returntype()->is_void()) {
+      indent(f_service_) << "result.success = o;"<<endl;
+      // Set isset on success field
+      if (!type_can_be_null(tfunction->get_returntype())) {
+        indent(f_service_) << "result.set" << get_cap_name("success") << get_cap_name("isSet") << "(true);" << endl;
+      }
+    }
+
+    indent(f_service_) << "try {"<<endl;
+    indent(f_service_) << "  fcall.sendResponse(fb,result, org.apache.thrift.protocol.TMessageType.REPLY,seqid);"<<endl;
+    indent(f_service_) << "  return;"<<endl;
+    indent(f_service_) << "} catch (Exception e) {"<<endl;
+    indent(f_service_) << "  LOGGER.error(\"Exception writing to internal frame buffer\", e);"<<endl;
+    indent(f_service_) << "}"<<endl;
+    indent(f_service_) << "fb.close();"<<endl;
+  }
+  indent_down();
+  indent(f_service_) << "}" <<endl;
+
+  // onError(): declared exceptions travel in the result struct as a REPLY;
+  // anything else becomes a TApplicationException sent as an EXCEPTION.
+  indent(f_service_) << "public void onError(Exception e) {"<<endl;
+  indent_up();
+  if (!oneway) {
+    indent(f_service_) <<"byte msgType = org.apache.thrift.protocol.TMessageType.REPLY;"<<endl;
+    indent(f_service_) <<"org.apache.thrift.TBase msg;"<<endl;
+    indent(f_service_) <<resultname<<" result = new "<<resultname<<"();"<<endl;
+
+    t_struct* xs = tfunction->get_xceptions();
+    const std::vector<t_field*>& xceptions = xs->get_members();
+    vector<t_field*>::const_iterator x_iter;
+    bool first_xception = true;
+    if (xceptions.size() > 0) {
+      for (x_iter = xceptions.begin(); x_iter != xceptions.end(); ++x_iter) {
+        // Plain if/else: a conditional expression here needs an implicit stream-to-bool conversion.
+        if (first_xception) {
+          first_xception = false;
+        } else {
+          indent(f_service_) << "else ";
+        }
+        indent(f_service_) << "if (e instanceof " << type_name((*x_iter)->get_type(), false, false)<<") {" << endl;
+        indent(f_service_) << indent() << "result." << (*x_iter)->get_name() << " = (" << type_name((*x_iter)->get_type(), false, false) << ") e;" << endl;
+        indent(f_service_) << indent() << "result.set" << get_cap_name((*x_iter)->get_name()) << get_cap_name("isSet") << "(true);" << endl;
+        indent(f_service_) << indent() << "msg = result;"<<endl;
+        indent(f_service_) << "}"<<endl;
+      }
+      indent(f_service_) << " else "<<endl;
+    }
+
+    // NOTE(review): the emitted cast assumes TApplicationException implements TBase - confirm, else onError throws ClassCastException.
+    indent(f_service_) << "{"<<endl;
+    indent_up();
+    indent(f_service_) << "msgType = org.apache.thrift.protocol.TMessageType.EXCEPTION;"<<endl;
+    indent(f_service_) << "msg = (org.apache.thrift.TBase)new org.apache.thrift.TApplicationException(org.apache.thrift.TApplicationException.INTERNAL_ERROR, e.getMessage());"<<endl;
+    indent_down();
+    indent(f_service_) << "}"<<endl;
+
+    indent(f_service_) << "try {"<<endl;
+    indent(f_service_) << "  fcall.sendResponse(fb,msg,msgType,seqid);"<<endl;
+    indent(f_service_) << "  return;"<<endl;
+    indent(f_service_) << "} catch (Exception ex) {"<<endl;
+    indent(f_service_) << "  LOGGER.error(\"Exception writing to internal frame buffer\", ex);"<<endl;
+    indent(f_service_) << "}"<<endl;
+    indent(f_service_) << "fb.close();"<<endl;
+  }
+  indent_down();
+  indent(f_service_) << "}" <<endl;
+  indent_down();
+  indent(f_service_) << "};" <<endl;
+  indent_down();
+  indent(f_service_) << "}" << endl << endl;
+
+  indent(f_service_) << "protected boolean isOneway() {" << endl;
+  indent(f_service_) << "  return " << (oneway ? "true" : "false") << ";" << endl;
+  indent(f_service_) << "}" << endl << endl;
+
+  indent(f_service_) << "public void start(I iface, " << argsname << " args, org.apache.thrift.async.AsyncMethodCallback<"<<resulttype<<"> resultHandler) throws TException {" << endl;
+  indent_up();
+
+  // Generate the function call: every args field, then the callback
+  t_struct* arg_struct = tfunction->get_arglist();
+  const std::vector<t_field*>& fields = arg_struct->get_members();
+  vector<t_field*>::const_iterator f_iter;
+  f_service_ << indent() << "iface." << tfunction->get_name() << "(";
+  bool first_arg = true;
+  for (f_iter = fields.begin(); f_iter != fields.end(); ++f_iter) {
+    if (first_arg) {
+      first_arg = false;
+    } else {
+      f_service_ << ", ";
+    }
+    f_service_ << "args." << (*f_iter)->get_name();
+  }
+  if (!first_arg) {
+    f_service_ << ",";
+  }
+  f_service_ << "resultHandler" << ");" << endl;
+  indent_down();
+  // Close function
+  indent(f_service_) << "}" << endl;
+
+  // Close class
+  indent_down();
+  f_service_ << indent() << "}" << endl << endl;
+}
+
 /**
  * Generates a process function definition.
  *
@@ -2773,6 +2988,7 @@
   f_service_ << indent() << "}" << endl << endl;
 }
 
+
 /**
  * Deserializes a field of any type.
  *
@@ -3270,7 +3486,7 @@
 
   switch (tbase) {
     case t_base_type::TYPE_VOID:
-      return "void";
+      return (in_container ? "Void" : "void");
     case t_base_type::TYPE_STRING:
       if (type->is_binary()) {
         return "ByteBuffer";
@@ -3395,14 +3611,8 @@
     arglist = argument_list(tfunc->get_arglist(), include_types) + ", ";
   }
 
-  std::string ret_type = "";
-  if (use_base_method) {
-    ret_type += "AsyncClient.";
-  }
-  ret_type += tfunc->get_name() + "_call";
-
   if (include_types) {
-    arglist += "org.apache.thrift.async.AsyncMethodCallback<" + ret_type + "> ";
+    arglist += "org.apache.thrift.async.AsyncMethodCallback ";
   }
   arglist += "resultHandler";
 
@@ -3453,7 +3663,7 @@
     result += ", ";
   }
   if (include_types) {
-    result += "org.apache.thrift.async.AsyncMethodCallback<" + tfunct->get_name() + "_call" + "> ";
+    result += "org.apache.thrift.async.AsyncMethodCallback ";
   }
   result += "resultHandler";
   return result;
diff --git a/lib/java/src/org/apache/thrift/AsyncProcessFunction.java b/lib/java/src/org/apache/thrift/AsyncProcessFunction.java
new file mode 100644
index 0000000..799e02d
--- /dev/null
+++ b/lib/java/src/org/apache/thrift/AsyncProcessFunction.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.thrift;
+
+import org.apache.thrift.async.AsyncMethodCallback;
+import org.apache.thrift.protocol.TMessage;
+import org.apache.thrift.protocol.TMessageType;
+import org.apache.thrift.protocol.TProtocol;
+import org.apache.thrift.server.AbstractNonblockingServer;
+
+public abstract class AsyncProcessFunction<I, T, R> { // one service method, dispatched asynchronously
+    final String methodName;
+
+    public AsyncProcessFunction(String methodName) {
+        this.methodName = methodName;
+    }
+
+    /** Whether the method is oneway. */
+    protected abstract boolean isOneway();
+    /** Starts the call on iface with the decoded args; the outcome is reported through resultHandler. */
+    public abstract void start(I iface, T args, AsyncMethodCallback<R> resultHandler) throws TException;
+    /** Returns a new, empty args object for the incoming request to be read into. */
+    public abstract T getEmptyArgsInstance();
+    /** Creates the callback for one request on fb; typed with R so it matches the callback start() expects. */
+    public abstract AsyncMethodCallback<R> getResultHandler(final AbstractNonblockingServer.AsyncFrameBuffer fb, int seqid);
+
+    public String getMethodName() {
+        return methodName;
+    }
+
+    /** Writes result to fb's output protocol as a message of the given type, flushes, then calls fb.responseReady(). */
+    public void sendResponse(final AbstractNonblockingServer.AsyncFrameBuffer fb, final TBase result, final byte type, final int seqid) throws TException {
+        TProtocol oprot = fb.getOutputProtocol();
+        oprot.writeMessageBegin(new TMessage(getMethodName(), type, seqid));
+        result.write(oprot);
+        oprot.writeMessageEnd();
+        oprot.getTransport().flush();
+        fb.responseReady();
+    }
+}
diff --git a/lib/java/src/org/apache/thrift/TBaseAsyncProcessor.java b/lib/java/src/org/apache/thrift/TBaseAsyncProcessor.java
new file mode 100644
index 0000000..da41620
--- /dev/null
+++ b/lib/java/src/org/apache/thrift/TBaseAsyncProcessor.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.thrift;
+
+import org.apache.thrift.protocol.*;
+
+import org.apache.thrift.server.AbstractNonblockingServer.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+import java.util.Map;
+
+public class TBaseAsyncProcessor<I> implements TProcessor {
+    protected final Logger LOGGER = LoggerFactory.getLogger(getClass().getName());
+
+    final I iface;
+    final Map<String,AsyncProcessFunction<I, ? extends TBase,?>> processMap;
+
+    public TBaseAsyncProcessor(I iface, Map<String, AsyncProcessFunction<I, ? extends TBase,?>> processMap) {
+        this.iface = iface;
+        this.processMap = processMap;
+    }
+
+    public Map<String,AsyncProcessFunction<I, ? extends TBase,?>> getProcessMapView() {
+        return Collections.unmodifiableMap(processMap);
+    }
+
+    /** Reads one request from fb and starts its handler; unknown methods and unreadable args are answered with an EXCEPTION message. */
+    public boolean process(final AsyncFrameBuffer fb) throws TException {
+        final TProtocol in = fb.getInputProtocol();
+        //Find processing function
+        final TMessage msg = in.readMessageBegin();
+        AsyncProcessFunction fn = processMap.get(msg.name);
+        if (fn == null) {
+            TProtocolUtil.skip(in, TType.STRUCT);
+            in.readMessageEnd();
+            sendException(fb, msg, TApplicationException.UNKNOWN_METHOD, "Invalid method name: '"+msg.name+"'");
+            return true;
+        }
+
+        //Get Args
+        TBase args = (TBase)fn.getEmptyArgsInstance();
+        try {
+            args.read(in);
+        } catch (TProtocolException e) {
+            in.readMessageEnd();
+            sendException(fb, msg, TApplicationException.PROTOCOL_ERROR, e.getMessage());
+            return true;
+        }
+        in.readMessageEnd();
+
+        //start off processing function
+        fn.start(iface, args,fn.getResultHandler(fb,msg.seqid));
+        // Handlers generated for oneway methods never call sendResponse(), so signal the frame buffer here.
+        if (fn.isOneway()) {
+            fb.responseReady();
+        }
+        return true;
+    }
+
+    /** Sends a TApplicationException of the given type as the reply to msg, then calls fb.responseReady(). */
+    private void sendException(AsyncFrameBuffer fb, TMessage msg, int type, String text) throws TException {
+        final TProtocol out = fb.getOutputProtocol();
+        out.writeMessageBegin(new TMessage(msg.name, TMessageType.EXCEPTION, msg.seqid));
+        new TApplicationException(type, text).write(out);
+        out.writeMessageEnd();
+        out.getTransport().flush();
+        fb.responseReady();
+    }
+
+    @Override
+    public boolean process(TProtocol in, TProtocol out) throws TException {
+        return false; // no-op: AsyncFrameBuffer.invoke() dispatches through process(AsyncFrameBuffer)
+    }
+}
diff --git a/lib/java/src/org/apache/thrift/TProcessorFactory.java b/lib/java/src/org/apache/thrift/TProcessorFactory.java
index bcd8a38..f6dfb14 100644
--- a/lib/java/src/org/apache/thrift/TProcessorFactory.java
+++ b/lib/java/src/org/apache/thrift/TProcessorFactory.java
@@ -36,4 +36,8 @@
   public TProcessor getProcessor(TTransport trans) {
     return processor_;
   }
+
+  public boolean isAsyncProcessor() { // lets servers choose AsyncFrameBuffer over FrameBuffer
+      return processor_ instanceof TBaseAsyncProcessor;
+  }
 }
diff --git a/lib/java/src/org/apache/thrift/server/AbstractNonblockingServer.java b/lib/java/src/org/apache/thrift/server/AbstractNonblockingServer.java
index 97afc0b..80da6ca 100644
--- a/lib/java/src/org/apache/thrift/server/AbstractNonblockingServer.java
+++ b/lib/java/src/org/apache/thrift/server/AbstractNonblockingServer.java
@@ -28,6 +28,7 @@
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicLong;
 
+import org.apache.thrift.TBaseAsyncProcessor;
 import org.apache.thrift.TByteArrayOutputStream;
 import org.apache.thrift.TException;
 import org.apache.thrift.protocol.TProtocol;
@@ -62,12 +63,12 @@
    * time. Without this limit, the server will gladly allocate client buffers
    * right into an out of memory exception, rather than waiting.
    */
-  private final long MAX_READ_BUFFER_BYTES;
+  final long MAX_READ_BUFFER_BYTES;
 
   /**
    * How many bytes are currently allocated to read buffers.
    */
-  private final AtomicLong readBufferBytesAllocated = new AtomicLong(0);
+  final AtomicLong readBufferBytesAllocated = new AtomicLong(0);
 
   public AbstractNonblockingServer(AbstractNonblockingServerArgs args) {
     super(args);
@@ -265,40 +266,42 @@
    * response data back to the client. In the process it manages flipping the
    * read and write bits on the selection key for its client.
    */
-  protected class FrameBuffer {
+   public class FrameBuffer {
+    private final Logger LOGGER = LoggerFactory.getLogger(getClass().getName());
+
     // the actual transport hooked up to the client.
-    public final TNonblockingTransport trans_;
+    protected final TNonblockingTransport trans_;
 
     // the SelectionKey that corresponds to our transport
-    private final SelectionKey selectionKey_;
+    protected final SelectionKey selectionKey_;
 
     // the SelectThread that owns the registration of our transport
-    private final AbstractSelectThread selectThread_;
+    protected final AbstractSelectThread selectThread_;
 
     // where in the process of reading/writing are we?
-    private FrameBufferState state_ = FrameBufferState.READING_FRAME_SIZE;
+    protected FrameBufferState state_ = FrameBufferState.READING_FRAME_SIZE;
 
     // the ByteBuffer we'll be using to write and read, depending on the state
-    private ByteBuffer buffer_;
+    protected ByteBuffer buffer_;
 
-    private final TByteArrayOutputStream response_;
+    protected final TByteArrayOutputStream response_;
     
     // the frame that the TTransport should wrap.
-    private final TMemoryInputTransport frameTrans_;
+    protected final TMemoryInputTransport frameTrans_;
     
     // the transport that should be used to connect to clients
-    private final TTransport inTrans_;
+    protected final TTransport inTrans_;
     
-    private final TTransport outTrans_;
+    protected final TTransport outTrans_;
     
     // the input protocol to use on frames
-    private final TProtocol inProt_;
+    protected final TProtocol inProt_;
     
     // the output protocol to use on frames
-    private final TProtocol outProt_;
+    protected final TProtocol outProt_;
     
     // context associated with this connection
-    private final ServerContext context_;
+    protected final ServerContext context_;
 
     public FrameBuffer(final TNonblockingTransport trans,
         final SelectionKey selectionKey,
@@ -561,7 +564,7 @@
      * current thread is this FrameBuffer's select thread, then it just does the
      * interest change immediately.
      */
-    private void requestSelectInterestChange() {
+    protected void requestSelectInterestChange() {
       if (Thread.currentThread() == this.selectThread_) {
         changeSelectInterests();
       } else {
@@ -569,4 +572,39 @@
       }
     }
   } // FrameBuffer
+
+  public class AsyncFrameBuffer extends FrameBuffer {
+    public AsyncFrameBuffer(TNonblockingTransport trans, SelectionKey selectionKey, AbstractSelectThread selectThread) {
+      super(trans, selectionKey, selectThread);
+    }
+
+    public TProtocol getInputProtocol() {
+      return  inProt_;
+    }
+
+    public TProtocol getOutputProtocol() {
+      return outProt_;
+    }
+
+
+    public void invoke() {
+      frameTrans_.reset(buffer_.array());
+      response_.reset();
+
+      try {
+        if (eventHandler_ != null) {
+          eventHandler_.processContext(context_, inTrans_, outTrans_);
+        }
+        ((TBaseAsyncProcessor)processorFactory_.getProcessor(inTrans_)).process(this);
+        return;
+      } catch (TException te) {
+        LOGGER.warn("Exception while invoking!", te);
+      } catch (Throwable t) {
+        LOGGER.error("Unexpected throwable while invoking!", t);
+      }
+      // This will only be reached when there is a throwable.
+      state_ = FrameBufferState.AWAITING_CLOSE;
+      requestSelectInterestChange();
+    }
+  }
 }
diff --git a/lib/java/src/org/apache/thrift/server/TNonblockingServer.java b/lib/java/src/org/apache/thrift/server/TNonblockingServer.java
index 240b123..a6e7476 100644
--- a/lib/java/src/org/apache/thrift/server/TNonblockingServer.java
+++ b/lib/java/src/org/apache/thrift/server/TNonblockingServer.java
@@ -224,9 +224,11 @@
         clientKey = client.registerSelector(selector, SelectionKey.OP_READ);
 
         // add this key to the map
-        FrameBuffer frameBuffer = new FrameBuffer(client, clientKey,
-          SelectAcceptThread.this);
-        clientKey.attach(frameBuffer);
+          FrameBuffer frameBuffer = processorFactory_.isAsyncProcessor() ?
+                  new AsyncFrameBuffer(client, clientKey,SelectAcceptThread.this) :
+                  new FrameBuffer(client, clientKey,SelectAcceptThread.this);
+
+          clientKey.attach(frameBuffer);
       } catch (TTransportException tte) {
         // something went wrong accepting.
         LOGGER.warn("Exception trying to accept!", tte);
diff --git a/lib/java/src/org/apache/thrift/server/TThreadedSelectorServer.java b/lib/java/src/org/apache/thrift/server/TThreadedSelectorServer.java
index 29eabb1..8a68632 100644
--- a/lib/java/src/org/apache/thrift/server/TThreadedSelectorServer.java
+++ b/lib/java/src/org/apache/thrift/server/TThreadedSelectorServer.java
@@ -606,7 +606,10 @@
       try {
         clientKey = accepted.registerSelector(selector, SelectionKey.OP_READ);
 
-        FrameBuffer frameBuffer = new FrameBuffer(accepted, clientKey, SelectorThread.this);
+        FrameBuffer frameBuffer = processorFactory_.isAsyncProcessor() ?
+                new AsyncFrameBuffer(accepted, clientKey, SelectorThread.this) :
+                new FrameBuffer(accepted, clientKey, SelectorThread.this);
+
         clientKey.attach(frameBuffer);
       } catch (IOException e) {
         LOGGER.warn("Failed to register accepted connection to selector!", e);
diff --git a/lib/java/test/org/apache/thrift/server/ServerTestBase.java b/lib/java/test/org/apache/thrift/server/ServerTestBase.java
index 209038b..4cbb511 100755
--- a/lib/java/test/org/apache/thrift/server/ServerTestBase.java
+++ b/lib/java/test/org/apache/thrift/server/ServerTestBase.java
@@ -30,6 +30,7 @@
 
 import org.apache.thrift.TException;
 import org.apache.thrift.TProcessor;
+import org.apache.thrift.async.AsyncMethodCallback;
 import org.apache.thrift.protocol.TBinaryProtocol;
 import org.apache.thrift.protocol.TCompactProtocol;
 import org.apache.thrift.protocol.TProtocol;
@@ -374,11 +375,14 @@
     System.out.print("}\n");
   }
 
+  public boolean useAsyncProcessor() { // subclasses return true to make testIt() use ThriftTest.AsyncProcessor
+      return false;
+  }
+
   public void testIt() throws Exception {
 
     for (TProtocolFactory protoFactory : getProtocols()) {
-      TestHandler handler = new TestHandler();
-      ThriftTest.Processor processor = new ThriftTest.Processor(handler);
+      TProcessor processor = useAsyncProcessor() ? new ThriftTest.AsyncProcessor(new AsyncTestHandler()) : new ThriftTest.Processor(new TestHandler());
 
       startServer(processor, protoFactory);
 
@@ -557,4 +561,115 @@
     }*/
   }
 
+    /** AsyncIface adapter: answers each call with the synchronous TestHandler's result, or reports its exception via onError. */
+    public static class AsyncTestHandler implements ThriftTest.AsyncIface {
+        TestHandler handler = new TestHandler();
+
+        @Override
+        public void testVoid(AsyncMethodCallback resultHandler) throws TException {
+            resultHandler.onComplete(null);
+        }
+
+        @Override
+        public void testString(String thing, AsyncMethodCallback resultHandler) throws TException {
+            resultHandler.onComplete(handler.testString(thing));
+        }
+
+        @Override
+        public void testByte(byte thing, AsyncMethodCallback resultHandler) throws TException {
+            resultHandler.onComplete(handler.testByte(thing));
+        }
+
+        @Override
+        public void testI32(int thing, AsyncMethodCallback resultHandler) throws TException {
+            resultHandler.onComplete(handler.testI32(thing));
+        }
+
+        @Override
+        public void testI64(long thing, AsyncMethodCallback resultHandler) throws TException {
+            resultHandler.onComplete(handler.testI64(thing));
+        }
+
+        @Override
+        public void testDouble(double thing, AsyncMethodCallback resultHandler) throws TException {
+            resultHandler.onComplete(handler.testDouble(thing));
+        }
+
+        @Override
+        public void testStruct(Xtruct thing, AsyncMethodCallback resultHandler) throws TException {
+            resultHandler.onComplete(handler.testStruct(thing));
+        }
+
+        @Override
+        public void testNest(Xtruct2 thing, AsyncMethodCallback resultHandler) throws TException {
+            resultHandler.onComplete(handler.testNest(thing));
+        }
+
+        @Override
+        public void testMap(Map<Integer, Integer> thing, AsyncMethodCallback resultHandler) throws TException {
+            resultHandler.onComplete(handler.testMap(thing));
+        }
+
+        @Override
+        public void testStringMap(Map<String, String> thing, AsyncMethodCallback resultHandler) throws TException {
+            resultHandler.onComplete(handler.testStringMap(thing));
+        }
+
+        @Override
+        public void testSet(Set<Integer> thing, AsyncMethodCallback resultHandler) throws TException {
+            resultHandler.onComplete(handler.testSet(thing));
+        }
+
+        @Override
+        public void testList(List<Integer> thing, AsyncMethodCallback resultHandler) throws TException {
+            resultHandler.onComplete(handler.testList(thing));
+        }
+
+        @Override
+        public void testEnum(Numberz thing, AsyncMethodCallback resultHandler) throws TException {
+            resultHandler.onComplete(handler.testEnum(thing));
+        }
+
+        @Override
+        public void testTypedef(long thing, AsyncMethodCallback resultHandler) throws TException {
+            resultHandler.onComplete(handler.testTypedef(thing));
+        }
+
+        @Override
+        public void testMapMap(int hello, AsyncMethodCallback resultHandler) throws TException {
+            resultHandler.onComplete(handler.testMapMap(hello));
+        }
+
+        @Override
+        public void testInsanity(Insanity argument, AsyncMethodCallback resultHandler) throws TException {
+            resultHandler.onComplete(handler.testInsanity(argument));
+        }
+
+        @Override
+        public void testMulti(byte arg0, int arg1, long arg2, Map<Short, String> arg3, Numberz arg4, long arg5, AsyncMethodCallback resultHandler) throws TException {
+            resultHandler.onComplete(handler.testMulti(arg0,arg1,arg2,arg3,arg4,arg5));
+        }
+
+        @Override
+        public void testException(String arg, AsyncMethodCallback resultHandler) throws TException {
+            try {
+                handler.testException(arg);
+                resultHandler.onComplete(null);
+            } catch (Exception e) { resultHandler.onError(e); }
+        }
+
+        @Override
+        public void testMultiException(String arg0, String arg1, AsyncMethodCallback resultHandler) throws TException {
+            try {
+                resultHandler.onComplete(handler.testMultiException(arg0, arg1));
+            } catch (Exception e) { resultHandler.onError(e); }
+        }
+
+        @Override
+        public void testOneway(int secondsToSleep, AsyncMethodCallback resultHandler) throws TException {
+            handler.testOneway(secondsToSleep);
+            resultHandler.onComplete(null);
+        }
+    }
+
 }
diff --git a/lib/java/test/org/apache/thrift/server/TestAsyncServer.java b/lib/java/test/org/apache/thrift/server/TestAsyncServer.java
new file mode 100644
index 0000000..29c54cb
--- /dev/null
+++ b/lib/java/test/org/apache/thrift/server/TestAsyncServer.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.thrift.server;
+
+public class TestAsyncServer extends TestNonblockingServer {
+   // Presumably makes the inherited testIt() use ThriftTest.AsyncProcessor - confirm TestNonblockingServer derives from ServerTestBase.
+   @Override
+   public boolean useAsyncProcessor(){
+       return true;
+   }
+
+}