THRIFT-1864 java: implement event handler for non-blocking server
Patch: Vitali Lovich
diff --git a/lib/java/src/org/apache/thrift/server/AbstractNonblockingServer.java b/lib/java/src/org/apache/thrift/server/AbstractNonblockingServer.java
index e5e26b2..97afc0b 100644
--- a/lib/java/src/org/apache/thrift/server/AbstractNonblockingServer.java
+++ b/lib/java/src/org/apache/thrift/server/AbstractNonblockingServer.java
@@ -281,7 +281,24 @@
// the ByteBuffer we'll be using to write and read, depending on the state
private ByteBuffer buffer_;
- private TByteArrayOutputStream response_;
+ private final TByteArrayOutputStream response_;
+
+ // the frame that the TTransport should wrap.
+ private final TMemoryInputTransport frameTrans_;
+
+ // the transport that should be used to connect to clients
+ private final TTransport inTrans_;
+
+ private final TTransport outTrans_;
+
+ // the input protocol to use on frames
+ private final TProtocol inProt_;
+
+ // the output protocol to use on frames
+ private final TProtocol outProt_;
+
+ // context associated with this connection
+ private final ServerContext context_;
public FrameBuffer(final TNonblockingTransport trans,
final SelectionKey selectionKey,
@@ -290,6 +307,19 @@
selectionKey_ = selectionKey;
selectThread_ = selectThread;
buffer_ = ByteBuffer.allocate(4);
+
+ frameTrans_ = new TMemoryInputTransport();
+ response_ = new TByteArrayOutputStream();
+ inTrans_ = inputTransportFactory_.getTransport(frameTrans_);
+ outTrans_ = outputTransportFactory_.getTransport(new TIOStreamTransport(response_));
+ inProt_ = inputProtocolFactory_.getProtocol(inTrans_);
+ outProt_ = outputProtocolFactory_.getProtocol(outTrans_);
+
+ if (eventHandler_ != null) {
+ context_ = eventHandler_.createContext(inProt_, outProt_);
+ } else {
+ context_ = null;
+ }
}
/**
@@ -426,6 +456,9 @@
readBufferBytesAllocated.addAndGet(-buffer_.array().length);
}
trans_.close();
+ if (eventHandler_ != null) {
+ eventHandler_.deleteContext(context_, inProt_, outProt_);
+ }
}
/**
@@ -470,12 +503,14 @@
* Actually invoke the method signified by this FrameBuffer.
*/
public void invoke() {
- TTransport inTrans = getInputTransport();
- TProtocol inProt = inputProtocolFactory_.getProtocol(inTrans);
- TProtocol outProt = outputProtocolFactory_.getProtocol(getOutputTransport());
-
+ frameTrans_.reset(buffer_.array());
+ response_.reset();
+
try {
- processorFactory_.getProcessor(inTrans).process(inProt, outProt);
+ if (eventHandler_ != null) {
+ eventHandler_.processContext(context_, inTrans_, outTrans_);
+ }
+ processorFactory_.getProcessor(inTrans_).process(inProt_, outProt_);
responseReady();
return;
} catch (TException te) {
@@ -489,22 +524,6 @@
}
/**
- * Wrap the read buffer in a memory-based transport so a processor can read
- * the data it needs to handle an invocation.
- */
- private TTransport getInputTransport() {
- return inputTransportFactory_.getTransport(new TMemoryInputTransport(buffer_.array()));
- }
-
- /**
- * Get the transport that should be used by the invoker for responding.
- */
- private TTransport getOutputTransport() {
- response_ = new TByteArrayOutputStream();
- return outputTransportFactory_.getTransport(new TIOStreamTransport(response_));
- }
-
- /**
* Perform a read into buffer.
*
* @return true if the read succeeded, false if there was an error or the
@@ -550,13 +569,4 @@
}
}
} // FrameBuffer
-
- public void setServerEventHandler(TServerEventHandler eventHandler) {
- throw new UnsupportedOperationException("Not supported yet.");
- }
-
- public TServerEventHandler getEventHandler() {
- throw new UnsupportedOperationException("Not supported yet.");
- }
-
}
diff --git a/lib/java/src/org/apache/thrift/server/TNonblockingServer.java b/lib/java/src/org/apache/thrift/server/TNonblockingServer.java
index 169ae5c..240b123 100644
--- a/lib/java/src/org/apache/thrift/server/TNonblockingServer.java
+++ b/lib/java/src/org/apache/thrift/server/TNonblockingServer.java
@@ -150,6 +150,10 @@
*/
public void run() {
try {
+ if (eventHandler_ != null) {
+ eventHandler_.preServe();
+ }
+
while (!stopped_) {
select();
processInterestChanges();
diff --git a/lib/java/src/org/apache/thrift/server/TThreadedSelectorServer.java b/lib/java/src/org/apache/thrift/server/TThreadedSelectorServer.java
index 23ec842..29eabb1 100644
--- a/lib/java/src/org/apache/thrift/server/TThreadedSelectorServer.java
+++ b/lib/java/src/org/apache/thrift/server/TThreadedSelectorServer.java
@@ -371,6 +371,10 @@
*/
public void run() {
try {
+ if (eventHandler_ != null) {
+ eventHandler_.preServe();
+ }
+
while (!stopped_) {
select();
}