blob: 51e3509c180f254f1a9a551cc8efe13b28e7b7e3 [file] [log] [blame]
Jens Geyer0e87c462013-06-18 22:25:07 +02001/*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
17 * under the License.
18 */
19
20package thrift
21
22import (
Yuxuan 'fishy' Wange4870a32019-10-24 13:23:30 -070023 "fmt"
Yuxuan 'fishy' Wangc03e2aa2019-10-23 13:43:09 -070024 "io"
Paul2df9c202016-09-24 22:47:58 +030025 "sync"
Zachary Wassermanc1794352017-06-29 17:15:01 -070026 "sync/atomic"
Jens Geyer0e87c462013-06-18 22:25:07 +020027)
28
James E. King, III847fae92017-03-22 15:17:30 -040029/*
30 * This is not a typical TSimpleServer as it is not blocked after accept a socket.
31 * It is more like a TThreadedServer that can handle different connections in different goroutines.
32 * This will work if golang user implements a conn-pool like thing in client side.
33 */
Jens Geyer0e87c462013-06-18 22:25:07 +020034type TSimpleServer struct {
Zachary Wassermanc1794352017-06-29 17:15:01 -070035 closed int32
36 wg sync.WaitGroup
37 mu sync.Mutex
Jens Geyer0e87c462013-06-18 22:25:07 +020038
39 processorFactory TProcessorFactory
40 serverTransport TServerTransport
41 inputTransportFactory TTransportFactory
42 outputTransportFactory TTransportFactory
43 inputProtocolFactory TProtocolFactory
44 outputProtocolFactory TProtocolFactory
Yuxuan 'fishy' Wang26ef9042019-08-19 00:18:22 -070045
46 // Headers to auto forward in THeaderProtocol
47 forwardHeaders []string
Yuxuan 'fishy' Wange4870a32019-10-24 13:23:30 -070048
49 logger Logger
Jens Geyer0e87c462013-06-18 22:25:07 +020050}
51
52func NewTSimpleServer2(processor TProcessor, serverTransport TServerTransport) *TSimpleServer {
53 return NewTSimpleServerFactory2(NewTProcessorFactory(processor), serverTransport)
54}
55
56func NewTSimpleServer4(processor TProcessor, serverTransport TServerTransport, transportFactory TTransportFactory, protocolFactory TProtocolFactory) *TSimpleServer {
57 return NewTSimpleServerFactory4(NewTProcessorFactory(processor),
58 serverTransport,
59 transportFactory,
60 protocolFactory,
61 )
62}
63
64func NewTSimpleServer6(processor TProcessor, serverTransport TServerTransport, inputTransportFactory TTransportFactory, outputTransportFactory TTransportFactory, inputProtocolFactory TProtocolFactory, outputProtocolFactory TProtocolFactory) *TSimpleServer {
65 return NewTSimpleServerFactory6(NewTProcessorFactory(processor),
66 serverTransport,
67 inputTransportFactory,
68 outputTransportFactory,
69 inputProtocolFactory,
70 outputProtocolFactory,
71 )
72}
73
74func NewTSimpleServerFactory2(processorFactory TProcessorFactory, serverTransport TServerTransport) *TSimpleServer {
75 return NewTSimpleServerFactory6(processorFactory,
76 serverTransport,
77 NewTTransportFactory(),
78 NewTTransportFactory(),
79 NewTBinaryProtocolFactoryDefault(),
80 NewTBinaryProtocolFactoryDefault(),
81 )
82}
83
84func NewTSimpleServerFactory4(processorFactory TProcessorFactory, serverTransport TServerTransport, transportFactory TTransportFactory, protocolFactory TProtocolFactory) *TSimpleServer {
85 return NewTSimpleServerFactory6(processorFactory,
86 serverTransport,
87 transportFactory,
88 transportFactory,
89 protocolFactory,
90 protocolFactory,
91 )
92}
93
94func NewTSimpleServerFactory6(processorFactory TProcessorFactory, serverTransport TServerTransport, inputTransportFactory TTransportFactory, outputTransportFactory TTransportFactory, inputProtocolFactory TProtocolFactory, outputProtocolFactory TProtocolFactory) *TSimpleServer {
Jens Geyerc975bbc2014-03-06 21:11:46 +010095 return &TSimpleServer{
96 processorFactory: processorFactory,
Jens Geyer0e87c462013-06-18 22:25:07 +020097 serverTransport: serverTransport,
98 inputTransportFactory: inputTransportFactory,
99 outputTransportFactory: outputTransportFactory,
100 inputProtocolFactory: inputProtocolFactory,
101 outputProtocolFactory: outputProtocolFactory,
102 }
103}
104
105func (p *TSimpleServer) ProcessorFactory() TProcessorFactory {
106 return p.processorFactory
107}
108
109func (p *TSimpleServer) ServerTransport() TServerTransport {
110 return p.serverTransport
111}
112
113func (p *TSimpleServer) InputTransportFactory() TTransportFactory {
114 return p.inputTransportFactory
115}
116
117func (p *TSimpleServer) OutputTransportFactory() TTransportFactory {
118 return p.outputTransportFactory
119}
120
121func (p *TSimpleServer) InputProtocolFactory() TProtocolFactory {
122 return p.inputProtocolFactory
123}
124
125func (p *TSimpleServer) OutputProtocolFactory() TProtocolFactory {
126 return p.outputProtocolFactory
127}
128
Jens Geyerf4598682014-05-08 23:18:44 +0200129func (p *TSimpleServer) Listen() error {
130 return p.serverTransport.Listen()
131}
Jens Geyerc975bbc2014-03-06 21:11:46 +0100132
Yuxuan 'fishy' Wang26ef9042019-08-19 00:18:22 -0700133// SetForwardHeaders sets the list of header keys that will be auto forwarded
134// while using THeaderProtocol.
135//
136// "forward" means that when the server is also a client to other upstream
137// thrift servers, the context object user gets in the processor functions will
138// have both read and write headers set, with write headers being forwarded.
139// Users can always override the write headers by calling SetWriteHeaderList
140// before calling thrift client functions.
141func (p *TSimpleServer) SetForwardHeaders(headers []string) {
142 size := len(headers)
143 if size == 0 {
144 p.forwardHeaders = nil
145 return
146 }
147
148 keys := make([]string, size)
149 copy(keys, headers)
150 p.forwardHeaders = keys
151}
152
Yuxuan 'fishy' Wange4870a32019-10-24 13:23:30 -0700153// SetLogger sets the logger used by this TSimpleServer.
154//
155// If no logger was set before Serve is called, a default logger using standard
156// log library will be used.
157func (p *TSimpleServer) SetLogger(logger Logger) {
158 p.logger = logger
159}
160
Matthew Pound8a83b042018-03-29 14:03:50 -0700161func (p *TSimpleServer) innerAccept() (int32, error) {
162 client, err := p.serverTransport.Accept()
163 p.mu.Lock()
164 defer p.mu.Unlock()
165 closed := atomic.LoadInt32(&p.closed)
166 if closed != 0 {
167 return closed, nil
168 }
169 if err != nil {
170 return 0, err
171 }
172 if client != nil {
173 p.wg.Add(1)
174 go func() {
175 defer p.wg.Done()
176 if err := p.processRequests(client); err != nil {
Yuxuan 'fishy' Wange4870a32019-10-24 13:23:30 -0700177 p.logger(fmt.Sprintf("error processing request: %v", err))
Matthew Pound8a83b042018-03-29 14:03:50 -0700178 }
179 }()
180 }
181 return 0, nil
182}
183
Jens Geyerf4598682014-05-08 23:18:44 +0200184func (p *TSimpleServer) AcceptLoop() error {
Jens Geyerc975bbc2014-03-06 21:11:46 +0100185 for {
Matthew Pound8a83b042018-03-29 14:03:50 -0700186 closed, err := p.innerAccept()
Jens Geyer0e87c462013-06-18 22:25:07 +0200187 if err != nil {
Jens Geyer57cd4212014-12-08 21:25:00 +0100188 return err
Jens Geyer0e87c462013-06-18 22:25:07 +0200189 }
Matthew Pound8a83b042018-03-29 14:03:50 -0700190 if closed != 0 {
191 return nil
Jens Geyer0e87c462013-06-18 22:25:07 +0200192 }
193 }
Jens Geyerf4598682014-05-08 23:18:44 +0200194}
195
196func (p *TSimpleServer) Serve() error {
Yuxuan 'fishy' Wange4870a32019-10-24 13:23:30 -0700197 p.logger = fallbackLogger(p.logger)
198
Jens Geyerf4598682014-05-08 23:18:44 +0200199 err := p.Listen()
200 if err != nil {
201 return err
202 }
203 p.AcceptLoop()
Jens Geyer0e87c462013-06-18 22:25:07 +0200204 return nil
205}
206
207func (p *TSimpleServer) Stop() error {
Zachary Wassermanc1794352017-06-29 17:15:01 -0700208 p.mu.Lock()
209 defer p.mu.Unlock()
210 if atomic.LoadInt32(&p.closed) != 0 {
211 return nil
ZhiyuYin47f9b9d2016-06-16 17:28:42 +0800212 }
Zachary Wassermanc1794352017-06-29 17:15:01 -0700213 atomic.StoreInt32(&p.closed, 1)
214 p.serverTransport.Interrupt()
215 p.wg.Wait()
Jens Geyer0e87c462013-06-18 22:25:07 +0200216 return nil
217}
218
Jens Geyer91cfb992014-05-17 01:07:28 +0200219func (p *TSimpleServer) processRequests(client TTransport) error {
Jens Geyer0e87c462013-06-18 22:25:07 +0200220 processor := p.processorFactory.GetProcessor(client)
D. Can Celasun8da0e722017-06-02 14:33:32 +0200221 inputTransport, err := p.inputTransportFactory.GetTransport(client)
222 if err != nil {
223 return err
224 }
Jens Geyer0e87c462013-06-18 22:25:07 +0200225 inputProtocol := p.inputProtocolFactory.GetProtocol(inputTransport)
Yuxuan 'fishy' Wang4d46c112019-06-07 20:47:18 +0800226 var outputTransport TTransport
227 var outputProtocol TProtocol
228
229 // for THeaderProtocol, we must use the same protocol instance for
230 // input and output so that the response is in the same dialect that
231 // the server detected the request was in.
Yuxuan 'fishy' Wangb1002a72019-08-05 13:03:02 -0700232 headerProtocol, ok := inputProtocol.(*THeaderProtocol)
233 if ok {
Yuxuan 'fishy' Wang4d46c112019-06-07 20:47:18 +0800234 outputProtocol = inputProtocol
235 } else {
236 oTrans, err := p.outputTransportFactory.GetTransport(client)
237 if err != nil {
238 return err
239 }
240 outputTransport = oTrans
241 outputProtocol = p.outputProtocolFactory.GetProtocol(outputTransport)
242 }
243
Jens Geyer0e87c462013-06-18 22:25:07 +0200244 if inputTransport != nil {
245 defer inputTransport.Close()
246 }
247 if outputTransport != nil {
248 defer outputTransport.Close()
249 }
250 for {
Zachary Wassermanc1794352017-06-29 17:15:01 -0700251 if atomic.LoadInt32(&p.closed) != 0 {
libinbina5768962017-05-18 14:18:28 +0800252 return nil
libinbina5768962017-05-18 14:18:28 +0800253 }
254
Yuxuan 'fishy' Wangb1002a72019-08-05 13:03:02 -0700255 ctx := defaultCtx
256 if headerProtocol != nil {
257 // We need to call ReadFrame here, otherwise we won't
258 // get any headers on the AddReadTHeaderToContext call.
259 //
260 // ReadFrame is safe to be called multiple times so it
261 // won't break when it's called again later when we
262 // actually start to read the message.
263 if err := headerProtocol.ReadFrame(); err != nil {
Yuxuan 'fishy' Wangc03e2aa2019-10-23 13:43:09 -0700264 if err == io.EOF {
265 return nil
266 }
Yuxuan 'fishy' Wangb1002a72019-08-05 13:03:02 -0700267 return err
268 }
Yuxuan 'fishy' Wangc03e2aa2019-10-23 13:43:09 -0700269 ctx = AddReadTHeaderToContext(ctx, headerProtocol.GetReadHeaders())
Yuxuan 'fishy' Wang26ef9042019-08-19 00:18:22 -0700270 ctx = SetWriteHeaderList(ctx, p.forwardHeaders)
Yuxuan 'fishy' Wangb1002a72019-08-05 13:03:02 -0700271 }
272
273 ok, err := processor.Process(ctx, inputProtocol, outputProtocol)
Jens Geyerf4598682014-05-08 23:18:44 +0200274 if err, ok := err.(TTransportException); ok && err.TypeId() == END_OF_FILE {
Jens Geyer0e87c462013-06-18 22:25:07 +0200275 return nil
276 } else if err != nil {
277 return err
278 }
zhangxin54f49f82016-09-19 12:17:20 +0800279 if err, ok := err.(TApplicationException); ok && err.TypeId() == UNKNOWN_METHOD {
280 continue
281 }
libinbina5768962017-05-18 14:18:28 +0800282 if !ok {
Jens Geyer0e87c462013-06-18 22:25:07 +0200283 break
284 }
285 }
286 return nil
287}