THRIFT-1195 Allow users to act on client connects/disconnects
HIVE-3067 Shutdown HiveMetaStore on client disconnect or timeout
HIVE-3057 metastore.HiveMetaStore$HMSHandler should set the thread local raw store to null in shutdown()

Patch: Dragan Okiljevic

git-svn-id: https://svn.apache.org/repos/asf/thrift/trunk@1346566 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/lib/java/src/org/apache/thrift/server/AbstractNonblockingServer.java b/lib/java/src/org/apache/thrift/server/AbstractNonblockingServer.java
index 7fd75bf..e5e26b2 100644
--- a/lib/java/src/org/apache/thrift/server/AbstractNonblockingServer.java
+++ b/lib/java/src/org/apache/thrift/server/AbstractNonblockingServer.java
@@ -550,4 +550,13 @@
       }
     }
   } // FrameBuffer
+
+  public void setServerEventHandler(TServerEventHandler eventHandler) {
+    throw new UnsupportedOperationException("Not supported yet.");
+  }
+
+  public TServerEventHandler getEventHandler() {
+    throw new UnsupportedOperationException("Not supported yet.");
+  }
+
 }
diff --git a/lib/java/src/org/apache/thrift/server/ServerContext.java b/lib/java/src/org/apache/thrift/server/ServerContext.java
new file mode 100644
index 0000000..9b0b99e
--- /dev/null
+++ b/lib/java/src/org/apache/thrift/server/ServerContext.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * Interface for storing server's connection context
+ */
+ 
+package org.apache.thrift.server;
+
+public interface ServerContext {}
diff --git a/lib/java/src/org/apache/thrift/server/TServer.java b/lib/java/src/org/apache/thrift/server/TServer.java
index 0af66d3..a85a429 100644
--- a/lib/java/src/org/apache/thrift/server/TServer.java
+++ b/lib/java/src/org/apache/thrift/server/TServer.java
@@ -125,6 +125,8 @@
 
   private boolean isServing;
 
+  protected TServerEventHandler eventHandler_;
+
   protected TServer(AbstractServerArgs args) {
     processorFactory_ = args.processorFactory;
     serverTransport_ = args.serverTransport;
@@ -152,4 +154,12 @@
   protected void setServing(boolean serving) {
     isServing = serving;
   }
+
+  public void setServerEventHandler(TServerEventHandler eventHandler) {
+    eventHandler_ = eventHandler;
+  }
+
+  public TServerEventHandler getEventHandler() {
+    return eventHandler_;
+  }
 }
diff --git a/lib/java/src/org/apache/thrift/server/TServerEventHandler.java b/lib/java/src/org/apache/thrift/server/TServerEventHandler.java
new file mode 100644
index 0000000..f069b9b
--- /dev/null
+++ b/lib/java/src/org/apache/thrift/server/TServerEventHandler.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.thrift.server;
+
+import org.apache.thrift.protocol.TProtocol;
+import org.apache.thrift.transport.TTransport;
+
+/**
+ * Interface that can handle events from the server core. To
+ * use this you should subclass it and implement the methods that you care
+ * about. Your subclass can also store local data that you may care about,
+ * such as additional "arguments" to these methods (stored in the object
+ * instance's state).
+ */
+public interface TServerEventHandler {
+
+  /**
+   * Called before the server begins.
+   */
+  void preServe();
+
+  /**
+   * Called when a new client has connected and is about to being processing.
+   */
+  ServerContext createContext(TProtocol input,
+                              TProtocol output);
+
+  /**
+   * Called when a client has finished request-handling to delete server
+   * context.
+   */
+  void deleteContext(ServerContext serverContext,
+                             TProtocol input,
+                             TProtocol output);
+
+  /**
+   * Called when a client is about to call the processor.
+   */
+  void processContext(ServerContext serverContext,
+                              TTransport inputTransport, TTransport outputTransport);
+
+}
\ No newline at end of file
diff --git a/lib/java/src/org/apache/thrift/server/TSimpleServer.java b/lib/java/src/org/apache/thrift/server/TSimpleServer.java
index ef1b10a..6e92801 100644
--- a/lib/java/src/org/apache/thrift/server/TSimpleServer.java
+++ b/lib/java/src/org/apache/thrift/server/TSimpleServer.java
@@ -50,6 +50,11 @@
       return;
     }
 
+    // Run the preServe event
+    if (eventHandler_ != null) {
+      eventHandler_.preServe();
+    }
+
     setServing(true);
 
     while (!stopped_) {
@@ -59,6 +64,7 @@
       TTransport outputTransport = null;
       TProtocol inputProtocol = null;
       TProtocol outputProtocol = null;
+      ServerContext connectionContext = null;
       try {
         client = serverTransport_.accept();
         if (client != null) {
@@ -67,7 +73,17 @@
           outputTransport = outputTransportFactory_.getTransport(client);
           inputProtocol = inputProtocolFactory_.getProtocol(inputTransport);
           outputProtocol = outputProtocolFactory_.getProtocol(outputTransport);
-          while (processor.process(inputProtocol, outputProtocol)) {}
+          if (eventHandler_ != null) {
+            connectionContext = eventHandler_.createContext(inputProtocol, outputProtocol);
+          }
+          while (true) {
+            if (eventHandler_ != null) {
+              eventHandler_.processContext(connectionContext, inputTransport, outputTransport);
+            }
+            if(!processor.process(inputProtocol, outputProtocol)) {
+              break;
+            }
+          }
         }
       } catch (TTransportException ttx) {
         // Client died, just move on
@@ -81,6 +97,10 @@
         }
       }
 
+      if (eventHandler_ != null) {
+        eventHandler_.deleteContext(connectionContext, inputProtocol, outputProtocol);
+      }
+
       if (inputTransport != null) {
         inputTransport.close();
       }
diff --git a/lib/java/src/org/apache/thrift/server/TThreadPoolServer.java b/lib/java/src/org/apache/thrift/server/TThreadPoolServer.java
index 0b037d8..9a68c76 100644
--- a/lib/java/src/org/apache/thrift/server/TThreadPoolServer.java
+++ b/lib/java/src/org/apache/thrift/server/TThreadPoolServer.java
@@ -108,6 +108,11 @@
       return;
     }
 
+    // Run the preServe event
+    if (eventHandler_ != null) {
+      eventHandler_.preServe();
+    }
+
     stopped_ = false;
     setServing(true);
     while (!stopped_) {
@@ -175,15 +180,33 @@
       TTransport outputTransport = null;
       TProtocol inputProtocol = null;
       TProtocol outputProtocol = null;
+
+      TServerEventHandler eventHandler = null;
+      ServerContext connectionContext = null;
+
       try {
         processor = processorFactory_.getProcessor(client_);
         inputTransport = inputTransportFactory_.getTransport(client_);
         outputTransport = outputTransportFactory_.getTransport(client_);
         inputProtocol = inputProtocolFactory_.getProtocol(inputTransport);
-        outputProtocol = outputProtocolFactory_.getProtocol(outputTransport);
+        outputProtocol = outputProtocolFactory_.getProtocol(outputTransport);	  
+
+        eventHandler = getEventHandler();
+        if (eventHandler != null) {
+          connectionContext = eventHandler.createContext(inputProtocol, outputProtocol);
+        }
         // we check stopped_ first to make sure we're not supposed to be shutting
         // down. this is necessary for graceful shutdown.
-        while (!stopped_ && processor.process(inputProtocol, outputProtocol)) {}
+        while (true) {
+
+            if (eventHandler != null) {
+              eventHandler.processContext(connectionContext, inputTransport, outputTransport);
+            }
+
+            if(stopped_ || !processor.process(inputProtocol, outputProtocol)) {
+              break;
+            }
+        }
       } catch (TTransportException ttx) {
         // Assume the client died and continue silently
       } catch (TException tx) {
@@ -192,6 +215,10 @@
         LOGGER.error("Error occurred during processing of message.", x);
       }
 
+      if (eventHandler != null) {
+        eventHandler.deleteContext(connectionContext, inputProtocol, outputProtocol);
+      }
+
       if (inputTransport != null) {
         inputTransport.close();
       }
diff --git a/lib/java/test/org/apache/thrift/test/TestServer.java b/lib/java/test/org/apache/thrift/test/TestServer.java
index e65756d..9077882 100644
--- a/lib/java/test/org/apache/thrift/test/TestServer.java
+++ b/lib/java/test/org/apache/thrift/test/TestServer.java
@@ -26,13 +26,17 @@
 import java.util.Set;
 
 import org.apache.thrift.protocol.TBinaryProtocol;
+import org.apache.thrift.protocol.TProtocol;
 import org.apache.thrift.protocol.TProtocolFactory;
+import org.apache.thrift.server.ServerContext;
 import org.apache.thrift.server.TServer;
 import org.apache.thrift.server.TServer.Args;
 import org.apache.thrift.server.TSimpleServer;
 import org.apache.thrift.server.TThreadPoolServer;
 import org.apache.thrift.transport.TServerSocket;
 import org.apache.thrift.server.ServerTestBase.TestHandler;
+import org.apache.thrift.server.TServerEventHandler;
+import org.apache.thrift.transport.TTransport;
 
 import thrift.test.Insanity;
 import thrift.test.Numberz;
@@ -44,6 +48,51 @@
 
 public class TestServer {
 
+  static class TestServerContext implements ServerContext {
+
+        int connectionId;
+
+        public TestServerContext(int connectionId) {
+            this.connectionId = connectionId;
+        }
+
+        public int getConnectionId() {
+            return connectionId;
+        }
+
+        public void setConnectionId(int connectionId) {
+            this.connectionId = connectionId;
+        }
+
+  }
+
+  static class TestServerEventHandler implements TServerEventHandler {
+
+        private int nextConnectionId = 1;
+
+        public void preServe() {
+            System.out.println("TServerEventHandler.preServe - called only once before server starts accepting connections");
+        }
+
+        public ServerContext createContext(TProtocol input, TProtocol output) {
+            //we can create some connection level data which is stored while connection is alive & served
+            TestServerContext ctx = new TestServerContext(nextConnectionId++);
+            System.out.println("TServerEventHandler.createContext - connection #"+ctx.getConnectionId()+" established");
+            return ctx;
+        }
+
+        public void deleteContext(ServerContext serverContext, TProtocol input, TProtocol output) {
+            TestServerContext ctx = (TestServerContext)serverContext;
+            System.out.println("TServerEventHandler.deleteContext - connection #"+ctx.getConnectionId()+" terminated");
+        }
+
+        public void processContext(ServerContext serverContext, TTransport inputTransport, TTransport outputTransport) {
+            TestServerContext ctx = (TestServerContext)serverContext;
+            System.out.println("TServerEventHandler.processContext - connection #"+ctx.getConnectionId()+" is ready to process next request");
+        }
+
+  }
+
   public static void main(String [] args) {
     try {
       int port = 9090;
@@ -74,6 +123,9 @@
       // ThreadPool Server
       serverEngine = new TThreadPoolServer(new TThreadPoolServer.Args(tServerSocket).processor(testProcessor).protocolFactory(tProtocolFactory));
 
+      //Set server event handler
+      serverEngine.setServerEventHandler(new TestServerEventHandler());
+
       // Run it
       System.out.println("Starting the server on port " + port + "...");
       serverEngine.serve();