Merge branch 'master' of https://git-wip-us.apache.org/repos/asf/thrift
diff --git a/compiler/cpp/src/generate/t_java_generator.cc b/compiler/cpp/src/generate/t_java_generator.cc
index 2985a78..c9a3e56 100644
--- a/compiler/cpp/src/generate/t_java_generator.cc
+++ b/compiler/cpp/src/generate/t_java_generator.cc
@@ -144,7 +144,10 @@
void generate_service_client (t_service* tservice);
void generate_service_async_client(t_service* tservice);
void generate_service_server (t_service* tservice);
+ void generate_service_async_server (t_service* tservice);
void generate_process_function (t_service* tservice, t_function* tfunction);
+ void generate_process_async_function (t_service* tservice, t_function* tfunction);
+
void generate_java_union(t_struct* tstruct);
void generate_union_constructor(ofstream& out, t_struct* tstruct);
@@ -373,6 +376,8 @@
"import org.apache.thrift.protocol.TProtocolException;\n" +
"import org.apache.thrift.EncodingUtils;\n" +
"import org.apache.thrift.TException;\n" +
+ "import org.apache.thrift.async.AsyncMethodCallback;\n"+
+ "import org.apache.thrift.server.AbstractNonblockingServer.*;\n"+
"import java.util.List;\n" +
"import java.util.ArrayList;\n" +
"import java.util.Map;\n" +
@@ -2225,6 +2230,7 @@
generate_service_client(tservice);
generate_service_async_client(tservice);
generate_service_server(tservice);
+ generate_service_async_server(tservice);
generate_service_helpers(tservice);
indent_down();
@@ -2279,6 +2285,7 @@
f_service_ << indent() << "}" << endl << endl;
}
+
/**
* Generates structs for all the service args and return types
*
@@ -2641,6 +2648,61 @@
}
/**
+ * Generates the asynchronous service server (AsyncProcessor) definition.
+ *
+ * @param tservice The service to generate a server for.
+ */
+void t_java_generator::generate_service_async_server(t_service* tservice) {
+  // Every function of the service gets one process-map entry and one
+  // dispatcher class inside the generated AsyncProcessor.
+  const vector<t_function*>& functions = tservice->get_functions();
+  vector<t_function*>::const_iterator fn;
+
+  // Without a parent service the processor derives from the library base
+  // class; otherwise it derives from the parent's generated AsyncProcessor.
+  string extends_processor = "org.apache.thrift.TBaseAsyncProcessor<I>";
+  if (tservice->get_extends() != NULL) {
+    extends_processor = type_name(tservice->get_extends()) + ".AsyncProcessor<I>";
+  }
+
+  // Java type of one process-map value; it is repeated in several signatures.
+  const string fn_type =
+    "org.apache.thrift.AsyncProcessFunction<I, ? extends org.apache.thrift.TBase, ?>";
+
+  // Class header
+  f_service_ << indent()
+             << "public static class AsyncProcessor<I extends AsyncIface> extends "
+             << extends_processor << " {" << endl;
+  indent_up();
+
+  f_service_ << indent()
+             << "private static final Logger LOGGER = LoggerFactory.getLogger(AsyncProcessor.class.getName());"
+             << endl;
+
+  // Public constructor: fills a fresh map.
+  f_service_ << indent() << "public AsyncProcessor(I iface) {" << endl;
+  f_service_ << indent() << " super(iface, getProcessMap(new HashMap<String, " << fn_type << ">()));" << endl;
+  f_service_ << indent() << "}" << endl << endl;
+
+  // Protected constructor: lets a derived processor pass in its own map.
+  f_service_ << indent() << "protected AsyncProcessor(I iface, Map<String, " << fn_type << "> processMap) {" << endl;
+  f_service_ << indent() << " super(iface, getProcessMap(processMap));" << endl;
+  f_service_ << indent() << "}" << endl << endl;
+
+  // getProcessMap(): registers one dispatcher instance per function name.
+  f_service_ << indent()
+             << "private static <I extends AsyncIface> Map<String, org.apache.thrift.AsyncProcessFunction<I, ? extends org.apache.thrift.TBase,?>> getProcessMap(Map<String, "
+             << fn_type << "> processMap) {" << endl;
+  indent_up();
+  for (fn = functions.begin(); fn != functions.end(); ++fn) {
+    const string fn_name = (*fn)->get_name();
+    f_service_ << indent() << "processMap.put(\"" << fn_name << "\", new " << fn_name << "());" << endl;
+  }
+  f_service_ << indent() << "return processMap;" << endl;
+  indent_down();
+  f_service_ << indent() << "}" << endl << endl;
+
+  // One nested dispatcher class per function
+  for (fn = functions.begin(); fn != functions.end(); ++fn) {
+    generate_process_async_function(tservice, *fn);
+  }
+
+  // Close the AsyncProcessor class
+  indent_down();
+  f_service_ << indent() << "}" << endl << endl;
+}
+
+
+/**
* Generates a struct and helpers for a function.
*
* @param tfunction The function
@@ -2666,6 +2728,159 @@
generate_java_struct_definition(f_service_, &result, false, true, true);
}
+
+
+/**
+ * Generates the AsyncProcessFunction subclass that dispatches a single
+ * service function for the asynchronous processor.
+ *
+ * The emitted Java class contains a constructor, getEmptyArgsInstance(),
+ * getResultHandler() (a callback that writes either the result or the error
+ * back through the frame buffer; it emits nothing for oneway functions),
+ * isOneway(), and start(), which forwards the decoded arguments and the
+ * callback to the AsyncIface implementation.
+ *
+ * @param tservice  The service owning the function (unused)
+ * @param tfunction The function to write a dispatcher for
+ */
+void t_java_generator::generate_process_async_function(t_service* tservice,
+                                                       t_function* tfunction) {
+  (void) tservice;
+
+  const string funname = tfunction->get_name();
+  const string argsname = funname + "_args";
+  const string resultname = funname + "_result";
+  const bool is_oneway = tfunction->is_oneway();
+  // Container form of the return type (e.g. "Void" instead of "void") so it
+  // can be used as a generic type argument.
+  const string resulttype = type_name(tfunction->get_returntype(), true);
+
+  // Open class
+  indent(f_service_) <<
+    "public static class " << funname << "<I extends AsyncIface> extends org.apache.thrift.AsyncProcessFunction<I, " << argsname << ", " << resulttype << "> {" << endl;
+  indent_up();
+
+  indent(f_service_) << "public " << funname << "() {" << endl;
+  indent(f_service_) << " super(\"" << funname << "\");" << endl;
+  indent(f_service_) << "}" << endl << endl;
+
+  indent(f_service_) << "public " << argsname << " getEmptyArgsInstance() {" << endl;
+  indent(f_service_) << " return new " << argsname << "();" << endl;
+  indent(f_service_) << "}" << endl << endl;
+
+  // getResultHandler(): anonymous callback bound to one frame buffer / seqid
+  indent(f_service_) << "public AsyncMethodCallback<" << resulttype << "> getResultHandler(final AsyncFrameBuffer fb, final int seqid) {" << endl;
+  indent_up();
+  indent(f_service_) << "final org.apache.thrift.AsyncProcessFunction fcall = this;" << endl;
+  indent(f_service_) << "return new AsyncMethodCallback<" << resulttype << ">() { " << endl;
+  indent_up();
+
+  // onComplete(): wrap the value in the result struct and send a REPLY
+  indent(f_service_) << "public void onComplete(" << resulttype << " o) {" << endl;
+  indent_up();
+  if (!is_oneway) {
+    indent(f_service_) << resultname << " result = new " << resultname << "();" << endl;
+
+    if (!tfunction->get_returntype()->is_void()) {
+      indent(f_service_) << "result.success = o;" << endl;
+      // Set isset on success field
+      if (!type_can_be_null(tfunction->get_returntype())) {
+        indent(f_service_) << "result.set" << get_cap_name("success") << get_cap_name("isSet") << "(true);" << endl;
+      }
+    }
+
+    // The emitted code closes the frame buffer only if sending failed.
+    indent(f_service_) << "try {" << endl;
+    indent(f_service_) << " fcall.sendResponse(fb,result, org.apache.thrift.protocol.TMessageType.REPLY,seqid);" << endl;
+    indent(f_service_) << " return;" << endl;
+    indent(f_service_) << "} catch (Exception e) {" << endl;
+    indent(f_service_) << " LOGGER.error(\"Exception writing to internal frame buffer\", e);" << endl;
+    indent(f_service_) << "}" << endl;
+    indent(f_service_) << "fb.close();" << endl;
+  }
+  indent_down();
+  indent(f_service_) << "}" << endl;
+
+  // onError(): declared exceptions travel in the result struct as a REPLY,
+  // anything else becomes a TApplicationException sent as an EXCEPTION.
+  indent(f_service_) << "public void onError(Exception e) {" << endl;
+  indent_up();
+  if (!is_oneway) {
+    indent(f_service_) << "byte msgType = org.apache.thrift.protocol.TMessageType.REPLY;" << endl;
+    indent(f_service_) << "org.apache.thrift.TBase msg;" << endl;
+    indent(f_service_) << resultname << " result = new " << resultname << "();" << endl;
+
+    const std::vector<t_field*>& xceptions = tfunction->get_xceptions()->get_members();
+    vector<t_field*>::const_iterator x_iter;
+    if (!xceptions.empty()) {
+      bool first = true;
+      for (x_iter = xceptions.begin(); x_iter != xceptions.end(); ++x_iter) {
+        const string xtype = type_name((*x_iter)->get_type(), false, false);
+        const string xname = (*x_iter)->get_name();
+
+        // Every branch after the first is chained with "else". This is a
+        // plain branch on purpose: a conditional expression mixing a bool
+        // with a stream only compiles via the pre-C++11 implicit stream
+        // conversion.
+        indent(f_service_) << (first ? "" : "else ") << "if (e instanceof " << xtype << ") {" << endl;
+        first = false;
+
+        indent_up();
+        indent(f_service_) << "result." << xname << " = (" << xtype << ") e;" << endl;
+        indent(f_service_) << "result.set" << get_cap_name(xname) << get_cap_name("isSet") << "(true);" << endl;
+        indent(f_service_) << "msg = result;" << endl;
+        indent_down();
+
+        indent(f_service_) << "}" << endl;
+      }
+      indent(f_service_) << "else" << endl;
+    }
+
+    // Fallback block; with no declared exceptions it is emitted as a bare block.
+    indent(f_service_) << "{" << endl;
+    indent_up();
+    indent(f_service_) << "msgType = org.apache.thrift.protocol.TMessageType.EXCEPTION;" << endl;
+    indent(f_service_) << "msg = (org.apache.thrift.TBase)new org.apache.thrift.TApplicationException(org.apache.thrift.TApplicationException.INTERNAL_ERROR, e.getMessage());" << endl;
+    indent_down();
+    indent(f_service_) << "}" << endl;
+
+    indent(f_service_) << "try {" << endl;
+    indent(f_service_) << " fcall.sendResponse(fb,msg,msgType,seqid);" << endl;
+    indent(f_service_) << " return;" << endl;
+    indent(f_service_) << "} catch (Exception ex) {" << endl;
+    indent(f_service_) << " LOGGER.error(\"Exception writing to internal frame buffer\", ex);" << endl;
+    indent(f_service_) << "}" << endl;
+    indent(f_service_) << "fb.close();" << endl;
+  }
+  indent_down();
+  indent(f_service_) << "}" << endl;
+
+  // Close the anonymous callback and getResultHandler()
+  indent_down();
+  indent(f_service_) << "};" << endl;
+  indent_down();
+  indent(f_service_) << "}" << endl << endl;
+
+  indent(f_service_) << "protected boolean isOneway() {" << endl;
+  indent(f_service_) << " return " << (is_oneway ? "true" : "false") << ";" << endl;
+  indent(f_service_) << "}" << endl << endl;
+
+  // start(): call the handler with every argument field plus the callback
+  indent(f_service_) << "public void start(I iface, " << argsname << " args, org.apache.thrift.async.AsyncMethodCallback<" << resulttype << "> resultHandler) throws TException {" << endl;
+  indent_up();
+
+  const std::vector<t_field*>& fields = tfunction->get_arglist()->get_members();
+  vector<t_field*>::const_iterator f_iter;
+
+  f_service_ << indent() << "iface." << funname << "(";
+  bool first = true;
+  for (f_iter = fields.begin(); f_iter != fields.end(); ++f_iter) {
+    if (first) {
+      first = false;
+    } else {
+      f_service_ << ", ";
+    }
+    f_service_ << "args." << (*f_iter)->get_name();
+  }
+  // The callback is always the last parameter.
+  if (!first) {
+    f_service_ << ",";
+  }
+  f_service_ << "resultHandler" << ");" << endl;
+
+  // Close start()
+  indent_down();
+  indent(f_service_) << "}" << endl;
+
+  // Close class
+  indent_down();
+  f_service_ << indent() << "}" << endl << endl;
+}
+
+
/**
* Generates a process function definition.
*
@@ -2773,6 +2988,7 @@
f_service_ << indent() << "}" << endl << endl;
}
+
/**
* Deserializes a field of any type.
*
@@ -3270,7 +3486,7 @@
switch (tbase) {
case t_base_type::TYPE_VOID:
- return "void";
+ return (in_container ? "Void" : "void");
case t_base_type::TYPE_STRING:
if (type->is_binary()) {
return "ByteBuffer";
@@ -3395,14 +3611,8 @@
arglist = argument_list(tfunc->get_arglist(), include_types) + ", ";
}
- std::string ret_type = "";
- if (use_base_method) {
- ret_type += "AsyncClient.";
- }
- ret_type += tfunc->get_name() + "_call";
-
if (include_types) {
- arglist += "org.apache.thrift.async.AsyncMethodCallback<" + ret_type + "> ";
+ arglist += "org.apache.thrift.async.AsyncMethodCallback ";
}
arglist += "resultHandler";
@@ -3453,7 +3663,7 @@
result += ", ";
}
if (include_types) {
- result += "org.apache.thrift.async.AsyncMethodCallback<" + tfunct->get_name() + "_call" + "> ";
+ result += "org.apache.thrift.async.AsyncMethodCallback ";
}
result += "resultHandler";
return result;
diff --git a/lib/java/src/org/apache/thrift/AsyncProcessFunction.java b/lib/java/src/org/apache/thrift/AsyncProcessFunction.java
new file mode 100644
index 0000000..799e02d
--- /dev/null
+++ b/lib/java/src/org/apache/thrift/AsyncProcessFunction.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.thrift;
+
+import org.apache.thrift.async.AsyncMethodCallback;
+import org.apache.thrift.protocol.TMessage;
+import org.apache.thrift.protocol.TMessageType;
+import org.apache.thrift.protocol.TProtocol;
+import org.apache.thrift.server.AbstractNonblockingServer;
+
+/**
+ * Base class for the per-method dispatchers used by the asynchronous processor.
+ *
+ * @param <I> handler (service implementation) type
+ * @param <T> type of the argument struct of the method
+ * @param <R> type of the value delivered to the result callback
+ */
+public abstract class AsyncProcessFunction<I, T, R> {
+  final String methodName;
+
+  public AsyncProcessFunction(String methodName) {
+    this.methodName = methodName;
+  }
+
+  /** @return true when the method is declared oneway */
+  protected abstract boolean isOneway();
+
+  /**
+   * Invokes the method on the handler.
+   *
+   * @param iface handler to call
+   * @param args decoded argument struct
+   * @param resultHandler callback passed on to the handler
+   */
+  public abstract void start(I iface, T args, AsyncMethodCallback<R> resultHandler) throws TException;
+
+  /** @return a new, empty argument struct for the method */
+  public abstract T getEmptyArgsInstance();
+
+  /**
+   * Creates the callback handed to {@link #start}. It is typed on R so the
+   * result can be passed to start() without an unchecked conversion.
+   *
+   * @param fb frame buffer of the request being answered
+   * @param seqid sequence id of the request
+   */
+  public abstract AsyncMethodCallback<R> getResultHandler(final AbstractNonblockingServer.AsyncFrameBuffer fb, int seqid);
+
+  public String getMethodName() {
+    return methodName;
+  }
+
+  /**
+   * Writes one complete message through the output protocol of the frame
+   * buffer, flushes its transport and then calls fb.responseReady().
+   *
+   * @param fb frame buffer of the request being answered
+   * @param result struct to write as message body
+   * @param type message type to put in the message header
+   * @param seqid sequence id to put in the message header
+   */
+  public void sendResponse(final AbstractNonblockingServer.AsyncFrameBuffer fb, final TBase result, final byte type, final int seqid) throws TException {
+    TProtocol oprot = fb.getOutputProtocol();
+
+    oprot.writeMessageBegin(new TMessage(getMethodName(), type, seqid));
+    result.write(oprot);
+    oprot.writeMessageEnd();
+    oprot.getTransport().flush();
+
+    fb.responseReady();
+  }
+}
diff --git a/lib/java/src/org/apache/thrift/TBaseAsyncProcessor.java b/lib/java/src/org/apache/thrift/TBaseAsyncProcessor.java
new file mode 100644
index 0000000..da41620
--- /dev/null
+++ b/lib/java/src/org/apache/thrift/TBaseAsyncProcessor.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.thrift;
+
+import org.apache.thrift.protocol.*;
+
+import org.apache.thrift.server.AbstractNonblockingServer.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+import java.util.Map;
+
+/**
+ * Processor that dispatches requests read from an AsyncFrameBuffer to
+ * AsyncProcessFunction instances looked up by method name.
+ *
+ * @param <I> handler (service implementation) type
+ */
+public class TBaseAsyncProcessor<I> implements TProcessor {
+  protected final Logger LOGGER = LoggerFactory.getLogger(getClass().getName());
+
+  final I iface;
+  final Map<String,AsyncProcessFunction<I, ? extends TBase,?>> processMap;
+
+  public TBaseAsyncProcessor(I iface, Map<String, AsyncProcessFunction<I, ? extends TBase,?>> processMap) {
+    this.iface = iface;
+    this.processMap = processMap;
+  }
+
+  /** @return a read-only view of the method name to function map */
+  public Map<String,AsyncProcessFunction<I, ? extends TBase,?>> getProcessMapView() {
+    return Collections.unmodifiableMap(processMap);
+  }
+
+  /**
+   * Reads one request from the frame buffer and starts the matching function.
+   * An unknown method name or a TProtocolException while reading the
+   * arguments is answered directly with a TApplicationException.
+   *
+   * @param fb frame buffer holding the request
+   * @return always true
+   */
+  public boolean process(final AsyncFrameBuffer fb) throws TException {
+
+    final TProtocol in = fb.getInputProtocol();
+    final TProtocol out = fb.getOutputProtocol();
+
+    //Find processing function
+    final TMessage msg = in.readMessageBegin();
+    AsyncProcessFunction fn = processMap.get(msg.name);
+    if (fn == null) {
+      TProtocolUtil.skip(in, TType.STRUCT);
+      in.readMessageEnd();
+      TApplicationException x = new TApplicationException(TApplicationException.UNKNOWN_METHOD, "Invalid method name: '"+msg.name+"'");
+      out.writeMessageBegin(new TMessage(msg.name, TMessageType.EXCEPTION, msg.seqid));
+      x.write(out);
+      out.writeMessageEnd();
+      out.getTransport().flush();
+      fb.responseReady();
+      return true;
+    }
+
+    //Get Args
+    TBase args = (TBase)fn.getEmptyArgsInstance();
+
+    try {
+      args.read(in);
+    } catch (TProtocolException e) {
+      in.readMessageEnd();
+      TApplicationException x = new TApplicationException(TApplicationException.PROTOCOL_ERROR, e.getMessage());
+      out.writeMessageBegin(new TMessage(msg.name, TMessageType.EXCEPTION, msg.seqid));
+      x.write(out);
+      out.writeMessageEnd();
+      out.getTransport().flush();
+      fb.responseReady();
+      return true;
+    }
+    in.readMessageEnd();
+
+    // The generated result handler of a oneway function sends nothing, so
+    // nothing would ever call responseReady() for such a request. Do it here,
+    // once the arguments are fully decoded.
+    // NOTE(review): assumes responseReady() with an empty response puts the
+    // frame buffer back into its reading state - confirm against FrameBuffer.
+    if (fn.isOneway()) {
+      fb.responseReady();
+    }
+
+    //start off processing function
+    fn.start(iface, args,fn.getResultHandler(fb,msg.seqid));
+    return true;
+  }
+
+  /**
+   * Synchronous TProcessor entry point. It performs no processing and
+   * always returns false; requests are handled by {@link #process(AsyncFrameBuffer)}.
+   */
+  @Override
+  public boolean process(TProtocol in, TProtocol out) throws TException {
+    return false;
+  }
+}
diff --git a/lib/java/src/org/apache/thrift/TProcessorFactory.java b/lib/java/src/org/apache/thrift/TProcessorFactory.java
index bcd8a38..f6dfb14 100644
--- a/lib/java/src/org/apache/thrift/TProcessorFactory.java
+++ b/lib/java/src/org/apache/thrift/TProcessorFactory.java
@@ -36,4 +36,8 @@
public TProcessor getProcessor(TTransport trans) {
return processor_;
}
+
+ /**
+ * Tells whether the processor held by this factory is a TBaseAsyncProcessor.
+ * Only the processor_ field is inspected, not the value returned by
+ * getProcessor(TTransport).
+ *
+ * @return true when processor_ is an instance of TBaseAsyncProcessor
+ */
+ public boolean isAsyncProcessor() {
+ return processor_ instanceof TBaseAsyncProcessor;
+ }
}
diff --git a/lib/java/src/org/apache/thrift/server/AbstractNonblockingServer.java b/lib/java/src/org/apache/thrift/server/AbstractNonblockingServer.java
index 97afc0b..80da6ca 100644
--- a/lib/java/src/org/apache/thrift/server/AbstractNonblockingServer.java
+++ b/lib/java/src/org/apache/thrift/server/AbstractNonblockingServer.java
@@ -28,6 +28,7 @@
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
+import org.apache.thrift.TBaseAsyncProcessor;
import org.apache.thrift.TByteArrayOutputStream;
import org.apache.thrift.TException;
import org.apache.thrift.protocol.TProtocol;
@@ -62,12 +63,12 @@
* time. Without this limit, the server will gladly allocate client buffers
* right into an out of memory exception, rather than waiting.
*/
- private final long MAX_READ_BUFFER_BYTES;
+ final long MAX_READ_BUFFER_BYTES;
/**
* How many bytes are currently allocated to read buffers.
*/
- private final AtomicLong readBufferBytesAllocated = new AtomicLong(0);
+ final AtomicLong readBufferBytesAllocated = new AtomicLong(0);
public AbstractNonblockingServer(AbstractNonblockingServerArgs args) {
super(args);
@@ -265,40 +266,42 @@
* response data back to the client. In the process it manages flipping the
* read and write bits on the selection key for its client.
*/
- protected class FrameBuffer {
+ public class FrameBuffer {
+ private final Logger LOGGER = LoggerFactory.getLogger(getClass().getName());
+
// the actual transport hooked up to the client.
- public final TNonblockingTransport trans_;
+ protected final TNonblockingTransport trans_;
// the SelectionKey that corresponds to our transport
- private final SelectionKey selectionKey_;
+ protected final SelectionKey selectionKey_;
// the SelectThread that owns the registration of our transport
- private final AbstractSelectThread selectThread_;
+ protected final AbstractSelectThread selectThread_;
// where in the process of reading/writing are we?
- private FrameBufferState state_ = FrameBufferState.READING_FRAME_SIZE;
+ protected FrameBufferState state_ = FrameBufferState.READING_FRAME_SIZE;
// the ByteBuffer we'll be using to write and read, depending on the state
- private ByteBuffer buffer_;
+ protected ByteBuffer buffer_;
- private final TByteArrayOutputStream response_;
+ protected final TByteArrayOutputStream response_;
// the frame that the TTransport should wrap.
- private final TMemoryInputTransport frameTrans_;
+ protected final TMemoryInputTransport frameTrans_;
// the transport that should be used to connect to clients
- private final TTransport inTrans_;
+ protected final TTransport inTrans_;
- private final TTransport outTrans_;
+ protected final TTransport outTrans_;
// the input protocol to use on frames
- private final TProtocol inProt_;
+ protected final TProtocol inProt_;
// the output protocol to use on frames
- private final TProtocol outProt_;
+ protected final TProtocol outProt_;
// context associated with this connection
- private final ServerContext context_;
+ protected final ServerContext context_;
public FrameBuffer(final TNonblockingTransport trans,
final SelectionKey selectionKey,
@@ -561,7 +564,7 @@
* current thread is this FrameBuffer's select thread, then it just does the
* interest change immediately.
*/
- private void requestSelectInterestChange() {
+ protected void requestSelectInterestChange() {
if (Thread.currentThread() == this.selectThread_) {
changeSelectInterests();
} else {
@@ -569,4 +572,39 @@
}
}
} // FrameBuffer
+
+  /**
+   * FrameBuffer variant for asynchronous processors: it exposes its input and
+   * output protocols and passes itself to TBaseAsyncProcessor.process().
+   */
+  public class AsyncFrameBuffer extends FrameBuffer {
+    public AsyncFrameBuffer(TNonblockingTransport trans, SelectionKey selectionKey, AbstractSelectThread selectThread) {
+      super(trans, selectionKey, selectThread);
+    }
+
+    /** @return the protocol used to read the request frame */
+    public TProtocol getInputProtocol() {
+      return inProt_;
+    }
+
+    /** @return the protocol used to write the response */
+    public TProtocol getOutputProtocol() {
+      return outProt_;
+    }
+
+    /**
+     * Resets the frame transport and the response buffer, then passes this
+     * buffer to the asynchronous processor. If anything is thrown the buffer
+     * is marked AWAITING_CLOSE and a select interest change is requested.
+     */
+    public void invoke() {
+      frameTrans_.reset(buffer_.array());
+      response_.reset();
+
+      boolean handedOff = false;
+      try {
+        if (eventHandler_ != null) {
+          eventHandler_.processContext(context_, inTrans_, outTrans_);
+        }
+        TBaseAsyncProcessor asyncProcessor = (TBaseAsyncProcessor) processorFactory_.getProcessor(inTrans_);
+        asyncProcessor.process(this);
+        handedOff = true;
+      } catch (TException te) {
+        LOGGER.warn("Exception while invoking!", te);
+      } catch (Throwable t) {
+        LOGGER.error("Unexpected throwable while invoking!", t);
+      }
+
+      if (!handedOff) {
+        // Only reached when something was thrown above.
+        state_ = FrameBufferState.AWAITING_CLOSE;
+        requestSelectInterestChange();
+      }
+    }
+  }
}
diff --git a/lib/java/src/org/apache/thrift/server/TNonblockingServer.java b/lib/java/src/org/apache/thrift/server/TNonblockingServer.java
index 240b123..a6e7476 100644
--- a/lib/java/src/org/apache/thrift/server/TNonblockingServer.java
+++ b/lib/java/src/org/apache/thrift/server/TNonblockingServer.java
@@ -224,9 +224,11 @@
clientKey = client.registerSelector(selector, SelectionKey.OP_READ);
// add this key to the map
- FrameBuffer frameBuffer = new FrameBuffer(client, clientKey,
- SelectAcceptThread.this);
- clientKey.attach(frameBuffer);
+ FrameBuffer frameBuffer = processorFactory_.isAsyncProcessor() ?
+ new AsyncFrameBuffer(client, clientKey,SelectAcceptThread.this) :
+ new FrameBuffer(client, clientKey,SelectAcceptThread.this);
+
+ clientKey.attach(frameBuffer);
} catch (TTransportException tte) {
// something went wrong accepting.
LOGGER.warn("Exception trying to accept!", tte);
diff --git a/lib/java/src/org/apache/thrift/server/TThreadedSelectorServer.java b/lib/java/src/org/apache/thrift/server/TThreadedSelectorServer.java
index 29eabb1..8a68632 100644
--- a/lib/java/src/org/apache/thrift/server/TThreadedSelectorServer.java
+++ b/lib/java/src/org/apache/thrift/server/TThreadedSelectorServer.java
@@ -606,7 +606,10 @@
try {
clientKey = accepted.registerSelector(selector, SelectionKey.OP_READ);
- FrameBuffer frameBuffer = new FrameBuffer(accepted, clientKey, SelectorThread.this);
+ FrameBuffer frameBuffer = processorFactory_.isAsyncProcessor() ?
+ new AsyncFrameBuffer(accepted, clientKey, SelectorThread.this) :
+ new FrameBuffer(accepted, clientKey, SelectorThread.this);
+
clientKey.attach(frameBuffer);
} catch (IOException e) {
LOGGER.warn("Failed to register accepted connection to selector!", e);
diff --git a/lib/java/test/org/apache/thrift/server/ServerTestBase.java b/lib/java/test/org/apache/thrift/server/ServerTestBase.java
index 209038b..4cbb511 100755
--- a/lib/java/test/org/apache/thrift/server/ServerTestBase.java
+++ b/lib/java/test/org/apache/thrift/server/ServerTestBase.java
@@ -30,6 +30,7 @@
import org.apache.thrift.TException;
import org.apache.thrift.TProcessor;
+import org.apache.thrift.async.AsyncMethodCallback;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.protocol.TCompactProtocol;
import org.apache.thrift.protocol.TProtocol;
@@ -374,11 +375,14 @@
System.out.print("}\n");
}
+ /**
+ * Selects the processor built by testIt(): ThriftTest.AsyncProcessor when
+ * true, ThriftTest.Processor when false.
+ *
+ * @return false in this base class
+ */
+ public boolean useAsyncProcessor() {
+ return false;
+ }
+
public void testIt() throws Exception {
for (TProtocolFactory protoFactory : getProtocols()) {
- TestHandler handler = new TestHandler();
- ThriftTest.Processor processor = new ThriftTest.Processor(handler);
+ TProcessor processor = useAsyncProcessor() ? new ThriftTest.AsyncProcessor(new AsyncTestHandler()) : new ThriftTest.Processor(new TestHandler());
startServer(processor, protoFactory);
@@ -557,4 +561,115 @@
}*/
}
+
+  /**
+   * ThriftTest.AsyncIface implementation that delegates each call to a
+   * TestHandler and answers the supplied callback before returning.
+   */
+  public static class AsyncTestHandler implements ThriftTest.AsyncIface {
+
+    TestHandler handler = new TestHandler();
+
+    @Override
+    public void testVoid(AsyncMethodCallback resultHandler) throws TException {
+      resultHandler.onComplete(null);
+    }
+
+    @Override
+    public void testString(String thing, AsyncMethodCallback resultHandler) throws TException {
+      resultHandler.onComplete(handler.testString(thing));
+    }
+
+    @Override
+    public void testByte(byte thing, AsyncMethodCallback resultHandler) throws TException {
+      resultHandler.onComplete(handler.testByte(thing));
+    }
+
+    @Override
+    public void testI32(int thing, AsyncMethodCallback resultHandler) throws TException {
+      resultHandler.onComplete(handler.testI32(thing));
+    }
+
+    @Override
+    public void testI64(long thing, AsyncMethodCallback resultHandler) throws TException {
+      resultHandler.onComplete(handler.testI64(thing));
+    }
+
+    @Override
+    public void testDouble(double thing, AsyncMethodCallback resultHandler) throws TException {
+      resultHandler.onComplete(handler.testDouble(thing));
+    }
+
+    @Override
+    public void testStruct(Xtruct thing, AsyncMethodCallback resultHandler) throws TException {
+      resultHandler.onComplete(handler.testStruct(thing));
+    }
+
+    @Override
+    public void testNest(Xtruct2 thing, AsyncMethodCallback resultHandler) throws TException {
+      resultHandler.onComplete(handler.testNest(thing));
+    }
+
+    @Override
+    public void testMap(Map<Integer, Integer> thing, AsyncMethodCallback resultHandler) throws TException {
+      resultHandler.onComplete(handler.testMap(thing));
+    }
+
+    @Override
+    public void testStringMap(Map<String, String> thing, AsyncMethodCallback resultHandler) throws TException {
+      resultHandler.onComplete(handler.testStringMap(thing));
+    }
+
+    @Override
+    public void testSet(Set<Integer> thing, AsyncMethodCallback resultHandler) throws TException {
+      resultHandler.onComplete(handler.testSet(thing));
+    }
+
+    @Override
+    public void testList(List<Integer> thing, AsyncMethodCallback resultHandler) throws TException {
+      resultHandler.onComplete(handler.testList(thing));
+    }
+
+    @Override
+    public void testEnum(Numberz thing, AsyncMethodCallback resultHandler) throws TException {
+      resultHandler.onComplete(handler.testEnum(thing));
+    }
+
+    @Override
+    public void testTypedef(long thing, AsyncMethodCallback resultHandler) throws TException {
+      resultHandler.onComplete(handler.testTypedef(thing));
+    }
+
+    @Override
+    public void testMapMap(int hello, AsyncMethodCallback resultHandler) throws TException {
+      resultHandler.onComplete(handler.testMapMap(hello));
+    }
+
+    @Override
+    public void testInsanity(Insanity argument, AsyncMethodCallback resultHandler) throws TException {
+      resultHandler.onComplete(handler.testInsanity(argument));
+    }
+
+    @Override
+    public void testMulti(byte arg0, int arg1, long arg2, Map<Short, String> arg3, Numberz arg4, long arg5, AsyncMethodCallback resultHandler) throws TException {
+      resultHandler.onComplete(handler.testMulti(arg0,arg1,arg2,arg3,arg4,arg5));
+    }
+
+    @Override
+    public void testException(String arg, AsyncMethodCallback resultHandler) throws TException {
+      // The callback must always be answered; whatever the delegate throws
+      // is reported through onError.
+      try {
+        handler.testException(arg);
+      } catch (Exception e) {
+        resultHandler.onError(e);
+        return;
+      }
+      resultHandler.onComplete(null);
+    }
+
+    @Override
+    public void testMultiException(String arg0, String arg1, AsyncMethodCallback resultHandler) throws TException {
+      // Same contract as testException: report either the value or the error.
+      Object result;
+      try {
+        result = handler.testMultiException(arg0, arg1);
+      } catch (Exception e) {
+        resultHandler.onError(e);
+        return;
+      }
+      resultHandler.onComplete(result);
+    }
+
+    @Override
+    public void testOneway(int secondsToSleep, AsyncMethodCallback resultHandler) throws TException {
+      handler.testOneway(secondsToSleep);
+      resultHandler.onComplete(null);
+    }
+  }
+
}
diff --git a/lib/java/test/org/apache/thrift/server/TestAsyncServer.java b/lib/java/test/org/apache/thrift/server/TestAsyncServer.java
new file mode 100644
index 0000000..29c54cb
--- /dev/null
+++ b/lib/java/test/org/apache/thrift/server/TestAsyncServer.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.thrift.server;
+
+/**
+ * Variant of TestNonblockingServer that answers true from useAsyncProcessor().
+ * NOTE(review): presumably this makes the inherited ServerTestBase.testIt()
+ * run against ThriftTest.AsyncProcessor - confirm that TestNonblockingServer
+ * derives from ServerTestBase.
+ */
+public class TestAsyncServer extends TestNonblockingServer {
+
+ @Override
+ public boolean useAsyncProcessor(){
+ return true;
+ }
+
+}