go: Add a zlib reader pool
We implemented a zlib writer pool for default level when implementing
THeader, this change also add a zlib reader pool to help speed up things
when zlib is used.
Also make TZlibTransport to use the zlib writer pool when it's using the
default compression level.
diff --git a/lib/go/thrift/compact_protocol_test.go b/lib/go/thrift/compact_protocol_test.go
index 65f77f2..0d95759 100644
--- a/lib/go/thrift/compact_protocol_test.go
+++ b/lib/go/thrift/compact_protocol_test.go
@@ -33,9 +33,18 @@
NewTFramedTransport(NewTMemoryBuffer()),
}
- zlib0, _ := NewTZlibTransport(NewTMemoryBuffer(), 0)
- zlib6, _ := NewTZlibTransport(NewTMemoryBuffer(), 6)
- zlib9, _ := NewTZlibTransport(NewTFramedTransport(NewTMemoryBuffer()), 9)
+ newTZlibTransport := func(trans TTransport, level int) *TZlibTransport {
+ t.Helper()
+ zlibTrans, err := NewTZlibTransport(trans, level)
+ if err != nil {
+ t.Fatalf("NewTZlibTransport returned error: %v", err)
+ }
+ return zlibTrans
+ }
+
+ zlib0 := newTZlibTransport(NewTMemoryBuffer(), 0)
+ zlib6 := newTZlibTransport(NewTMemoryBuffer(), 6)
+ zlib9 := newTZlibTransport(NewTFramedTransport(NewTMemoryBuffer()), 9)
transports = append(transports, zlib0, zlib6, zlib9)
for _, trans := range transports {
diff --git a/lib/go/thrift/header_transport.go b/lib/go/thrift/header_transport.go
index d6d6416..06f71b3 100644
--- a/lib/go/thrift/header_transport.go
+++ b/lib/go/thrift/header_transport.go
@@ -166,7 +166,7 @@
case TransformNone:
// no-op
case TransformZlib:
- readCloser, err := zlib.NewReader(tr.Reader)
+ readCloser, err := newZlibReader(tr.Reader)
if err != nil {
return err
}
@@ -211,25 +211,6 @@
return nil
}
-var zlibDefaultLevelWriterPool = newPool(
- func() *zlib.Writer {
- return zlib.NewWriter(nil)
- },
- nil,
-)
-
-type zlibPoolCloser struct {
- writer *zlib.Writer
-}
-
-func (z *zlibPoolCloser) Close() error {
- defer func() {
- z.writer.Reset(nil)
- zlibDefaultLevelWriterPool.put(&z.writer)
- }()
- return z.writer.Close()
-}
-
// AddTransform adds a transform.
func (tw *TransformWriter) AddTransform(id THeaderTransformID) error {
switch id {
@@ -241,12 +222,12 @@
case TransformNone:
// no-op
case TransformZlib:
- writeCloser := zlibDefaultLevelWriterPool.get()
- writeCloser.Reset(tw.Writer)
- tw.Writer = writeCloser
- tw.closers = append(tw.closers, &zlibPoolCloser{
- writer: writeCloser,
- })
+ writer, closer, err := newZlibWriterCloserLevel(tw.Writer, zlib.DefaultCompression)
+ if err != nil {
+ return err
+ }
+ tw.Writer = writer
+ tw.closers = append(tw.closers, closer)
}
return nil
}
diff --git a/lib/go/thrift/pool.go b/lib/go/thrift/pool.go
index 1d623d4..6912f3e 100644
--- a/lib/go/thrift/pool.go
+++ b/lib/go/thrift/pool.go
@@ -43,7 +43,7 @@
}
return &pool[T]{
pool: sync.Pool{
- New: func() interface{} {
+ New: func() any {
return generate()
},
},
diff --git a/lib/go/thrift/zlib_pool.go b/lib/go/thrift/zlib_pool.go
new file mode 100644
index 0000000..b419291
--- /dev/null
+++ b/lib/go/thrift/zlib_pool.go
@@ -0,0 +1,109 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+ */
+
+package thrift
+
+import (
+ "compress/zlib"
+ "io"
+ "sync"
+)
+
+type zlibReader interface {
+ io.ReadCloser
+ zlib.Resetter
+}
+
+var zlibReaderPool sync.Pool
+
+func newZlibReader(r io.Reader) (io.ReadCloser, error) {
+ if reader, _ := zlibReaderPool.Get().(*wrappedZlibReader); reader != nil {
+ if err := reader.Reset(r, nil); err == nil {
+ return reader, nil
+ }
+ }
+ reader, err := zlib.NewReader(r)
+ if err != nil {
+ return nil, err
+ }
+ return &wrappedZlibReader{reader.(zlibReader)}, nil
+}
+
+type wrappedZlibReader struct {
+ zlibReader
+}
+
+func (wr *wrappedZlibReader) Close() error {
+ defer func() {
+ zlibReaderPool.Put(wr)
+ }()
+ return wr.zlibReader.Close()
+}
+
+func newZlibWriterLevelMust(level int) *zlib.Writer {
+ w, err := zlib.NewWriterLevel(nil, level)
+ if err != nil {
+ panic(err)
+ }
+ return w
+}
+
+// level -> pool
+var zlibWriterPools map[int]*pool[zlib.Writer] = func() map[int]*pool[zlib.Writer] {
+ m := make(map[int]*pool[zlib.Writer])
+ for level := zlib.HuffmanOnly; level <= zlib.BestCompression; level++ {
+ // force a panic at init if we have an invalid level here
+ newZlibWriterLevelMust(level)
+ m[level] = newPool(
+ func() *zlib.Writer {
+ return newZlibWriterLevelMust(level)
+ },
+ nil,
+ )
+ }
+ return m
+}()
+
+type zlibWriterPoolCloser struct {
+ writer *zlib.Writer
+ pool *pool[zlib.Writer]
+}
+
+func (z *zlibWriterPoolCloser) Close() error {
+ defer func() {
+ z.writer.Reset(nil)
+ z.pool.put(&z.writer)
+ }()
+ return z.writer.Close()
+}
+
+func newZlibWriterCloserLevel(w io.Writer, level int) (*zlib.Writer, io.Closer, error) {
+ pool, ok := zlibWriterPools[level]
+ if !ok {
+ // not pooled
+ writer, err := zlib.NewWriterLevel(w, level)
+ if err != nil {
+ return nil, nil, err
+ }
+ return writer, writer, nil
+ }
+ writer := pool.get()
+ writer.Reset(w)
+ return writer, &zlibWriterPoolCloser{writer: writer, pool: pool}, nil
+}
diff --git a/lib/go/thrift/zlib_pool_test.go b/lib/go/thrift/zlib_pool_test.go
new file mode 100644
index 0000000..1b3f5d8
--- /dev/null
+++ b/lib/go/thrift/zlib_pool_test.go
@@ -0,0 +1,51 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+ */
+
+package thrift
+
+import (
+ "compress/zlib"
+ "fmt"
+ "maps"
+ "slices"
+ "testing"
+)
+
+func TestZlibWriterPools(t *testing.T) {
+ // make sure we have the writer pools created at the given levels
+ for _, level := range []int{
+ zlib.HuffmanOnly,
+ zlib.DefaultCompression,
+ zlib.NoCompression,
+ zlib.BestSpeed,
+ zlib.BestCompression,
+ } {
+ t.Run(fmt.Sprintf("%d", level), func(t *testing.T) {
+ _, ok := zlibWriterPools[level]
+ if !ok {
+ t.Errorf("level %d does not exist in the writer pools", level)
+ }
+ })
+ }
+ if t.Failed() {
+ levels := slices.Collect(maps.Keys(zlibWriterPools))
+ slices.Sort(levels)
+ t.Log("zlib writer pools:", levels)
+ }
+}
diff --git a/lib/go/thrift/zlib_transport.go b/lib/go/thrift/zlib_transport.go
index cefe1f9..8f2d073 100644
--- a/lib/go/thrift/zlib_transport.go
+++ b/lib/go/thrift/zlib_transport.go
@@ -33,9 +33,10 @@
// TZlibTransport is a TTransport implementation that makes use of zlib compression.
type TZlibTransport struct {
- reader io.ReadCloser
- transport TTransport
- writer *zlib.Writer
+ reader io.ReadCloser
+ transport TTransport
+ writer *zlib.Writer
+ writeCloser io.Closer
}
// GetTransport constructs a new instance of NewTZlibTransport
@@ -64,14 +65,14 @@
// NewTZlibTransport constructs a new instance of TZlibTransport
func NewTZlibTransport(trans TTransport, level int) (*TZlibTransport, error) {
- w, err := zlib.NewWriterLevel(trans, level)
+ writer, closer, err := newZlibWriterCloserLevel(trans, level)
if err != nil {
return nil, err
}
-
return &TZlibTransport{
- writer: w,
- transport: trans,
+ writer: writer,
+ writeCloser: closer,
+ transport: trans,
}, nil
}
@@ -83,7 +84,7 @@
return err
}
}
- if err := z.writer.Close(); err != nil {
+ if err := z.writeCloser.Close(); err != nil {
return err
}
return z.transport.Close()
@@ -109,7 +110,7 @@
func (z *TZlibTransport) Read(p []byte) (int, error) {
if z.reader == nil {
- r, err := zlib.NewReader(z.transport)
+ r, err := newZlibReader(z.transport)
if err != nil {
return 0, NewTTransportExceptionFromError(err)
}