THRIFT-5240: Do connectivity check in Go server

Client: go

In compiler generated TProcessorFunction implementations, add a
goroutine after read the request to do connectivity check on the input
transport. If the transport is no longer open, cancel the context object
passed into the handler implementation.

Also define ErrAbandonRequest error, to help TSimpleServer closing
client connections that's already closed on the other end.
diff --git a/CHANGES.md b/CHANGES.md
index b6c2021..be0286a 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -27,6 +27,7 @@
 - [THRIFT-5164](https://issues.apache.org/jira/browse/THRIFT-5164) - Add ClientMiddleware function type and WrapClient function to support wrapping a TClient with middleware functions.
 - [THRIFT-5164](https://issues.apache.org/jira/browse/THRIFT-5164) - Add ProcessorMiddleware function type and WrapProcessor function to support wrapping a TProcessor with middleware functions.
 - [THRIFT-5233](https://issues.apache.org/jira/browse/THRIFT-5233) - Add context deadline check to ReadMessageBegin in TBinaryProtocol, TCompactProtocol, and THeaderProtocol.
+- [THRIFT-5240](https://issues.apache.org/jira/browse/THRIFT-5240) - The context passed into server handler implementations will be canceled when we detected that the client closed the connection.
 
 ## 0.13.0
 
diff --git a/compiler/cpp/src/thrift/generate/t_go_generator.cc b/compiler/cpp/src/thrift/generate/t_go_generator.cc
index 6f73819..3bb2a5c 100644
--- a/compiler/cpp/src/thrift/generate/t_go_generator.cc
+++ b/compiler/cpp/src/thrift/generate/t_go_generator.cc
@@ -919,6 +919,7 @@
     system_packages.push_back("errors");
   }
   system_packages.push_back("fmt");
+  system_packages.push_back("time");
   system_packages.push_back(gen_thrift_import_);
   return "import(\n" + render_system_packages(system_packages);
 }
@@ -937,6 +938,7 @@
       "var _ = fmt.Printf\n"
       "var _ = context.Background\n"
       "var _ = reflect.DeepEqual\n"
+      "var _ = time.Now\n"
       "var _ = bytes.Equal\n\n");
 }
 
@@ -2778,15 +2780,66 @@
     f_types_ << indent() << "  oprot.Flush(ctx)" << endl;
   }
   f_types_ << indent() << "  return false, err" << endl;
-  f_types_ << indent() << "}" << endl << endl;
-  f_types_ << indent() << "iprot.ReadMessageEnd(ctx)" << endl;
+  f_types_ << indent() << "}" << endl;
+  f_types_ << indent() << "iprot.ReadMessageEnd(ctx)" << endl << endl;
+
+  // Even though we never create the goroutine in oneway handlers,
+  // always have (nop) tickerCancel defined makes the writing part of code
+  // generating easier and less error-prone.
+  f_types_ << indent() << "tickerCancel := func() {}" << endl;
+  // Only create the goroutine for non-oneways.
+  if (!tfunction->is_oneway()) {
+    f_types_ << indent() << "// Start a goroutine to do server side connectivity check." << endl;
+    f_types_ << indent() << "if thrift.ServerConnectivityCheckInterval > 0 {" << endl;
+
+    indent_up();
+    f_types_ << indent() << "var cancel context.CancelFunc" << endl;
+    f_types_ << indent() << "ctx, cancel = context.WithCancel(ctx)" << endl;
+    f_types_ << indent() << "defer cancel()" << endl;
+    f_types_ << indent() << "var tickerCtx context.Context" << endl;
+    f_types_ << indent() << "tickerCtx, tickerCancel = context.WithCancel(context.Background())" << endl;
+    f_types_ << indent() << "defer tickerCancel()" << endl;
+    f_types_ << indent() << "go func(ctx context.Context, cancel context.CancelFunc) {" << endl;
+
+    indent_up();
+    f_types_ << indent() << "ticker := time.NewTicker(thrift.ServerConnectivityCheckInterval)" << endl;
+    f_types_ << indent() << "defer ticker.Stop()" << endl;
+    f_types_ << indent() << "for {" << endl;
+
+    indent_up();
+    f_types_ << indent() << "select {" << endl;
+    f_types_ << indent() << "case <-ctx.Done():" << endl;
+    indent_up();
+    f_types_ << indent() << "return" << endl;
+    indent_down();
+    f_types_ << indent() << "case <-ticker.C:" << endl;
+
+    indent_up();
+    f_types_ << indent() << "if !iprot.Transport().IsOpen() {" << endl;
+    indent_up();
+    f_types_ << indent() << "cancel()" << endl;
+    f_types_ << indent() << "return" << endl;
+    indent_down();
+    f_types_ << indent() << "}" << endl;
+    indent_down();
+    f_types_ << indent() << "}" << endl;
+    indent_down();
+    f_types_ << indent() << "}" << endl;
+    indent_down();
+    f_types_ << indent() << "}(tickerCtx, cancel)" << endl;
+    indent_down();
+    f_types_ << indent() << "}" << endl << endl;
+  } else {
+    // Make sure we don't get the defined but unused compiling error.
+    f_types_ << indent() << "_ = tickerCancel" << endl << endl;
+  }
 
   if (!tfunction->is_oneway()) {
     f_types_ << indent() << "result := " << resultname << "{}" << endl;
   }
   bool need_reference = type_need_reference(tfunction->get_returntype());
   if (!tfunction->is_oneway() && !tfunction->get_returntype()->is_void()) {
-    f_types_ << "var retval " << type_to_go_type(tfunction->get_returntype()) << endl;
+    f_types_ << indent() << "var retval " << type_to_go_type(tfunction->get_returntype()) << endl;
   }
 
   f_types_ << indent() << "var err2 error" << endl;
@@ -2818,6 +2871,7 @@
   }
 
   f_types_ << "); err2 != nil {" << endl;
+  f_types_ << indent() << "  tickerCancel()" << endl;
 
   t_struct* exceptions = tfunction->get_xceptions();
   const vector<t_field*>& x_fields = exceptions->get_members();
@@ -2836,6 +2890,11 @@
   }
 
   if (!tfunction->is_oneway()) {
+    // Avoid writing the error to the wire if it's ErrAbandonRequest
+    f_types_ << indent() << "  if err2 == thrift.ErrAbandonRequest {" << endl;
+    f_types_ << indent() << "    return false, err2" << endl;
+    f_types_ << indent() << "  }" << endl;
+
     f_types_ << indent() << "  x := thrift.NewTApplicationException(thrift.INTERNAL_ERROR, "
                               "\"Internal error processing " << escape_string(tfunction->get_name())
                << ": \" + err2.Error())" << endl;
@@ -2864,10 +2923,11 @@
       }
       f_types_ << "retval" << endl;
       indent_down();
-      f_types_ << "}" << endl;
+      f_types_ << indent() << "}" << endl;
     } else {
       f_types_ << endl;
     }
+    f_types_ << indent() << "tickerCancel()" << endl;
     f_types_ << indent() << "if err2 = oprot.WriteMessageBegin(ctx, \""
                << escape_string(tfunction->get_name()) << "\", thrift.REPLY, seqId); err2 != nil {"
                << endl;
@@ -2889,6 +2949,7 @@
     f_types_ << indent() << "return true, err" << endl;
   } else {
     f_types_ << endl;
+    f_types_ << indent() << "tickerCancel()" << endl;
     f_types_ << indent() << "return true, nil" << endl;
   }
   indent_down();
diff --git a/lib/go/README.md b/lib/go/README.md
index ce6d5ed..5b7e2cd 100644
--- a/lib/go/README.md
+++ b/lib/go/README.md
@@ -81,3 +81,30 @@
     type Foo struct {
       Bar string `thrift:"bar,1,required" some_tag:"some_tag_value"`
     }
+
+A note about server handler implementations
+===========================================
+
+The context object passed into the server handler function will be canceled when
+the client closes the connection (this is a best effort check, not a guarantee
+-- there's no guarantee that the context object is always canceled when client
+closes the connection, but when it's canceled you can always assume the client
+closed the connection). When implementing Go Thrift server, you can take
+advantage of that to abandon requests that's no longer needed:
+
+    func MyEndpoint(ctx context.Context, req *thriftRequestType) (*thriftResponseType, error) {
+        ...
+        if ctx.Err() == context.Canceled {
+            return nil, thrift.ErrAbandonRequest
+        }
+        ...
+    }
+
+This feature would add roughly 1 millisecond of latency overhead to the server
+handlers (along with roughly 2 goroutines per request).
+If that is unacceptable, it can be disabled by having this line early in your
+main function:
+
+    thrift.ServerConnectivityCheckInterval = 0
+
+This feature is also only enabled on non-oneway endpoints.
diff --git a/lib/go/thrift/simple_server.go b/lib/go/thrift/simple_server.go
index 85baa4e..68ac394 100644
--- a/lib/go/thrift/simple_server.go
+++ b/lib/go/thrift/simple_server.go
@@ -20,12 +20,34 @@
 package thrift
 
 import (
+	"errors"
 	"fmt"
 	"io"
 	"sync"
 	"sync/atomic"
+	"time"
 )
 
+// ErrAbandonRequest is a special error server handler implementations can
+// return to indicate that the request has been abandoned.
+//
+// TSimpleServer will check for this error, and close the client connection
+// instead of writing the response/error back to the client.
+//
+// It shall only be used when the server handler implementation know that the
+// client already abandoned the request (by checking that the passed in context
+// is already canceled, for example).
+var ErrAbandonRequest = errors.New("request abandoned")
+
+// ServerConnectivityCheckInterval defines the ticker interval used by
+// connectivity check in thrift compiled TProcessorFunc implementations.
+//
+// It's defined as a variable instead of constant, so that thrift server
+// implementations can change its value to control the behavior.
+//
+// If it's changed to <=0, the feature will be disabled.
+var ServerConnectivityCheckInterval = time.Millisecond
+
 /*
  * This is not a typical TSimpleServer as it is not blocked after accept a socket.
  * It is more like a TThreadedServer that can handle different connections in different goroutines.
@@ -293,6 +315,9 @@
 		}
 
 		ok, err := processor.Process(ctx, inputProtocol, outputProtocol)
+		if err == ErrAbandonRequest {
+			return client.Close()
+		}
 		if _, ok := err.(TTransportException); ok && err != nil {
 			return err
 		}