THRIFT-1195 Allow users to act on client connects/disconnects
HIVE-3067 Shutdown HiveMetaStore on client disconnect or timeout
HIVE-3057 metastore.HiveMetaStore$HMSHandler should set the thread local raw store to null in shutdown()
Patch: Dragan Okiljevic
git-svn-id: https://svn.apache.org/repos/asf/thrift/trunk@1346566 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/lib/java/src/org/apache/thrift/server/AbstractNonblockingServer.java b/lib/java/src/org/apache/thrift/server/AbstractNonblockingServer.java
index 7fd75bf..e5e26b2 100644
--- a/lib/java/src/org/apache/thrift/server/AbstractNonblockingServer.java
+++ b/lib/java/src/org/apache/thrift/server/AbstractNonblockingServer.java
@@ -550,4 +550,13 @@
}
}
} // FrameBuffer
+
+ public void setServerEventHandler(TServerEventHandler eventHandler) {
+ throw new UnsupportedOperationException("Not supported yet.");
+ }
+
+ public TServerEventHandler getEventHandler() {
+ throw new UnsupportedOperationException("Not supported yet.");
+ }
+
}
diff --git a/lib/java/src/org/apache/thrift/server/ServerContext.java b/lib/java/src/org/apache/thrift/server/ServerContext.java
new file mode 100644
index 0000000..9b0b99e
--- /dev/null
+++ b/lib/java/src/org/apache/thrift/server/ServerContext.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * Interface for storing server's connection context
+ */
+
+package org.apache.thrift.server;
+
+public interface ServerContext {}
diff --git a/lib/java/src/org/apache/thrift/server/TServer.java b/lib/java/src/org/apache/thrift/server/TServer.java
index 0af66d3..a85a429 100644
--- a/lib/java/src/org/apache/thrift/server/TServer.java
+++ b/lib/java/src/org/apache/thrift/server/TServer.java
@@ -125,6 +125,8 @@
private boolean isServing;
+ protected TServerEventHandler eventHandler_;
+
protected TServer(AbstractServerArgs args) {
processorFactory_ = args.processorFactory;
serverTransport_ = args.serverTransport;
@@ -152,4 +154,12 @@
protected void setServing(boolean serving) {
isServing = serving;
}
+
+ public void setServerEventHandler(TServerEventHandler eventHandler) {
+ eventHandler_ = eventHandler;
+ }
+
+ public TServerEventHandler getEventHandler() {
+ return eventHandler_;
+ }
}
diff --git a/lib/java/src/org/apache/thrift/server/TServerEventHandler.java b/lib/java/src/org/apache/thrift/server/TServerEventHandler.java
new file mode 100644
index 0000000..f069b9b
--- /dev/null
+++ b/lib/java/src/org/apache/thrift/server/TServerEventHandler.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.thrift.server;
+
+import org.apache.thrift.protocol.TProtocol;
+import org.apache.thrift.transport.TTransport;
+
+/**
+ * Interface that can handle events from the server core. To
+ * use this you should subclass it and implement the methods that you care
+ * about. Your subclass can also store local data that you may care about,
+ * such as additional "arguments" to these methods (stored in the object
+ * instance's state).
+ */
+public interface TServerEventHandler {
+
+ /**
+ * Called before the server begins.
+ */
+ void preServe();
+
+ /**
+ * Called when a new client has connected and is about to being processing.
+ */
+ ServerContext createContext(TProtocol input,
+ TProtocol output);
+
+ /**
+ * Called when a client has finished request-handling to delete server
+ * context.
+ */
+ void deleteContext(ServerContext serverContext,
+ TProtocol input,
+ TProtocol output);
+
+ /**
+ * Called when a client is about to call the processor.
+ */
+ void processContext(ServerContext serverContext,
+ TTransport inputTransport, TTransport outputTransport);
+
+}
\ No newline at end of file
diff --git a/lib/java/src/org/apache/thrift/server/TSimpleServer.java b/lib/java/src/org/apache/thrift/server/TSimpleServer.java
index ef1b10a..6e92801 100644
--- a/lib/java/src/org/apache/thrift/server/TSimpleServer.java
+++ b/lib/java/src/org/apache/thrift/server/TSimpleServer.java
@@ -50,6 +50,11 @@
return;
}
+ // Run the preServe event
+ if (eventHandler_ != null) {
+ eventHandler_.preServe();
+ }
+
setServing(true);
while (!stopped_) {
@@ -59,6 +64,7 @@
TTransport outputTransport = null;
TProtocol inputProtocol = null;
TProtocol outputProtocol = null;
+ ServerContext connectionContext = null;
try {
client = serverTransport_.accept();
if (client != null) {
@@ -67,7 +73,17 @@
outputTransport = outputTransportFactory_.getTransport(client);
inputProtocol = inputProtocolFactory_.getProtocol(inputTransport);
outputProtocol = outputProtocolFactory_.getProtocol(outputTransport);
- while (processor.process(inputProtocol, outputProtocol)) {}
+ if (eventHandler_ != null) {
+ connectionContext = eventHandler_.createContext(inputProtocol, outputProtocol);
+ }
+ while (true) {
+ if (eventHandler_ != null) {
+ eventHandler_.processContext(connectionContext, inputTransport, outputTransport);
+ }
+ if(!processor.process(inputProtocol, outputProtocol)) {
+ break;
+ }
+ }
}
} catch (TTransportException ttx) {
// Client died, just move on
@@ -81,6 +97,10 @@
}
}
+ if (eventHandler_ != null) {
+ eventHandler_.deleteContext(connectionContext, inputProtocol, outputProtocol);
+ }
+
if (inputTransport != null) {
inputTransport.close();
}
diff --git a/lib/java/src/org/apache/thrift/server/TThreadPoolServer.java b/lib/java/src/org/apache/thrift/server/TThreadPoolServer.java
index 0b037d8..9a68c76 100644
--- a/lib/java/src/org/apache/thrift/server/TThreadPoolServer.java
+++ b/lib/java/src/org/apache/thrift/server/TThreadPoolServer.java
@@ -108,6 +108,11 @@
return;
}
+ // Run the preServe event
+ if (eventHandler_ != null) {
+ eventHandler_.preServe();
+ }
+
stopped_ = false;
setServing(true);
while (!stopped_) {
@@ -175,15 +180,33 @@
TTransport outputTransport = null;
TProtocol inputProtocol = null;
TProtocol outputProtocol = null;
+
+ TServerEventHandler eventHandler = null;
+ ServerContext connectionContext = null;
+
try {
processor = processorFactory_.getProcessor(client_);
inputTransport = inputTransportFactory_.getTransport(client_);
outputTransport = outputTransportFactory_.getTransport(client_);
inputProtocol = inputProtocolFactory_.getProtocol(inputTransport);
- outputProtocol = outputProtocolFactory_.getProtocol(outputTransport);
+ outputProtocol = outputProtocolFactory_.getProtocol(outputTransport);
+
+ eventHandler = getEventHandler();
+ if (eventHandler != null) {
+ connectionContext = eventHandler.createContext(inputProtocol, outputProtocol);
+ }
// we check stopped_ first to make sure we're not supposed to be shutting
// down. this is necessary for graceful shutdown.
- while (!stopped_ && processor.process(inputProtocol, outputProtocol)) {}
+ while (true) {
+
+ if (eventHandler != null) {
+ eventHandler.processContext(connectionContext, inputTransport, outputTransport);
+ }
+
+ if(stopped_ || !processor.process(inputProtocol, outputProtocol)) {
+ break;
+ }
+ }
} catch (TTransportException ttx) {
// Assume the client died and continue silently
} catch (TException tx) {
@@ -192,6 +215,10 @@
LOGGER.error("Error occurred during processing of message.", x);
}
+ if (eventHandler != null) {
+ eventHandler.deleteContext(connectionContext, inputProtocol, outputProtocol);
+ }
+
if (inputTransport != null) {
inputTransport.close();
}
diff --git a/lib/java/test/org/apache/thrift/test/TestServer.java b/lib/java/test/org/apache/thrift/test/TestServer.java
index e65756d..9077882 100644
--- a/lib/java/test/org/apache/thrift/test/TestServer.java
+++ b/lib/java/test/org/apache/thrift/test/TestServer.java
@@ -26,13 +26,17 @@
import java.util.Set;
import org.apache.thrift.protocol.TBinaryProtocol;
+import org.apache.thrift.protocol.TProtocol;
import org.apache.thrift.protocol.TProtocolFactory;
+import org.apache.thrift.server.ServerContext;
import org.apache.thrift.server.TServer;
import org.apache.thrift.server.TServer.Args;
import org.apache.thrift.server.TSimpleServer;
import org.apache.thrift.server.TThreadPoolServer;
import org.apache.thrift.transport.TServerSocket;
import org.apache.thrift.server.ServerTestBase.TestHandler;
+import org.apache.thrift.server.TServerEventHandler;
+import org.apache.thrift.transport.TTransport;
import thrift.test.Insanity;
import thrift.test.Numberz;
@@ -44,6 +48,51 @@
public class TestServer {
+ static class TestServerContext implements ServerContext {
+
+ int connectionId;
+
+ public TestServerContext(int connectionId) {
+ this.connectionId = connectionId;
+ }
+
+ public int getConnectionId() {
+ return connectionId;
+ }
+
+ public void setConnectionId(int connectionId) {
+ this.connectionId = connectionId;
+ }
+
+ }
+
+ static class TestServerEventHandler implements TServerEventHandler {
+
+ private int nextConnectionId = 1;
+
+ public void preServe() {
+ System.out.println("TServerEventHandler.preServe - called only once before server starts accepting connections");
+ }
+
+ public ServerContext createContext(TProtocol input, TProtocol output) {
+ //we can create some connection level data which is stored while connection is alive & served
+ TestServerContext ctx = new TestServerContext(nextConnectionId++);
+ System.out.println("TServerEventHandler.createContext - connection #"+ctx.getConnectionId()+" established");
+ return ctx;
+ }
+
+ public void deleteContext(ServerContext serverContext, TProtocol input, TProtocol output) {
+ TestServerContext ctx = (TestServerContext)serverContext;
+ System.out.println("TServerEventHandler.deleteContext - connection #"+ctx.getConnectionId()+" terminated");
+ }
+
+ public void processContext(ServerContext serverContext, TTransport inputTransport, TTransport outputTransport) {
+ TestServerContext ctx = (TestServerContext)serverContext;
+ System.out.println("TServerEventHandler.processContext - connection #"+ctx.getConnectionId()+" is ready to process next request");
+ }
+
+ }
+
public static void main(String [] args) {
try {
int port = 9090;
@@ -74,6 +123,9 @@
// ThreadPool Server
serverEngine = new TThreadPoolServer(new TThreadPoolServer.Args(tServerSocket).processor(testProcessor).protocolFactory(tProtocolFactory));
+ //Set server event handler
+ serverEngine.setServerEventHandler(new TestServerEventHandler());
+
// Run it
System.out.println("Starting the server on port " + port + "...");
serverEngine.serve();