THRIFT-4448: Golang: do something with context.Context. Remove Go1.6 compatibility.
Client: go
This closes #1459
diff --git a/lib/go/Makefile.am b/lib/go/Makefile.am
index 0d59710..0dfa5fa 100644
--- a/lib/go/Makefile.am
+++ b/lib/go/Makefile.am
@@ -31,14 +31,12 @@
@echo '##############################################################'
check-local:
- GOPATH=`pwd` $(GO) get golang.org/x/net/context
GOPATH=`pwd` $(GO) test -race ./thrift
clean-local:
$(RM) -rf pkg
all-local:
- GOPATH=`pwd` $(GO) get golang.org/x/net/context
GOPATH=`pwd` $(GO) build ./thrift
EXTRA_DIST = \
diff --git a/lib/go/README.md b/lib/go/README.md
index 7440474..debc9ac 100644
--- a/lib/go/README.md
+++ b/lib/go/README.md
@@ -24,6 +24,8 @@
Using Thrift with Go
====================
+Thrift supports Go 1.7+
+
In following Go conventions, we recommend you use the 'go' tool to install
Thrift for go.
diff --git a/lib/go/test/Makefile.am b/lib/go/test/Makefile.am
index 842f2de..e93ec5c 100644
--- a/lib/go/test/Makefile.am
+++ b/lib/go/test/Makefile.am
@@ -17,10 +17,6 @@
# under the License.
#
-if GOVERSION_LT_17
-COMPILER_EXTRAFLAG=",legacy_context"
-endif
-
THRIFTARGS = -out gopath/src/ --gen go:thrift_import=thrift$(COMPILER_EXTRAFLAG)
THRIFTTEST = $(top_srcdir)/test/ThriftTest.thrift
@@ -59,7 +55,6 @@
$(THRIFT) $(THRIFTARGS) InitialismsTest.thrift
$(THRIFT) $(THRIFTARGS),read_write_private DontExportRWTest.thrift
$(THRIFT) $(THRIFTARGS),ignore_initialisms IgnoreInitialismsTest.thrift
- GOPATH=`pwd`/gopath $(GO) get golang.org/x/net/context
GOPATH=`pwd`/gopath $(GO) get github.com/golang/mock/gomock || true
sed -i 's/\"context\"/\"golang.org\/x\/net\/context\"/g' gopath/src/github.com/golang/mock/gomock/controller.go || true
GOPATH=`pwd`/gopath $(GO) get github.com/golang/mock/gomock
diff --git a/lib/go/test/tests/client_error_test.go b/lib/go/test/tests/client_error_test.go
index 5dec472..fdec4ea 100644
--- a/lib/go/test/tests/client_error_test.go
+++ b/lib/go/test/tests/client_error_test.go
@@ -20,6 +20,7 @@
package tests
import (
+ "context"
"errors"
"errortest"
"testing"
@@ -212,7 +213,7 @@
if failAt == 25 {
err = failWith
}
- last = protocol.EXPECT().Flush().Return(err).After(last)
+ last = protocol.EXPECT().Flush(context.Background()).Return(err).After(last)
if failAt == 25 {
return true
}
@@ -536,7 +537,7 @@
last = protocol.EXPECT().WriteFieldStop().After(last)
last = protocol.EXPECT().WriteStructEnd().After(last)
last = protocol.EXPECT().WriteMessageEnd().After(last)
- last = protocol.EXPECT().Flush().After(last)
+ last = protocol.EXPECT().Flush(context.Background()).After(last)
// Reading the exception, might fail.
if failAt == 0 {
@@ -704,7 +705,7 @@
protocol.EXPECT().WriteFieldStop(),
protocol.EXPECT().WriteStructEnd(),
protocol.EXPECT().WriteMessageEnd(),
- protocol.EXPECT().Flush(),
+ protocol.EXPECT().Flush(context.Background()),
protocol.EXPECT().ReadMessageBegin().Return("testString", thrift.REPLY, int32(2), nil),
)
@@ -735,7 +736,7 @@
protocol.EXPECT().WriteFieldStop(),
protocol.EXPECT().WriteStructEnd(),
protocol.EXPECT().WriteMessageEnd(),
- protocol.EXPECT().Flush(),
+ protocol.EXPECT().Flush(context.Background()),
protocol.EXPECT().ReadMessageBegin().Return("testString", thrift.REPLY, int32(2), nil),
)
@@ -764,7 +765,7 @@
protocol.EXPECT().WriteFieldStop(),
protocol.EXPECT().WriteStructEnd(),
protocol.EXPECT().WriteMessageEnd(),
- protocol.EXPECT().Flush(),
+ protocol.EXPECT().Flush(context.Background()),
protocol.EXPECT().ReadMessageBegin().Return("unknown", thrift.REPLY, int32(1), nil),
)
@@ -795,7 +796,7 @@
protocol.EXPECT().WriteFieldStop(),
protocol.EXPECT().WriteStructEnd(),
protocol.EXPECT().WriteMessageEnd(),
- protocol.EXPECT().Flush(),
+ protocol.EXPECT().Flush(context.Background()),
protocol.EXPECT().ReadMessageBegin().Return("unknown", thrift.REPLY, int32(1), nil),
)
@@ -824,7 +825,7 @@
protocol.EXPECT().WriteFieldStop(),
protocol.EXPECT().WriteStructEnd(),
protocol.EXPECT().WriteMessageEnd(),
- protocol.EXPECT().Flush(),
+ protocol.EXPECT().Flush(context.Background()),
protocol.EXPECT().ReadMessageBegin().Return("testString", thrift.INVALID_TMESSAGE_TYPE, int32(1), nil),
)
@@ -855,7 +856,7 @@
protocol.EXPECT().WriteFieldStop(),
protocol.EXPECT().WriteStructEnd(),
protocol.EXPECT().WriteMessageEnd(),
- protocol.EXPECT().Flush(),
+ protocol.EXPECT().Flush(context.Background()),
protocol.EXPECT().ReadMessageBegin().Return("testString", thrift.INVALID_TMESSAGE_TYPE, int32(1), nil),
)
diff --git a/lib/go/thrift/go17.go b/lib/go/test/tests/context.go
similarity index 93%
copy from lib/go/thrift/go17.go
copy to lib/go/test/tests/context.go
index e3b21c4..a93a82b 100644
--- a/lib/go/thrift/go17.go
+++ b/lib/go/test/tests/context.go
@@ -1,5 +1,3 @@
-// +build go1.7
-
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
@@ -19,8 +17,10 @@
* under the License.
*/
-package thrift
+package tests
-import "context"
+import (
+ "context"
+)
var defaultCtx = context.Background()
diff --git a/lib/go/test/tests/go17.go b/lib/go/test/tests/go17.go
deleted file mode 100644
index dc3c9d5..0000000
--- a/lib/go/test/tests/go17.go
+++ /dev/null
@@ -1,47 +0,0 @@
-// +build go1.7
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package tests
-
-import (
- "context"
- "fmt"
-)
-
-var defaultCtx = context.Background()
-
-type FirstImpl struct{}
-
-func (f *FirstImpl) ReturnOne(ctx context.Context) (r int64, err error) {
- return 1, nil
-}
-
-type SecondImpl struct{}
-
-func (s *SecondImpl) ReturnTwo(ctx context.Context) (r int64, err error) {
- return 2, nil
-}
-
-type impl struct{}
-
-func (i *impl) Hi(ctx context.Context, in int64, s string) (err error) { fmt.Println("Hi!"); return }
-func (i *impl) Emptyfunc(ctx context.Context) (err error) { return }
-func (i *impl) EchoInt(ctx context.Context, param int64) (r int64, err error) { return param, nil }
diff --git a/lib/go/test/tests/multiplexed_protocol_test.go b/lib/go/test/tests/multiplexed_protocol_test.go
index 0b5896b..61ac628 100644
--- a/lib/go/test/tests/multiplexed_protocol_test.go
+++ b/lib/go/test/tests/multiplexed_protocol_test.go
@@ -20,6 +20,7 @@
package tests
import (
+ "context"
"multiplexedprotocoltest"
"net"
"testing"
@@ -36,6 +37,18 @@
}
}
+type FirstImpl struct{}
+
+func (f *FirstImpl) ReturnOne(ctx context.Context) (r int64, err error) {
+ return 1, nil
+}
+
+type SecondImpl struct{}
+
+func (s *SecondImpl) ReturnTwo(ctx context.Context) (r int64, err error) {
+ return 2, nil
+}
+
func createTransport(addr net.Addr) (thrift.TTransport, error) {
socket := thrift.NewTSocketFromAddrTimeout(addr, TIMEOUT)
transport := thrift.NewTFramedTransport(socket)
diff --git a/lib/go/test/tests/one_way_test.go b/lib/go/test/tests/one_way_test.go
index 8abd671..48d0bbe 100644
--- a/lib/go/test/tests/one_way_test.go
+++ b/lib/go/test/tests/one_way_test.go
@@ -20,6 +20,8 @@
package tests
import (
+ "context"
+ "fmt"
"net"
"onewaytest"
"testing"
@@ -36,6 +38,12 @@
}
}
+type impl struct{}
+
+func (i *impl) Hi(ctx context.Context, in int64, s string) (err error) { fmt.Println("Hi!"); return }
+func (i *impl) Emptyfunc(ctx context.Context) (err error) { return }
+func (i *impl) EchoInt(ctx context.Context, param int64) (r int64, err error) { return param, nil }
+
const TIMEOUT = time.Second
var addr net.Addr
diff --git a/lib/go/test/tests/pre_go17.go b/lib/go/test/tests/pre_go17.go
deleted file mode 100644
index 8ab4331..0000000
--- a/lib/go/test/tests/pre_go17.go
+++ /dev/null
@@ -1,48 +0,0 @@
-// +build !go1.7
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package tests
-
-import (
- "fmt"
-
- "golang.org/x/net/context"
-)
-
-var defaultCtx = context.Background()
-
-type FirstImpl struct{}
-
-func (f *FirstImpl) ReturnOne(ctx context.Context) (r int64, err error) {
- return 1, nil
-}
-
-type SecondImpl struct{}
-
-func (s *SecondImpl) ReturnTwo(ctx context.Context) (r int64, err error) {
- return 2, nil
-}
-
-type impl struct{}
-
-func (i *impl) Hi(ctx context.Context, in int64, s string) (err error) { fmt.Println("Hi!"); return }
-func (i *impl) Emptyfunc(ctx context.Context) (err error) { return }
-func (i *impl) EchoInt(ctx context.Context, param int64) (r int64, err error) { return param, nil }
diff --git a/lib/go/test/tests/protocol_mock.go b/lib/go/test/tests/protocol_mock.go
index 8476c86..51d7a02 100644
--- a/lib/go/test/tests/protocol_mock.go
+++ b/lib/go/test/tests/protocol_mock.go
@@ -23,6 +23,7 @@
package tests
import (
+ "context"
thrift "thrift"
gomock "github.com/golang/mock/gomock"
@@ -49,13 +50,13 @@
return _m.recorder
}
-func (_m *MockTProtocol) Flush() error {
+func (_m *MockTProtocol) Flush(ctx context.Context) error {
ret := _m.ctrl.Call(_m, "Flush")
ret0, _ := ret[0].(error)
return ret0
}
-func (_mr *_MockTProtocolRecorder) Flush() *gomock.Call {
+func (_mr *_MockTProtocolRecorder) Flush(ctx context.Context) *gomock.Call {
return _mr.mock.ctrl.RecordCall(_mr.mock, "Flush")
}
diff --git a/lib/go/test/tests/thrifttest_handler.go b/lib/go/test/tests/thrifttest_handler.go
index 6542fac..31b9ee2 100644
--- a/lib/go/test/tests/thrifttest_handler.go
+++ b/lib/go/test/tests/thrifttest_handler.go
@@ -1,5 +1,3 @@
-// +build !go1.7
-
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
@@ -22,12 +20,11 @@
package tests
import (
+ "context"
"errors"
"thrift"
"thrifttest"
"time"
-
- "golang.org/x/net/context"
)
type SecondServiceHandler struct {
diff --git a/lib/go/test/tests/thrifttest_handler_go17.go b/lib/go/test/tests/thrifttest_handler_go17.go
deleted file mode 100644
index e022a3d..0000000
--- a/lib/go/test/tests/thrifttest_handler_go17.go
+++ /dev/null
@@ -1,212 +0,0 @@
-// +build go1.7
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * 'License'); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * 'AS IS' BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package tests
-
-import (
- "context"
- "errors"
- "thrift"
- "thrifttest"
- "time"
-)
-
-type SecondServiceHandler struct {
-}
-
-func NewSecondServiceHandler() *SecondServiceHandler {
- return &SecondServiceHandler{}
-}
-
-func (p *SecondServiceHandler) BlahBlah(ctx context.Context) (err error) {
- return nil
-}
-
-func (p *SecondServiceHandler) SecondtestString(ctx context.Context, thing string) (r string, err error) {
- return thing, nil
-}
-
-type ThriftTestHandler struct {
-}
-
-func NewThriftTestHandler() *ThriftTestHandler {
- return &ThriftTestHandler{}
-}
-
-func (p *ThriftTestHandler) TestVoid(ctx context.Context) (err error) {
- return nil
-}
-
-func (p *ThriftTestHandler) TestString(ctx context.Context, thing string) (r string, err error) {
- return thing, nil
-}
-
-func (p *ThriftTestHandler) TestBool(ctx context.Context, thing bool) (r bool, err error) {
- return thing, nil
-}
-
-func (p *ThriftTestHandler) TestByte(ctx context.Context, thing int8) (r int8, err error) {
- return thing, nil
-}
-
-func (p *ThriftTestHandler) TestI32(ctx context.Context, thing int32) (r int32, err error) {
- return thing, nil
-}
-
-func (p *ThriftTestHandler) TestI64(ctx context.Context, thing int64) (r int64, err error) {
- return thing, nil
-}
-
-func (p *ThriftTestHandler) TestDouble(ctx context.Context, thing float64) (r float64, err error) {
- return thing, nil
-}
-
-func (p *ThriftTestHandler) TestBinary(ctx context.Context, thing []byte) (r []byte, err error) {
- return thing, nil
-}
-
-func (p *ThriftTestHandler) TestStruct(ctx context.Context, thing *thrifttest.Xtruct) (r *thrifttest.Xtruct, err error) {
- return thing, nil
-}
-
-func (p *ThriftTestHandler) TestNest(ctx context.Context, thing *thrifttest.Xtruct2) (r *thrifttest.Xtruct2, err error) {
- return thing, nil
-}
-
-func (p *ThriftTestHandler) TestMap(ctx context.Context, thing map[int32]int32) (r map[int32]int32, err error) {
- return thing, nil
-}
-
-func (p *ThriftTestHandler) TestStringMap(ctx context.Context, thing map[string]string) (r map[string]string, err error) {
- return thing, nil
-}
-
-func (p *ThriftTestHandler) TestSet(ctx context.Context, thing []int32) (r []int32, err error) {
- return thing, nil
-}
-
-func (p *ThriftTestHandler) TestList(ctx context.Context, thing []int32) (r []int32, err error) {
- return thing, nil
-}
-
-func (p *ThriftTestHandler) TestEnum(ctx context.Context, thing thrifttest.Numberz) (r thrifttest.Numberz, err error) {
- return thing, nil
-}
-
-func (p *ThriftTestHandler) TestTypedef(ctx context.Context, thing thrifttest.UserId) (r thrifttest.UserId, err error) {
- return thing, nil
-}
-
-func (p *ThriftTestHandler) TestMapMap(ctx context.Context, hello int32) (r map[int32]map[int32]int32, err error) {
- r = make(map[int32]map[int32]int32)
- pos := make(map[int32]int32)
- neg := make(map[int32]int32)
-
- for i := int32(1); i < 5; i++ {
- pos[i] = i
- neg[-i] = -i
- }
- r[4] = pos
- r[-4] = neg
-
- return r, nil
-}
-
-func (p *ThriftTestHandler) TestInsanity(ctx context.Context, argument *thrifttest.Insanity) (r map[thrifttest.UserId]map[thrifttest.Numberz]*thrifttest.Insanity, err error) {
- hello := thrifttest.NewXtruct()
- hello.StringThing = "Hello2"
- hello.ByteThing = 2
- hello.I32Thing = 2
- hello.I64Thing = 2
-
- goodbye := thrifttest.NewXtruct()
- goodbye.StringThing = "Goodbye4"
- goodbye.ByteThing = 4
- goodbye.I32Thing = 4
- goodbye.I64Thing = 4
-
- crazy := thrifttest.NewInsanity()
- crazy.UserMap = make(map[thrifttest.Numberz]thrifttest.UserId)
- crazy.UserMap[thrifttest.Numberz_EIGHT] = 8
- crazy.UserMap[thrifttest.Numberz_FIVE] = 5
- crazy.Xtructs = []*thrifttest.Xtruct{goodbye, hello}
-
- first_map := make(map[thrifttest.Numberz]*thrifttest.Insanity)
- second_map := make(map[thrifttest.Numberz]*thrifttest.Insanity)
-
- first_map[thrifttest.Numberz_TWO] = crazy
- first_map[thrifttest.Numberz_THREE] = crazy
-
- looney := thrifttest.NewInsanity()
- second_map[thrifttest.Numberz_SIX] = looney
-
- var insane = make(map[thrifttest.UserId]map[thrifttest.Numberz]*thrifttest.Insanity)
- insane[1] = first_map
- insane[2] = second_map
-
- return insane, nil
-}
-
-func (p *ThriftTestHandler) TestMulti(ctx context.Context, arg0 int8, arg1 int32, arg2 int64, arg3 map[int16]string, arg4 thrifttest.Numberz, arg5 thrifttest.UserId) (r *thrifttest.Xtruct, err error) {
- r = thrifttest.NewXtruct()
- r.StringThing = "Hello2"
- r.ByteThing = arg0
- r.I32Thing = arg1
- r.I64Thing = arg2
- return r, nil
-}
-
-func (p *ThriftTestHandler) TestException(ctx context.Context, arg string) (err error) {
- if arg == "Xception" {
- x := thrifttest.NewXception()
- x.ErrorCode = 1001
- x.Message = arg
- return x
- } else if arg == "TException" {
- return thrift.TException(errors.New(arg))
- } else {
- return nil
- }
-}
-
-func (p *ThriftTestHandler) TestMultiException(ctx context.Context, arg0 string, arg1 string) (r *thrifttest.Xtruct, err error) {
- if arg0 == "Xception" {
- x := thrifttest.NewXception()
- x.ErrorCode = 1001
- x.Message = "This is an Xception"
- return nil, x
- } else if arg0 == "Xception2" {
- x2 := thrifttest.NewXception2()
- x2.ErrorCode = 2002
- x2.StructThing = thrifttest.NewXtruct()
- x2.StructThing.StringThing = "This is an Xception2"
- return nil, x2
- }
-
- res := thrifttest.NewXtruct()
- res.StringThing = arg1
- return res, nil
-}
-
-func (p *ThriftTestHandler) TestOneway(ctx context.Context, secondsToSleep int32) (err error) {
- time.Sleep(time.Second * time.Duration(secondsToSleep))
- return nil
-}
diff --git a/lib/go/thrift/binary_protocol.go b/lib/go/thrift/binary_protocol.go
index 690d341..de0f6a7 100644
--- a/lib/go/thrift/binary_protocol.go
+++ b/lib/go/thrift/binary_protocol.go
@@ -21,6 +21,7 @@
import (
"bytes"
+ "context"
"encoding/binary"
"errors"
"fmt"
@@ -457,8 +458,8 @@
return buf, NewTProtocolException(err)
}
-func (p *TBinaryProtocol) Flush() (err error) {
- return NewTProtocolException(p.trans.Flush())
+func (p *TBinaryProtocol) Flush(ctx context.Context) (err error) {
+ return NewTProtocolException(p.trans.Flush(ctx))
}
func (p *TBinaryProtocol) Skip(fieldType TType) (err error) {
diff --git a/lib/go/thrift/buffered_transport.go b/lib/go/thrift/buffered_transport.go
index b754f92..9670206 100644
--- a/lib/go/thrift/buffered_transport.go
+++ b/lib/go/thrift/buffered_transport.go
@@ -21,6 +21,7 @@
import (
"bufio"
+ "context"
)
type TBufferedTransportFactory struct {
@@ -78,12 +79,12 @@
return n, err
}
-func (p *TBufferedTransport) Flush() error {
+func (p *TBufferedTransport) Flush(ctx context.Context) error {
if err := p.ReadWriter.Flush(); err != nil {
p.ReadWriter.Writer.Reset(p.tp)
return err
}
- return p.tp.Flush()
+ return p.tp.Flush(ctx)
}
func (p *TBufferedTransport) RemainingBytes() (num_bytes uint64) {
diff --git a/lib/go/thrift/client.go b/lib/go/thrift/client.go
index 8bdb53d..28791cc 100644
--- a/lib/go/thrift/client.go
+++ b/lib/go/thrift/client.go
@@ -1,6 +1,13 @@
package thrift
-import "fmt"
+import (
+ "context"
+ "fmt"
+)
+
+type TClient interface {
+ Call(ctx context.Context, method string, args, result TStruct) error
+}
type TStandardClient struct {
seqId int32
@@ -16,7 +23,7 @@
}
}
-func (p *TStandardClient) Send(oprot TProtocol, seqId int32, method string, args TStruct) error {
+func (p *TStandardClient) Send(ctx context.Context, oprot TProtocol, seqId int32, method string, args TStruct) error {
if err := oprot.WriteMessageBegin(method, CALL, seqId); err != nil {
return err
}
@@ -26,7 +33,7 @@
if err := oprot.WriteMessageEnd(); err != nil {
return err
}
- return oprot.Flush()
+ return oprot.Flush(ctx)
}
func (p *TStandardClient) Recv(iprot TProtocol, seqId int32, method string, result TStruct) error {
@@ -61,11 +68,11 @@
return iprot.ReadMessageEnd()
}
-func (p *TStandardClient) call(method string, args, result TStruct) error {
+func (p *TStandardClient) Call(ctx context.Context, method string, args, result TStruct) error {
p.seqId++
seqId := p.seqId
- if err := p.Send(p.oprot, seqId, method, args); err != nil {
+ if err := p.Send(ctx, p.oprot, seqId, method, args); err != nil {
return err
}
diff --git a/lib/go/thrift/client_go17.go b/lib/go/thrift/client_go17.go
deleted file mode 100644
index 15c1c52..0000000
--- a/lib/go/thrift/client_go17.go
+++ /dev/null
@@ -1,13 +0,0 @@
-// +build go1.7
-
-package thrift
-
-import "context"
-
-type TClient interface {
- Call(ctx context.Context, method string, args, result TStruct) error
-}
-
-func (p *TStandardClient) Call(ctx context.Context, method string, args, result TStruct) error {
- return p.call(method, args, result)
-}
diff --git a/lib/go/thrift/client_pre_go17.go b/lib/go/thrift/client_pre_go17.go
deleted file mode 100644
index d2e99ef..0000000
--- a/lib/go/thrift/client_pre_go17.go
+++ /dev/null
@@ -1,13 +0,0 @@
-// +build !go1.7
-
-package thrift
-
-import "golang.org/x/net/context"
-
-type TClient interface {
- Call(ctx context.Context, method string, args, result TStruct) error
-}
-
-func (p *TStandardClient) Call(ctx context.Context, method string, args, result TStruct) error {
- return p.call(method, args, result)
-}
diff --git a/lib/go/thrift/common_test_go17.go b/lib/go/thrift/common_test.go
similarity index 97%
rename from lib/go/thrift/common_test_go17.go
rename to lib/go/thrift/common_test.go
index 2c729a2..93597ff 100644
--- a/lib/go/thrift/common_test_go17.go
+++ b/lib/go/thrift/common_test.go
@@ -1,5 +1,3 @@
-// +build go1.7
-
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
diff --git a/lib/go/thrift/common_test_pre_go17.go b/lib/go/thrift/common_test_pre_go17.go
deleted file mode 100644
index e6d0c4d..0000000
--- a/lib/go/thrift/common_test_pre_go17.go
+++ /dev/null
@@ -1,32 +0,0 @@
-// +build !go1.7
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package thrift
-
-import "golang.org/x/net/context"
-
-type mockProcessor struct {
- ProcessFunc func(in, out TProtocol) (bool, TException)
-}
-
-func (m *mockProcessor) Process(ctx context.Context, in, out TProtocol) (bool, TException) {
- return m.ProcessFunc(in, out)
-}
diff --git a/lib/go/thrift/compact_protocol.go b/lib/go/thrift/compact_protocol.go
index 0bc5fdd..fc1d182 100644
--- a/lib/go/thrift/compact_protocol.go
+++ b/lib/go/thrift/compact_protocol.go
@@ -20,6 +20,7 @@
package thrift
import (
+ "context"
"encoding/binary"
"fmt"
"io"
@@ -599,8 +600,8 @@
return buf, NewTProtocolException(e)
}
-func (p *TCompactProtocol) Flush() (err error) {
- return NewTProtocolException(p.trans.Flush())
+func (p *TCompactProtocol) Flush(ctx context.Context) (err error) {
+ return NewTProtocolException(p.trans.Flush(ctx))
}
func (p *TCompactProtocol) Skip(fieldType TType) (err error) {
diff --git a/lib/go/thrift/go17.go b/lib/go/thrift/context.go
similarity index 97%
rename from lib/go/thrift/go17.go
rename to lib/go/thrift/context.go
index e3b21c4..d15c1bc 100644
--- a/lib/go/thrift/go17.go
+++ b/lib/go/thrift/context.go
@@ -1,5 +1,3 @@
-// +build go1.7
-
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
diff --git a/lib/go/thrift/debug_protocol.go b/lib/go/thrift/debug_protocol.go
index d37252c..57943e0 100644
--- a/lib/go/thrift/debug_protocol.go
+++ b/lib/go/thrift/debug_protocol.go
@@ -20,6 +20,7 @@
package thrift
import (
+ "context"
"log"
)
@@ -258,8 +259,8 @@
log.Printf("%sSkip(fieldType=%#v) (err=%#v)", tdp.LogPrefix, fieldType, err)
return
}
-func (tdp *TDebugProtocol) Flush() (err error) {
- err = tdp.Delegate.Flush()
+func (tdp *TDebugProtocol) Flush(ctx context.Context) (err error) {
+ err = tdp.Delegate.Flush(ctx)
log.Printf("%sFlush() (err=%#v)", tdp.LogPrefix, err)
return
}
diff --git a/lib/go/thrift/framed_transport.go b/lib/go/thrift/framed_transport.go
index 60b1249..81fa65a 100644
--- a/lib/go/thrift/framed_transport.go
+++ b/lib/go/thrift/framed_transport.go
@@ -22,6 +22,7 @@
import (
"bufio"
"bytes"
+ "context"
"encoding/binary"
"fmt"
"io"
@@ -135,7 +136,7 @@
return p.buf.WriteString(s)
}
-func (p *TFramedTransport) Flush() error {
+func (p *TFramedTransport) Flush(ctx context.Context) error {
size := p.buf.Len()
buf := p.buffer[:4]
binary.BigEndian.PutUint32(buf, uint32(size))
@@ -151,7 +152,7 @@
return NewTTransportExceptionFromError(err)
}
}
- err = p.transport.Flush()
+ err = p.transport.Flush(ctx)
return NewTTransportExceptionFromError(err)
}
diff --git a/lib/go/thrift/http_client.go b/lib/go/thrift/http_client.go
index 33f2aa4..5c82bf5 100644
--- a/lib/go/thrift/http_client.go
+++ b/lib/go/thrift/http_client.go
@@ -21,6 +21,7 @@
import (
"bytes"
+ "context"
"io"
"io/ioutil"
"net/http"
@@ -181,7 +182,7 @@
return p.requestBuffer.WriteString(s)
}
-func (p *THttpClient) Flush() error {
+func (p *THttpClient) Flush(ctx context.Context) error {
// Close any previous response body to avoid leaking connections.
p.closeResponse()
@@ -190,6 +191,9 @@
return NewTTransportExceptionFromError(err)
}
req.Header = p.header
+ if ctx != nil {
+ req = req.WithContext(ctx)
+ }
response, err := p.client.Do(req)
if err != nil {
return NewTTransportExceptionFromError(err)
diff --git a/lib/go/thrift/http_transport.go b/lib/go/thrift/http_transport.go
index 601855b..66f0f38 100644
--- a/lib/go/thrift/http_transport.go
+++ b/lib/go/thrift/http_transport.go
@@ -26,6 +26,18 @@
"strings"
)
+// NewThriftHandlerFunc is a function that create a ready to use Apache Thrift Handler function
+func NewThriftHandlerFunc(processor TProcessor,
+ inPfactory, outPfactory TProtocolFactory) func(w http.ResponseWriter, r *http.Request) {
+
+ return gz(func(w http.ResponseWriter, r *http.Request) {
+ w.Header().Add("Content-Type", "application/x-thrift")
+
+ transport := NewStreamTransport(r.Body, w)
+ processor.Process(r.Context(), inPfactory.GetProtocol(transport), outPfactory.GetProtocol(transport))
+ })
+}
+
// gz transparently compresses the HTTP response if the client supports it.
func gz(handler http.HandlerFunc) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
diff --git a/lib/go/thrift/http_transport_go17.go b/lib/go/thrift/http_transport_go17.go
deleted file mode 100644
index 1313ac2..0000000
--- a/lib/go/thrift/http_transport_go17.go
+++ /dev/null
@@ -1,38 +0,0 @@
-// +build go1.7
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package thrift
-
-import (
- "net/http"
-)
-
-// NewThriftHandlerFunc is a function that create a ready to use Apache Thrift Handler function
-func NewThriftHandlerFunc(processor TProcessor,
- inPfactory, outPfactory TProtocolFactory) func(w http.ResponseWriter, r *http.Request) {
-
- return gz(func(w http.ResponseWriter, r *http.Request) {
- w.Header().Add("Content-Type", "application/x-thrift")
-
- transport := NewStreamTransport(r.Body, w)
- processor.Process(r.Context(), inPfactory.GetProtocol(transport), outPfactory.GetProtocol(transport))
- })
-}
diff --git a/lib/go/thrift/http_transport_pre_go17.go b/lib/go/thrift/http_transport_pre_go17.go
deleted file mode 100644
index 13aa1c1..0000000
--- a/lib/go/thrift/http_transport_pre_go17.go
+++ /dev/null
@@ -1,40 +0,0 @@
-// +build !go1.7
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package thrift
-
-import (
- "net/http"
-
- "golang.org/x/net/context"
-)
-
-// NewThriftHandlerFunc is a function that create a ready to use Apache Thrift Handler function
-func NewThriftHandlerFunc(processor TProcessor,
- inPfactory, outPfactory TProtocolFactory) func(w http.ResponseWriter, r *http.Request) {
-
- return gz(func(w http.ResponseWriter, r *http.Request) {
- w.Header().Add("Content-Type", "application/x-thrift")
-
- transport := NewStreamTransport(r.Body, w)
- processor.Process(context.Background(), inPfactory.GetProtocol(transport), outPfactory.GetProtocol(transport))
- })
-}
diff --git a/lib/go/thrift/iostream_transport.go b/lib/go/thrift/iostream_transport.go
index b18be81..fea93bc 100644
--- a/lib/go/thrift/iostream_transport.go
+++ b/lib/go/thrift/iostream_transport.go
@@ -21,6 +21,7 @@
import (
"bufio"
+ "context"
"io"
)
@@ -138,7 +139,7 @@
}
// Flushes the underlying output stream if not null.
-func (p *StreamTransport) Flush() error {
+func (p *StreamTransport) Flush(ctx context.Context) error {
if p.Writer == nil {
return NewTTransportException(NOT_OPEN, "Cannot flush null outputStream")
}
diff --git a/lib/go/thrift/json_protocol.go b/lib/go/thrift/json_protocol.go
index 442fa91..7be685d 100644
--- a/lib/go/thrift/json_protocol.go
+++ b/lib/go/thrift/json_protocol.go
@@ -20,6 +20,7 @@
package thrift
import (
+ "context"
"encoding/base64"
"fmt"
)
@@ -438,10 +439,10 @@
return v, p.ParsePostValue()
}
-func (p *TJSONProtocol) Flush() (err error) {
+func (p *TJSONProtocol) Flush(ctx context.Context) (err error) {
err = p.writer.Flush()
if err == nil {
- err = p.trans.Flush()
+ err = p.trans.Flush(ctx)
}
return NewTProtocolException(err)
}
diff --git a/lib/go/thrift/json_protocol_test.go b/lib/go/thrift/json_protocol_test.go
index 7104ce3..0902f1b 100644
--- a/lib/go/thrift/json_protocol_test.go
+++ b/lib/go/thrift/json_protocol_test.go
@@ -20,6 +20,7 @@
package thrift
import (
+ "context"
"encoding/base64"
"encoding/json"
"fmt"
@@ -36,7 +37,7 @@
if e := p.WriteBool(value); e != nil {
t.Fatalf("Unable to write %s value %v due to error: %s", thetype, value, e.Error())
}
- if e := p.Flush(); e != nil {
+ if e := p.Flush(context.Background()); e != nil {
t.Fatalf("Unable to write %s value %v due to error flushing: %s", thetype, value, e.Error())
}
s := trans.String()
@@ -68,7 +69,7 @@
} else {
trans.Write([]byte{'0'}) // not JSON_FALSE
}
- trans.Flush()
+ trans.Flush(context.Background())
s := trans.String()
v, e := p.ReadBool()
if e != nil {
@@ -94,7 +95,7 @@
if e := p.WriteByte(value); e != nil {
t.Fatalf("Unable to write %s value %v due to error: %s", thetype, value, e.Error())
}
- if e := p.Flush(); e != nil {
+ if e := p.Flush(context.Background()); e != nil {
t.Fatalf("Unable to write %s value %v due to error flushing: %s", thetype, value, e.Error())
}
s := trans.String()
@@ -116,7 +117,7 @@
trans := NewTMemoryBuffer()
p := NewTJSONProtocol(trans)
trans.WriteString(strconv.Itoa(int(value)))
- trans.Flush()
+ trans.Flush(context.Background())
s := trans.String()
v, e := p.ReadByte()
if e != nil {
@@ -141,7 +142,7 @@
if e := p.WriteI16(value); e != nil {
t.Fatalf("Unable to write %s value %v due to error: %s", thetype, value, e.Error())
}
- if e := p.Flush(); e != nil {
+ if e := p.Flush(context.Background()); e != nil {
t.Fatalf("Unable to write %s value %v due to error flushing: %s", thetype, value, e.Error())
}
s := trans.String()
@@ -163,7 +164,7 @@
trans := NewTMemoryBuffer()
p := NewTJSONProtocol(trans)
trans.WriteString(strconv.Itoa(int(value)))
- trans.Flush()
+ trans.Flush(context.Background())
s := trans.String()
v, e := p.ReadI16()
if e != nil {
@@ -188,7 +189,7 @@
if e := p.WriteI32(value); e != nil {
t.Fatalf("Unable to write %s value %v due to error: %s", thetype, value, e.Error())
}
- if e := p.Flush(); e != nil {
+ if e := p.Flush(context.Background()); e != nil {
t.Fatalf("Unable to write %s value %v due to error flushing: %s", thetype, value, e.Error())
}
s := trans.String()
@@ -210,7 +211,7 @@
trans := NewTMemoryBuffer()
p := NewTJSONProtocol(trans)
trans.WriteString(strconv.Itoa(int(value)))
- trans.Flush()
+ trans.Flush(context.Background())
s := trans.String()
v, e := p.ReadI32()
if e != nil {
@@ -235,7 +236,7 @@
if e := p.WriteI64(value); e != nil {
t.Fatalf("Unable to write %s value %v due to error: %s", thetype, value, e.Error())
}
- if e := p.Flush(); e != nil {
+ if e := p.Flush(context.Background()); e != nil {
t.Fatalf("Unable to write %s value %v due to error flushing: %s", thetype, value, e.Error())
}
s := trans.String()
@@ -257,7 +258,7 @@
trans := NewTMemoryBuffer()
p := NewTJSONProtocol(trans)
trans.WriteString(strconv.FormatInt(value, 10))
- trans.Flush()
+ trans.Flush(context.Background())
s := trans.String()
v, e := p.ReadI64()
if e != nil {
@@ -282,7 +283,7 @@
if e := p.WriteDouble(value); e != nil {
t.Fatalf("Unable to write %s value %v due to error: %s", thetype, value, e.Error())
}
- if e := p.Flush(); e != nil {
+ if e := p.Flush(context.Background()); e != nil {
t.Fatalf("Unable to write %s value %v due to error flushing: %s", thetype, value, e.Error())
}
s := trans.String()
@@ -319,7 +320,7 @@
p := NewTJSONProtocol(trans)
n := NewNumericFromDouble(value)
trans.WriteString(n.String())
- trans.Flush()
+ trans.Flush(context.Background())
s := trans.String()
v, e := p.ReadDouble()
if e != nil {
@@ -358,7 +359,7 @@
if e := p.WriteString(value); e != nil {
t.Fatalf("Unable to write %s value %v due to error: %s", thetype, value, e.Error())
}
- if e := p.Flush(); e != nil {
+ if e := p.Flush(context.Background()); e != nil {
t.Fatalf("Unable to write %s value %v due to error flushing: %s", thetype, value, e.Error())
}
s := trans.String()
@@ -380,7 +381,7 @@
trans := NewTMemoryBuffer()
p := NewTJSONProtocol(trans)
trans.WriteString(jsonQuote(value))
- trans.Flush()
+ trans.Flush(context.Background())
s := trans.String()
v, e := p.ReadString()
if e != nil {
@@ -409,7 +410,7 @@
if e := p.WriteBinary(value); e != nil {
t.Fatalf("Unable to write %s value %v due to error: %s", thetype, value, e.Error())
}
- if e := p.Flush(); e != nil {
+ if e := p.Flush(context.Background()); e != nil {
t.Fatalf("Unable to write %s value %v due to error flushing: %s", thetype, value, e.Error())
}
s := trans.String()
@@ -441,7 +442,7 @@
trans := NewTMemoryBuffer()
p := NewTJSONProtocol(trans)
trans.WriteString(jsonQuote(b64String))
- trans.Flush()
+ trans.Flush(context.Background())
s := trans.String()
v, e := p.ReadBinary()
if e != nil {
@@ -474,7 +475,7 @@
}
}
p.WriteListEnd()
- if e := p.Flush(); e != nil {
+ if e := p.Flush(context.Background()); e != nil {
t.Fatalf("Unable to write %s due to error flushing: %s", thetype, e.Error())
}
str := trans.String()
@@ -528,7 +529,7 @@
}
}
p.WriteSetEnd()
- if e := p.Flush(); e != nil {
+ if e := p.Flush(context.Background()); e != nil {
t.Fatalf("Unable to write %s due to error flushing: %s", thetype, e.Error())
}
str := trans.String()
@@ -585,7 +586,7 @@
}
}
p.WriteMapEnd()
- if e := p.Flush(); e != nil {
+ if e := p.Flush(context.Background()); e != nil {
t.Fatalf("Unable to write %s due to error flushing: %s", thetype, e.Error())
}
str := trans.String()
diff --git a/lib/go/thrift/memory_buffer.go b/lib/go/thrift/memory_buffer.go
index 97a4edf..5936d27 100644
--- a/lib/go/thrift/memory_buffer.go
+++ b/lib/go/thrift/memory_buffer.go
@@ -21,6 +21,7 @@
import (
"bytes"
+ "context"
)
// Memory buffer-based implementation of the TTransport interface.
@@ -70,7 +71,7 @@
}
// Flushing a memory buffer is a no-op
-func (p *TMemoryBuffer) Flush() error {
+func (p *TMemoryBuffer) Flush(ctx context.Context) error {
return nil
}
diff --git a/lib/go/thrift/multiplexed_protocol.go b/lib/go/thrift/multiplexed_protocol.go
index b7f4f8a..d028a30 100644
--- a/lib/go/thrift/multiplexed_protocol.go
+++ b/lib/go/thrift/multiplexed_protocol.go
@@ -19,6 +19,12 @@
package thrift
+import (
+ "context"
+ "fmt"
+ "strings"
+)
+
/*
TMultiplexedProtocol is a protocol-independent concrete decorator
that allows a Thrift client to communicate with a multiplexing Thrift server,
@@ -122,6 +128,31 @@
t.serviceProcessorMap[name] = processor
}
+func (t *TMultiplexedProcessor) Process(ctx context.Context, in, out TProtocol) (bool, TException) {
+ name, typeId, seqid, err := in.ReadMessageBegin()
+ if err != nil {
+ return false, err
+ }
+ if typeId != CALL && typeId != ONEWAY {
+ return false, fmt.Errorf("Unexpected message type %v", typeId)
+ }
+ //extract the service name
+ v := strings.SplitN(name, MULTIPLEXED_SEPARATOR, 2)
+ if len(v) != 2 {
+ if t.DefaultProcessor != nil {
+ smb := NewStoredMessageProtocol(in, name, typeId, seqid)
+ return t.DefaultProcessor.Process(ctx, smb, out)
+ }
+ return false, fmt.Errorf("Service name not found in message name: %s. Did you forget to use a TMultiplexProtocol in your client?", name)
+ }
+ actualProcessor, ok := t.serviceProcessorMap[v[0]]
+ if !ok {
+ return false, fmt.Errorf("Service name not found: %s. Did you forget to call registerProcessor()?", v[0])
+ }
+ smb := NewStoredMessageProtocol(in, v[1], typeId, seqid)
+ return actualProcessor.Process(ctx, smb, out)
+}
+
//Protocol that use stored message for ReadMessageBegin
type storedMessageProtocol struct {
TProtocol
diff --git a/lib/go/thrift/multiplexed_protocol_go17.go b/lib/go/thrift/multiplexed_protocol_go17.go
deleted file mode 100644
index c71035e..0000000
--- a/lib/go/thrift/multiplexed_protocol_go17.go
+++ /dev/null
@@ -1,53 +0,0 @@
-// +build go1.7
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package thrift
-
-import (
- "context"
- "fmt"
- "strings"
-)
-
-func (t *TMultiplexedProcessor) Process(ctx context.Context, in, out TProtocol) (bool, TException) {
- name, typeId, seqid, err := in.ReadMessageBegin()
- if err != nil {
- return false, err
- }
- if typeId != CALL && typeId != ONEWAY {
- return false, fmt.Errorf("Unexpected message type %v", typeId)
- }
- //extract the service name
- v := strings.SplitN(name, MULTIPLEXED_SEPARATOR, 2)
- if len(v) != 2 {
- if t.DefaultProcessor != nil {
- smb := NewStoredMessageProtocol(in, name, typeId, seqid)
- return t.DefaultProcessor.Process(ctx, smb, out)
- }
- return false, fmt.Errorf("Service name not found in message name: %s. Did you forget to use a TMultiplexProtocol in your client?", name)
- }
- actualProcessor, ok := t.serviceProcessorMap[v[0]]
- if !ok {
- return false, fmt.Errorf("Service name not found: %s. Did you forget to call registerProcessor()?", v[0])
- }
- smb := NewStoredMessageProtocol(in, v[1], typeId, seqid)
- return actualProcessor.Process(ctx, smb, out)
-}
diff --git a/lib/go/thrift/multiplexed_protocol_pre_go17.go b/lib/go/thrift/multiplexed_protocol_pre_go17.go
deleted file mode 100644
index 5c27b38..0000000
--- a/lib/go/thrift/multiplexed_protocol_pre_go17.go
+++ /dev/null
@@ -1,54 +0,0 @@
-// +build !go1.7
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package thrift
-
-import (
- "fmt"
- "strings"
-
- "golang.org/x/net/context"
-)
-
-func (t *TMultiplexedProcessor) Process(ctx context.Context, in, out TProtocol) (bool, TException) {
- name, typeId, seqid, err := in.ReadMessageBegin()
- if err != nil {
- return false, err
- }
- if typeId != CALL && typeId != ONEWAY {
- return false, fmt.Errorf("Unexpected message type %v", typeId)
- }
- //extract the service name
- v := strings.SplitN(name, MULTIPLEXED_SEPARATOR, 2)
- if len(v) != 2 {
- if t.DefaultProcessor != nil {
- smb := NewStoredMessageProtocol(in, name, typeId, seqid)
- return t.DefaultProcessor.Process(ctx, smb, out)
- }
- return false, fmt.Errorf("Service name not found in message name: %s. Did you forget to use a TMultiplexProtocol in your client?", name)
- }
- actualProcessor, ok := t.serviceProcessorMap[v[0]]
- if !ok {
- return false, fmt.Errorf("Service name not found: %s. Did you forget to call registerProcessor()?", v[0])
- }
- smb := NewStoredMessageProtocol(in, v[1], typeId, seqid)
- return actualProcessor.Process(ctx, smb, out)
-}
diff --git a/lib/go/thrift/pre_go17.go b/lib/go/thrift/pre_go17.go
deleted file mode 100644
index cb564b8..0000000
--- a/lib/go/thrift/pre_go17.go
+++ /dev/null
@@ -1,26 +0,0 @@
-// +build !go1.7
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package thrift
-
-import "golang.org/x/net/context"
-
-var defaultCtx = context.Background()
diff --git a/lib/go/thrift/processor.go b/lib/go/thrift/processor.go
deleted file mode 100644
index 566aaaf..0000000
--- a/lib/go/thrift/processor.go
+++ /dev/null
@@ -1,34 +0,0 @@
-// +build !go1.7
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package thrift
-
-import "golang.org/x/net/context"
-
-// A processor is a generic object which operates upon an input stream and
-// writes to some output stream.
-type TProcessor interface {
- Process(ctx context.Context, in, out TProtocol) (bool, TException)
-}
-
-type TProcessorFunction interface {
- Process(ctx context.Context, seqId int32, in, out TProtocol) (bool, TException)
-}
diff --git a/lib/go/thrift/processor_factory.go b/lib/go/thrift/processor_factory.go
index 9d645df..e4b132b 100644
--- a/lib/go/thrift/processor_factory.go
+++ b/lib/go/thrift/processor_factory.go
@@ -19,6 +19,18 @@
package thrift
+import "context"
+
+// A processor is a generic object which operates upon an input stream and
+// writes to some output stream.
+type TProcessor interface {
+ Process(ctx context.Context, in, out TProtocol) (bool, TException)
+}
+
+type TProcessorFunction interface {
+ Process(ctx context.Context, seqId int32, in, out TProtocol) (bool, TException)
+}
+
// The default processor factory just returns a singleton
// instance.
type TProcessorFactory interface {
diff --git a/lib/go/thrift/processor_go17.go b/lib/go/thrift/processor_go17.go
deleted file mode 100644
index fb0b165..0000000
--- a/lib/go/thrift/processor_go17.go
+++ /dev/null
@@ -1,34 +0,0 @@
-// +build go1.7
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package thrift
-
-import "context"
-
-// A processor is a generic object which operates upon an input stream and
-// writes to some output stream.
-type TProcessor interface {
- Process(ctx context.Context, in, out TProtocol) (bool, TException)
-}
-
-type TProcessorFunction interface {
- Process(ctx context.Context, seqId int32, in, out TProtocol) (bool, TException)
-}
diff --git a/lib/go/thrift/protocol.go b/lib/go/thrift/protocol.go
index 25e6d24..615b7a4 100644
--- a/lib/go/thrift/protocol.go
+++ b/lib/go/thrift/protocol.go
@@ -20,6 +20,7 @@
package thrift
import (
+ "context"
"errors"
"fmt"
)
@@ -74,7 +75,7 @@
ReadBinary() (value []byte, err error)
Skip(fieldType TType) (err error)
- Flush() (err error)
+ Flush(ctx context.Context) (err error)
Transport() TTransport
}
diff --git a/lib/go/thrift/protocol_test.go b/lib/go/thrift/protocol_test.go
index 2573312..2e3b65d 100644
--- a/lib/go/thrift/protocol_test.go
+++ b/lib/go/thrift/protocol_test.go
@@ -21,6 +21,7 @@
import (
"bytes"
+ "context"
"io/ioutil"
"math"
"net"
@@ -234,7 +235,7 @@
if err != nil {
t.Errorf("%s: %T %T %q Error writing list end: %q", "ReadWriteBool", p, trans, err, BOOL_VALUES)
}
- p.Flush()
+ p.Flush(context.Background())
thetype2, thelen2, err := p.ReadListBegin()
if err != nil {
t.Errorf("%s: %T %T %q Error reading list: %q", "ReadWriteBool", p, trans, err, BOOL_VALUES)
@@ -280,7 +281,7 @@
if err != nil {
t.Errorf("%s: %T %T %q Error writing list end: %q", "ReadWriteByte", p, trans, err, BYTE_VALUES)
}
- err = p.Flush()
+ err = p.Flush(context.Background())
if err != nil {
t.Errorf("%s: %T %T %q Error flushing list of bytes: %q", "ReadWriteByte", p, trans, err, BYTE_VALUES)
}
@@ -320,7 +321,7 @@
p.WriteI16(v)
}
p.WriteListEnd()
- p.Flush()
+ p.Flush(context.Background())
thetype2, thelen2, err := p.ReadListBegin()
if err != nil {
t.Errorf("%s: %T %T %q Error reading list: %q", "ReadWriteI16", p, trans, err, INT16_VALUES)
@@ -357,7 +358,7 @@
p.WriteI32(v)
}
p.WriteListEnd()
- p.Flush()
+ p.Flush(context.Background())
thetype2, thelen2, err := p.ReadListBegin()
if err != nil {
t.Errorf("%s: %T %T %q Error reading list: %q", "ReadWriteI32", p, trans, err, INT32_VALUES)
@@ -393,7 +394,7 @@
p.WriteI64(v)
}
p.WriteListEnd()
- p.Flush()
+ p.Flush(context.Background())
thetype2, thelen2, err := p.ReadListBegin()
if err != nil {
t.Errorf("%s: %T %T %q Error reading list: %q", "ReadWriteI64", p, trans, err, INT64_VALUES)
@@ -429,7 +430,7 @@
p.WriteDouble(v)
}
p.WriteListEnd()
- p.Flush()
+ p.Flush(context.Background())
thetype2, thelen2, err := p.ReadListBegin()
if err != nil {
t.Errorf("%s: %T %T %q Error reading list: %q", "ReadWriteDouble", p, trans, err, DOUBLE_VALUES)
@@ -467,7 +468,7 @@
p.WriteString(v)
}
p.WriteListEnd()
- p.Flush()
+ p.Flush(context.Background())
thetype2, thelen2, err := p.ReadListBegin()
if err != nil {
t.Errorf("%s: %T %T %q Error reading list: %q", "ReadWriteString", p, trans, err, STRING_VALUES)
@@ -498,7 +499,7 @@
func ReadWriteBinary(t testing.TB, p TProtocol, trans TTransport) {
v := protocol_bdata
p.WriteBinary(v)
- p.Flush()
+ p.Flush(context.Background())
value, err := p.ReadBinary()
if err != nil {
t.Errorf("%s: %T %T Unable to read binary: %s", "ReadWriteBinary", p, trans, err.Error())
diff --git a/lib/go/thrift/serializer.go b/lib/go/thrift/serializer.go
index 7712229..1ff4d37 100644
--- a/lib/go/thrift/serializer.go
+++ b/lib/go/thrift/serializer.go
@@ -19,6 +19,10 @@
package thrift
+import (
+ "context"
+)
+
type TSerializer struct {
Transport *TMemoryBuffer
Protocol TProtocol
@@ -38,35 +42,35 @@
protocol}
}
-func (t *TSerializer) WriteString(msg TStruct) (s string, err error) {
+func (t *TSerializer) WriteString(ctx context.Context, msg TStruct) (s string, err error) {
t.Transport.Reset()
if err = msg.Write(t.Protocol); err != nil {
return
}
- if err = t.Protocol.Flush(); err != nil {
+ if err = t.Protocol.Flush(ctx); err != nil {
return
}
- if err = t.Transport.Flush(); err != nil {
+ if err = t.Transport.Flush(ctx); err != nil {
return
}
return t.Transport.String(), nil
}
-func (t *TSerializer) Write(msg TStruct) (b []byte, err error) {
+func (t *TSerializer) Write(ctx context.Context, msg TStruct) (b []byte, err error) {
t.Transport.Reset()
if err = msg.Write(t.Protocol); err != nil {
return
}
- if err = t.Protocol.Flush(); err != nil {
+ if err = t.Protocol.Flush(ctx); err != nil {
return
}
- if err = t.Transport.Flush(); err != nil {
+ if err = t.Transport.Flush(ctx); err != nil {
return
}
diff --git a/lib/go/thrift/serializer_test.go b/lib/go/thrift/serializer_test.go
index 06d27a1..32227ef 100644
--- a/lib/go/thrift/serializer_test.go
+++ b/lib/go/thrift/serializer_test.go
@@ -20,6 +20,7 @@
package thrift
import (
+ "context"
"errors"
"fmt"
"testing"
@@ -88,7 +89,7 @@
m.StringSet = make(map[string]struct{}, 5)
m.E = 2
- s, err := t.WriteString(&m)
+ s, err := t.WriteString(context.Background(), &m)
if err != nil {
return false, errors.New(fmt.Sprintf("Unable to Serialize struct\n\t %s", err))
}
@@ -122,7 +123,7 @@
m.StringSet = make(map[string]struct{}, 5)
m.E = 2
- s, err := t.WriteString(&m)
+ s, err := t.WriteString(context.Background(), &m)
if err != nil {
return false, errors.New(fmt.Sprintf("Unable to Serialize struct\n\t %s", err))
diff --git a/lib/go/thrift/simple_json_protocol.go b/lib/go/thrift/simple_json_protocol.go
index 7353322..88f569c 100644
--- a/lib/go/thrift/simple_json_protocol.go
+++ b/lib/go/thrift/simple_json_protocol.go
@@ -22,6 +22,7 @@
import (
"bufio"
"bytes"
+ "context"
"encoding/base64"
"encoding/json"
"fmt"
@@ -552,7 +553,7 @@
return v, p.ParsePostValue()
}
-func (p *TSimpleJSONProtocol) Flush() (err error) {
+func (p *TSimpleJSONProtocol) Flush(ctx context.Context) (err error) {
return NewTProtocolException(p.writer.Flush())
}
diff --git a/lib/go/thrift/simple_json_protocol_test.go b/lib/go/thrift/simple_json_protocol_test.go
index 8f0dcc9..49181ab 100644
--- a/lib/go/thrift/simple_json_protocol_test.go
+++ b/lib/go/thrift/simple_json_protocol_test.go
@@ -20,6 +20,7 @@
package thrift
import (
+ "context"
"encoding/base64"
"encoding/json"
"fmt"
@@ -37,7 +38,7 @@
if e := p.WriteBool(value); e != nil {
t.Fatalf("Unable to write %s value %v due to error: %s", thetype, value, e.Error())
}
- if e := p.Flush(); e != nil {
+ if e := p.Flush(context.Background()); e != nil {
t.Fatalf("Unable to write %s value %v due to error flushing: %s", thetype, value, e.Error())
}
s := trans.String()
@@ -63,7 +64,7 @@
} else {
trans.Write(JSON_FALSE)
}
- trans.Flush()
+ trans.Flush(context.Background())
s := trans.String()
v, e := p.ReadBool()
if e != nil {
@@ -88,7 +89,7 @@
if e := p.WriteByte(value); e != nil {
t.Fatalf("Unable to write %s value %v due to error: %s", thetype, value, e.Error())
}
- if e := p.Flush(); e != nil {
+ if e := p.Flush(context.Background()); e != nil {
t.Fatalf("Unable to write %s value %v due to error flushing: %s", thetype, value, e.Error())
}
s := trans.String()
@@ -110,7 +111,7 @@
trans := NewTMemoryBuffer()
p := NewTSimpleJSONProtocol(trans)
trans.WriteString(strconv.Itoa(int(value)))
- trans.Flush()
+ trans.Flush(context.Background())
s := trans.String()
v, e := p.ReadByte()
if e != nil {
@@ -135,7 +136,7 @@
if e := p.WriteI16(value); e != nil {
t.Fatalf("Unable to write %s value %v due to error: %s", thetype, value, e.Error())
}
- if e := p.Flush(); e != nil {
+ if e := p.Flush(context.Background()); e != nil {
t.Fatalf("Unable to write %s value %v due to error flushing: %s", thetype, value, e.Error())
}
s := trans.String()
@@ -157,7 +158,7 @@
trans := NewTMemoryBuffer()
p := NewTSimpleJSONProtocol(trans)
trans.WriteString(strconv.Itoa(int(value)))
- trans.Flush()
+ trans.Flush(context.Background())
s := trans.String()
v, e := p.ReadI16()
if e != nil {
@@ -182,7 +183,7 @@
if e := p.WriteI32(value); e != nil {
t.Fatalf("Unable to write %s value %v due to error: %s", thetype, value, e.Error())
}
- if e := p.Flush(); e != nil {
+ if e := p.Flush(context.Background()); e != nil {
t.Fatalf("Unable to write %s value %v due to error flushing: %s", thetype, value, e.Error())
}
s := trans.String()
@@ -204,7 +205,7 @@
trans := NewTMemoryBuffer()
p := NewTSimpleJSONProtocol(trans)
trans.WriteString(strconv.Itoa(int(value)))
- trans.Flush()
+ trans.Flush(context.Background())
s := trans.String()
v, e := p.ReadI32()
if e != nil {
@@ -228,7 +229,7 @@
trans := NewTMemoryBuffer()
p := NewTSimpleJSONProtocol(trans)
trans.WriteString(value)
- trans.Flush()
+ trans.Flush(context.Background())
s := trans.String()
v, e := p.ReadI32()
@@ -250,7 +251,7 @@
if e := p.WriteI64(value); e != nil {
t.Fatalf("Unable to write %s value %v due to error: %s", thetype, value, e.Error())
}
- if e := p.Flush(); e != nil {
+ if e := p.Flush(context.Background()); e != nil {
t.Fatalf("Unable to write %s value %v due to error flushing: %s", thetype, value, e.Error())
}
s := trans.String()
@@ -272,7 +273,7 @@
trans := NewTMemoryBuffer()
p := NewTSimpleJSONProtocol(trans)
trans.WriteString(strconv.FormatInt(value, 10))
- trans.Flush()
+ trans.Flush(context.Background())
s := trans.String()
v, e := p.ReadI64()
if e != nil {
@@ -296,7 +297,7 @@
trans := NewTMemoryBuffer()
p := NewTSimpleJSONProtocol(trans)
trans.WriteString(value)
- trans.Flush()
+ trans.Flush(context.Background())
s := trans.String()
v, e := p.ReadI64()
@@ -318,7 +319,7 @@
if e := p.WriteDouble(value); e != nil {
t.Fatalf("Unable to write %s value %v due to error: %s", thetype, value, e.Error())
}
- if e := p.Flush(); e != nil {
+ if e := p.Flush(context.Background()); e != nil {
t.Fatalf("Unable to write %s value %v due to error flushing: %s", thetype, value, e.Error())
}
s := trans.String()
@@ -355,7 +356,7 @@
p := NewTSimpleJSONProtocol(trans)
n := NewNumericFromDouble(value)
trans.WriteString(n.String())
- trans.Flush()
+ trans.Flush(context.Background())
s := trans.String()
v, e := p.ReadDouble()
if e != nil {
@@ -394,7 +395,7 @@
if e := p.WriteString(value); e != nil {
t.Fatalf("Unable to write %s value %v due to error: %s", thetype, value, e.Error())
}
- if e := p.Flush(); e != nil {
+ if e := p.Flush(context.Background()); e != nil {
t.Fatalf("Unable to write %s value %v due to error flushing: %s", thetype, value, e.Error())
}
s := trans.String()
@@ -416,7 +417,7 @@
trans := NewTMemoryBuffer()
p := NewTSimpleJSONProtocol(trans)
trans.WriteString(jsonQuote(value))
- trans.Flush()
+ trans.Flush(context.Background())
s := trans.String()
v, e := p.ReadString()
if e != nil {
@@ -440,7 +441,7 @@
trans := NewTMemoryBuffer()
p := NewTSimpleJSONProtocol(trans)
trans.WriteString(value)
- trans.Flush()
+ trans.Flush(context.Background())
s := trans.String()
v, e := p.ReadString()
if e != nil {
@@ -464,7 +465,7 @@
if e := p.WriteBinary(value); e != nil {
t.Fatalf("Unable to write %s value %v due to error: %s", thetype, value, e.Error())
}
- if e := p.Flush(); e != nil {
+ if e := p.Flush(context.Background()); e != nil {
t.Fatalf("Unable to write %s value %v due to error flushing: %s", thetype, value, e.Error())
}
s := trans.String()
@@ -487,7 +488,7 @@
trans := NewTMemoryBuffer()
p := NewTSimpleJSONProtocol(trans)
trans.WriteString(jsonQuote(b64String))
- trans.Flush()
+ trans.Flush(context.Background())
s := trans.String()
v, e := p.ReadBinary()
if e != nil {
@@ -516,7 +517,7 @@
trans := NewTMemoryBuffer()
p := NewTSimpleJSONProtocol(trans)
trans.WriteString(value)
- trans.Flush()
+ trans.Flush(context.Background())
s := trans.String()
b, e := p.ReadBinary()
v := string(b)
@@ -542,7 +543,7 @@
}
}
p.WriteListEnd()
- if e := p.Flush(); e != nil {
+ if e := p.Flush(context.Background()); e != nil {
t.Fatalf("Unable to write %s due to error flushing: %s", thetype, e.Error())
}
str := trans.String()
@@ -596,7 +597,7 @@
}
}
p.WriteSetEnd()
- if e := p.Flush(); e != nil {
+ if e := p.Flush(context.Background()); e != nil {
t.Fatalf("Unable to write %s due to error flushing: %s", thetype, e.Error())
}
str := trans.String()
@@ -653,7 +654,7 @@
}
}
p.WriteMapEnd()
- if e := p.Flush(); e != nil {
+ if e := p.Flush(context.Background()); e != nil {
t.Fatalf("Unable to write %s due to error flushing: %s", thetype, e.Error())
}
str := trans.String()
diff --git a/lib/go/thrift/socket.go b/lib/go/thrift/socket.go
index 383b1fe..8854279 100644
--- a/lib/go/thrift/socket.go
+++ b/lib/go/thrift/socket.go
@@ -20,6 +20,7 @@
package thrift
import (
+ "context"
"net"
"time"
)
@@ -148,7 +149,7 @@
return p.conn.Write(buf)
}
-func (p *TSocket) Flush() error {
+func (p *TSocket) Flush(ctx context.Context) error {
return nil
}
diff --git a/lib/go/thrift/ssl_socket.go b/lib/go/thrift/ssl_socket.go
index c3bd72c..ba63377 100644
--- a/lib/go/thrift/ssl_socket.go
+++ b/lib/go/thrift/ssl_socket.go
@@ -20,6 +20,7 @@
package thrift
import (
+ "context"
"crypto/tls"
"net"
"time"
@@ -158,7 +159,7 @@
return p.conn.Write(buf)
}
-func (p *TSSLSocket) Flush() error {
+func (p *TSSLSocket) Flush(ctx context.Context) error {
return nil
}
diff --git a/lib/go/thrift/transport.go b/lib/go/thrift/transport.go
index 70a85a8..ba2738a 100644
--- a/lib/go/thrift/transport.go
+++ b/lib/go/thrift/transport.go
@@ -20,6 +20,7 @@
package thrift
import (
+ "context"
"errors"
"io"
)
@@ -30,6 +31,10 @@
Flush() (err error)
}
+type ContextFlusher interface {
+ Flush(ctx context.Context) (err error)
+}
+
type ReadSizeProvider interface {
RemainingBytes() (num_bytes uint64)
}
@@ -37,7 +42,7 @@
// Encapsulates the I/O layer
type TTransport interface {
io.ReadWriteCloser
- Flusher
+ ContextFlusher
ReadSizeProvider
// Opens the transport for communication
@@ -60,6 +65,6 @@
io.ByteReader
io.ByteWriter
stringWriter
- Flusher
+ ContextFlusher
ReadSizeProvider
}
diff --git a/lib/go/thrift/transport_test.go b/lib/go/thrift/transport_test.go
index 864958a..0127803 100644
--- a/lib/go/thrift/transport_test.go
+++ b/lib/go/thrift/transport_test.go
@@ -20,6 +20,7 @@
package thrift
import (
+ "context"
"io"
"net"
"strconv"
@@ -54,7 +55,7 @@
if err != nil {
t.Fatalf("Transport %T cannot write binary data of length %d: %s", writeTrans, len(transport_bdata), err)
}
- err = writeTrans.Flush()
+ err = writeTrans.Flush(context.Background())
if err != nil {
t.Fatalf("Transport %T cannot flush write of binary data: %s", writeTrans, err)
}
@@ -74,7 +75,7 @@
if err != nil {
t.Fatalf("Transport %T cannot write binary data 2 of length %d: %s", writeTrans, len(transport_bdata), err)
}
- err = writeTrans.Flush()
+ err = writeTrans.Flush(context.Background())
if err != nil {
t.Fatalf("Transport %T cannot flush write binary data 2: %s", writeTrans, err)
}
@@ -113,7 +114,7 @@
if err != nil {
t.Fatalf("Transport %T cannot write binary data of length %d: %s", writeTrans, len(transport_bdata), err)
}
- err = writeTrans.Flush()
+ err = writeTrans.Flush(context.Background())
if err != nil {
t.Fatalf("Transport %T cannot flush write of binary data: %s", writeTrans, err)
}
diff --git a/lib/go/thrift/zlib_transport.go b/lib/go/thrift/zlib_transport.go
index f2f0732..f3d4267 100644
--- a/lib/go/thrift/zlib_transport.go
+++ b/lib/go/thrift/zlib_transport.go
@@ -21,6 +21,7 @@
import (
"compress/zlib"
+ "context"
"io"
"log"
)
@@ -91,11 +92,11 @@
}
// Flush flushes the writer and its underlying transport.
-func (z *TZlibTransport) Flush() error {
+func (z *TZlibTransport) Flush(ctx context.Context) error {
if err := z.writer.Flush(); err != nil {
return err
}
- return z.transport.Flush()
+ return z.transport.Flush(ctx)
}
// IsOpen returns true if the transport is open