go: Add a zlib reader pool

We implemented a zlib writer pool for default level when implementing
THeader, this change also add a zlib reader pool to help speed up things
when zlib is used.

Also make TZlibTransport to use the zlib writer pool when it's using the
default compression level.
diff --git a/lib/go/thrift/compact_protocol_test.go b/lib/go/thrift/compact_protocol_test.go
index 65f77f2..0d95759 100644
--- a/lib/go/thrift/compact_protocol_test.go
+++ b/lib/go/thrift/compact_protocol_test.go
@@ -33,9 +33,18 @@
 		NewTFramedTransport(NewTMemoryBuffer()),
 	}
 
-	zlib0, _ := NewTZlibTransport(NewTMemoryBuffer(), 0)
-	zlib6, _ := NewTZlibTransport(NewTMemoryBuffer(), 6)
-	zlib9, _ := NewTZlibTransport(NewTFramedTransport(NewTMemoryBuffer()), 9)
+	newTZlibTransport := func(trans TTransport, level int) *TZlibTransport {
+		t.Helper()
+		zlibTrans, err := NewTZlibTransport(trans, level)
+		if err != nil {
+			t.Fatalf("NewTZlibTransport returned error: %v", err)
+		}
+		return zlibTrans
+	}
+
+	zlib0 := newTZlibTransport(NewTMemoryBuffer(), 0)
+	zlib6 := newTZlibTransport(NewTMemoryBuffer(), 6)
+	zlib9 := newTZlibTransport(NewTFramedTransport(NewTMemoryBuffer()), 9)
 	transports = append(transports, zlib0, zlib6, zlib9)
 
 	for _, trans := range transports {
diff --git a/lib/go/thrift/header_transport.go b/lib/go/thrift/header_transport.go
index d6d6416..06f71b3 100644
--- a/lib/go/thrift/header_transport.go
+++ b/lib/go/thrift/header_transport.go
@@ -166,7 +166,7 @@
 	case TransformNone:
 		// no-op
 	case TransformZlib:
-		readCloser, err := zlib.NewReader(tr.Reader)
+		readCloser, err := newZlibReader(tr.Reader)
 		if err != nil {
 			return err
 		}
@@ -211,25 +211,6 @@
 	return nil
 }
 
-var zlibDefaultLevelWriterPool = newPool(
-	func() *zlib.Writer {
-		return zlib.NewWriter(nil)
-	},
-	nil,
-)
-
-type zlibPoolCloser struct {
-	writer *zlib.Writer
-}
-
-func (z *zlibPoolCloser) Close() error {
-	defer func() {
-		z.writer.Reset(nil)
-		zlibDefaultLevelWriterPool.put(&z.writer)
-	}()
-	return z.writer.Close()
-}
-
 // AddTransform adds a transform.
 func (tw *TransformWriter) AddTransform(id THeaderTransformID) error {
 	switch id {
@@ -241,12 +222,12 @@
 	case TransformNone:
 		// no-op
 	case TransformZlib:
-		writeCloser := zlibDefaultLevelWriterPool.get()
-		writeCloser.Reset(tw.Writer)
-		tw.Writer = writeCloser
-		tw.closers = append(tw.closers, &zlibPoolCloser{
-			writer: writeCloser,
-		})
+		writer, closer, err := newZlibWriterCloserLevel(tw.Writer, zlib.DefaultCompression)
+		if err != nil {
+			return err
+		}
+		tw.Writer = writer
+		tw.closers = append(tw.closers, closer)
 	}
 	return nil
 }
diff --git a/lib/go/thrift/pool.go b/lib/go/thrift/pool.go
index 1d623d4..6912f3e 100644
--- a/lib/go/thrift/pool.go
+++ b/lib/go/thrift/pool.go
@@ -43,7 +43,7 @@
 	}
 	return &pool[T]{
 		pool: sync.Pool{
-			New: func() interface{} {
+			New: func() any {
 				return generate()
 			},
 		},
diff --git a/lib/go/thrift/zlib_pool.go b/lib/go/thrift/zlib_pool.go
new file mode 100644
index 0000000..b419291
--- /dev/null
+++ b/lib/go/thrift/zlib_pool.go
@@ -0,0 +1,109 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+ */
+
+package thrift
+
+import (
+	"compress/zlib"
+	"io"
+	"sync"
+)
+
+type zlibReader interface {
+	io.ReadCloser
+	zlib.Resetter
+}
+
+var zlibReaderPool sync.Pool
+
+func newZlibReader(r io.Reader) (io.ReadCloser, error) {
+	if reader, _ := zlibReaderPool.Get().(*wrappedZlibReader); reader != nil {
+		if err := reader.Reset(r, nil); err == nil {
+			return reader, nil
+		}
+	}
+	reader, err := zlib.NewReader(r)
+	if err != nil {
+		return nil, err
+	}
+	return &wrappedZlibReader{reader.(zlibReader)}, nil
+}
+
+type wrappedZlibReader struct {
+	zlibReader
+}
+
+func (wr *wrappedZlibReader) Close() error {
+	defer func() {
+		zlibReaderPool.Put(wr)
+	}()
+	return wr.zlibReader.Close()
+}
+
+func newZlibWriterLevelMust(level int) *zlib.Writer {
+	w, err := zlib.NewWriterLevel(nil, level)
+	if err != nil {
+		panic(err)
+	}
+	return w
+}
+
+// level -> pool
+var zlibWriterPools map[int]*pool[zlib.Writer] = func() map[int]*pool[zlib.Writer] {
+	m := make(map[int]*pool[zlib.Writer])
+	for level := zlib.HuffmanOnly; level <= zlib.BestCompression; level++ {
+		// force a panic at init if we have an invalid level here
+		newZlibWriterLevelMust(level)
+		m[level] = newPool(
+			func() *zlib.Writer {
+				return newZlibWriterLevelMust(level)
+			},
+			nil,
+		)
+	}
+	return m
+}()
+
+type zlibWriterPoolCloser struct {
+	writer *zlib.Writer
+	pool   *pool[zlib.Writer]
+}
+
+func (z *zlibWriterPoolCloser) Close() error {
+	defer func() {
+		z.writer.Reset(nil)
+		z.pool.put(&z.writer)
+	}()
+	return z.writer.Close()
+}
+
+func newZlibWriterCloserLevel(w io.Writer, level int) (*zlib.Writer, io.Closer, error) {
+	pool, ok := zlibWriterPools[level]
+	if !ok {
+		// not pooled
+		writer, err := zlib.NewWriterLevel(w, level)
+		if err != nil {
+			return nil, nil, err
+		}
+		return writer, writer, nil
+	}
+	writer := pool.get()
+	writer.Reset(w)
+	return writer, &zlibWriterPoolCloser{writer: writer, pool: pool}, nil
+}
diff --git a/lib/go/thrift/zlib_pool_test.go b/lib/go/thrift/zlib_pool_test.go
new file mode 100644
index 0000000..1b3f5d8
--- /dev/null
+++ b/lib/go/thrift/zlib_pool_test.go
@@ -0,0 +1,51 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+ */
+
+package thrift
+
+import (
+	"compress/zlib"
+	"fmt"
+	"maps"
+	"slices"
+	"testing"
+)
+
+func TestZlibWriterPools(t *testing.T) {
+	// make sure we have the writer pools created at the given levels
+	for _, level := range []int{
+		zlib.HuffmanOnly,
+		zlib.DefaultCompression,
+		zlib.NoCompression,
+		zlib.BestSpeed,
+		zlib.BestCompression,
+	} {
+		t.Run(fmt.Sprintf("%d", level), func(t *testing.T) {
+			_, ok := zlibWriterPools[level]
+			if !ok {
+				t.Errorf("level %d does not exist in the writer pools", level)
+			}
+		})
+	}
+	if t.Failed() {
+		levels := slices.Collect(maps.Keys(zlibWriterPools))
+		slices.Sort(levels)
+		t.Log("zlib writer pools:", levels)
+	}
+}
diff --git a/lib/go/thrift/zlib_transport.go b/lib/go/thrift/zlib_transport.go
index cefe1f9..8f2d073 100644
--- a/lib/go/thrift/zlib_transport.go
+++ b/lib/go/thrift/zlib_transport.go
@@ -33,9 +33,10 @@
 
 // TZlibTransport is a TTransport implementation that makes use of zlib compression.
 type TZlibTransport struct {
-	reader    io.ReadCloser
-	transport TTransport
-	writer    *zlib.Writer
+	reader      io.ReadCloser
+	transport   TTransport
+	writer      *zlib.Writer
+	writeCloser io.Closer
 }
 
 // GetTransport constructs a new instance of NewTZlibTransport
@@ -64,14 +65,14 @@
 
 // NewTZlibTransport constructs a new instance of TZlibTransport
 func NewTZlibTransport(trans TTransport, level int) (*TZlibTransport, error) {
-	w, err := zlib.NewWriterLevel(trans, level)
+	writer, closer, err := newZlibWriterCloserLevel(trans, level)
 	if err != nil {
 		return nil, err
 	}
-
 	return &TZlibTransport{
-		writer:    w,
-		transport: trans,
+		writer:      writer,
+		writeCloser: closer,
+		transport:   trans,
 	}, nil
 }
 
@@ -83,7 +84,7 @@
 			return err
 		}
 	}
-	if err := z.writer.Close(); err != nil {
+	if err := z.writeCloser.Close(); err != nil {
 		return err
 	}
 	return z.transport.Close()
@@ -109,7 +110,7 @@
 
 func (z *TZlibTransport) Read(p []byte) (int, error) {
 	if z.reader == nil {
-		r, err := zlib.NewReader(z.transport)
+		r, err := newZlibReader(z.transport)
 		if err != nil {
 			return 0, NewTTransportExceptionFromError(err)
 		}